Merge branch 'ecommerce' of https://github.com/joshmsmith/temporal-ai-agent into ecommerce

This commit is contained in:
Laine
2025-04-17 09:19:33 -04:00
20 changed files with 387 additions and 90 deletions

4
.gitignore vendored
View File

@@ -31,4 +31,6 @@ coverage.xml
# PyCharm / IntelliJ settings # PyCharm / IntelliJ settings
.idea/ .idea/
.env .env
.env*

View File

@@ -47,3 +47,5 @@ See [the guide to adding goals and tools](./adding-goals-and-tools.md) for more
## For Temporal SAs ## For Temporal SAs
Check out the [slides](https://docs.google.com/presentation/d/1wUFY4v17vrtv8llreKEBDPLRtZte3FixxBUn0uWy5NU/edit#slide=id.g3333e5deaa9_0_0) here and the enablement guide here (TODO). Check out the [slides](https://docs.google.com/presentation/d/1wUFY4v17vrtv8llreKEBDPLRtZte3FixxBUn0uWy5NU/edit#slide=id.g3333e5deaa9_0_0) here and the enablement guide here (TODO).

View File

@@ -4,9 +4,10 @@ The agent is set up to allow for multiple goals and to switch back to choosing a
It may be helpful to review the [architecture](./architecture.md) for a guide and definition of goals, tools, etc. It may be helpful to review the [architecture](./architecture.md) for a guide and definition of goals, tools, etc.
## Adding a New Goal Category ## Adding a New Goal Category
Goal Categories lets you pick which groups of goals to show. Set via an .env setting, GOAL_CATEGORIES. Goal Categories lets you pick which groups of goals to show. Set via an .env setting, `GOAL_CATEGORIES`.
Even if you don't intend to use the goal in a multi-goal scenario, goal categories are useful for others.
1. Pick a unique one that has some business meaning 1. Pick a unique one that has some business meaning
2. Use it in your .env file 2. Use it in your [.env](./.env) file
3. Add to [.env.example](./.env.example) 3. Add to [.env.example](./.env.example)
4. Use it in your Goal definition, see below. 4. Use it in your Goal definition, see below.
@@ -35,7 +36,7 @@ tools=[
## Adding Tools ## Adding Tools
### Optional Tools ### Note on Optional Tools
Tools can be optional - you can indicate this in the tool listing of goal description (see above section re: goal registry) by adding something like, "This step is optional and can be skipped by moving to the next tool." Here is an example from an older iteration of the `goal_hr_schedule_pto` goal, when it was going to have an optional step to check for existing calendar conflicts: Tools can be optional - you can indicate this in the tool listing of goal description (see above section re: goal registry) by adding something like, "This step is optional and can be skipped by moving to the next tool." Here is an example from an older iteration of the `goal_hr_schedule_pto` goal, when it was going to have an optional step to check for existing calendar conflicts:
``` ```
@@ -84,4 +85,15 @@ There are three ways to manage confirmation of tool runs:
), ),
``` ```
If you really want to wait for user confirmation, record it on the workflow (as a Signal) and not rely on the LLM to probably get it, use option #3. If you really want to wait for user confirmation, record it on the workflow (as a Signal) and not rely on the LLM to probably get it, use option #3.
I recommend exploring all three. For a demo, I would decide if you want the Arguments confirmation in the UI, and if not I'd generally go with option #2 but use #3 for tools that make business sense to confirm, e.g. those tools that take action/write data. I recommend exploring all three. For a demo, I would decide if you want the Arguments confirmation in the UI, and if not I'd generally go with option #2 but use #3 for tools that make business sense to confirm, e.g. those tools that take action/write data.
## Add a Goal & Tools Checklist
[ ] Add goal in [/tools/goal_registry.py](tools/goal_registry.py) <br />
- [ ] If a new category, add Goal Category to [.env](./.env) and [.env.example](./.env.example) <br />
- [ ] don't forget the goal list at the bottom of the [goal_registry.py](tools/goal_registry.py) <br />
[ ] Add Tools listed in the Goal Registry to the [tool_registry.py](tools/tool_registry.py) <br />
[ ] Define your tools as Activities in `/tools` <br />
[ ] Add your tools to [tool list](tools/__init__.py) in the tool get_handler() <br />
And that's it! Happy AI Agent building!

View File

@@ -2,6 +2,7 @@ using System.Net.Http.Json;
using System.Text.Json; using System.Text.Json;
using Temporalio.Activities; using Temporalio.Activities;
using TrainSearchWorker.Models; using TrainSearchWorker.Models;
using Microsoft.Extensions.Logging;
namespace TrainSearchWorker.Activities; namespace TrainSearchWorker.Activities;
@@ -23,6 +24,7 @@ public class TrainActivities
[Activity] [Activity]
public async Task<JourneyResponse> SearchTrains(SearchTrainsRequest request) public async Task<JourneyResponse> SearchTrains(SearchTrainsRequest request)
{ {
ActivityExecutionContext.Current.Logger.LogInformation($"SearchTrains from {request.From} to {request.To}");
var response = await _client.GetAsync( var response = await _client.GetAsync(
$"api/search?from={Uri.EscapeDataString(request.From)}" + $"api/search?from={Uri.EscapeDataString(request.From)}" +
$"&to={Uri.EscapeDataString(request.To)}" + $"&to={Uri.EscapeDataString(request.To)}" +
@@ -30,17 +32,21 @@ public class TrainActivities
$"&return_time={Uri.EscapeDataString(request.ReturnTime)}"); $"&return_time={Uri.EscapeDataString(request.ReturnTime)}");
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
// Deserialize into JourneyResponse rather than List<Journey> // Deserialize into JourneyResponse rather than List<Journey>
var journeyResponse = await response.Content.ReadFromJsonAsync<JourneyResponse>(_jsonOptions) var journeyResponse = await response.Content.ReadFromJsonAsync<JourneyResponse>(_jsonOptions)
?? throw new InvalidOperationException("Received null response from API"); ?? throw new InvalidOperationException("Received null response from API");
ActivityExecutionContext.Current.Logger.LogInformation("SearchTrains completed");
return journeyResponse; return journeyResponse;
} }
[Activity] [Activity]
public async Task<BookTrainsResponse> BookTrains(BookTrainsRequest request) public async Task<BookTrainsResponse> BookTrains(BookTrainsRequest request)
{ {
ActivityExecutionContext.Current.Logger.LogInformation($"Booking trains with IDs: {request.TrainIds}");
// Build the URL using the train IDs from the request // Build the URL using the train IDs from the request
var url = $"api/book/{Uri.EscapeDataString(request.TrainIds)}"; var url = $"api/book/{Uri.EscapeDataString(request.TrainIds)}";
@@ -52,6 +58,8 @@ public class TrainActivities
var bookingResponse = await response.Content.ReadFromJsonAsync<BookTrainsResponse>(_jsonOptions) var bookingResponse = await response.Content.ReadFromJsonAsync<BookTrainsResponse>(_jsonOptions)
?? throw new InvalidOperationException("Received null response from API"); ?? throw new InvalidOperationException("Received null response from API");
ActivityExecutionContext.Current.Logger.LogInformation("BookTrains completed");
return bookingResponse; return bookingResponse;
} }

View File

@@ -2,10 +2,19 @@ using Microsoft.Extensions.DependencyInjection;
using Temporalio.Client; using Temporalio.Client;
using Temporalio.Worker; using Temporalio.Worker;
using TrainSearchWorker.Activities; using TrainSearchWorker.Activities;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
// Set up dependency injection // Set up dependency injection
var services = new ServiceCollection(); var services = new ServiceCollection();
var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ")
.SetMinimumLevel(LogLevel.Information);
});
// Add HTTP client // Add HTTP client
services.AddHttpClient("TrainApi", client => services.AddHttpClient("TrainApi", client =>
{ {
@@ -31,7 +40,10 @@ Console.WriteLine($"Connecting to Temporal at address: {address}");
Console.WriteLine($"Using namespace: {ns}"); Console.WriteLine($"Using namespace: {ns}");
// Create worker options // Create worker options
var options = new TemporalWorkerOptions("agent-task-queue-legacy"); var options = new TemporalWorkerOptions("agent-task-queue-legacy")
{
LoggerFactory = loggerFactory
};
// Register activities // Register activities
var activities = serviceProvider.GetRequiredService<TrainActivities>(); var activities = serviceProvider.GetRequiredService<TrainActivities>();

View File

@@ -7,6 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="9.0.4" />
<PackageReference Include="Temporalio" Version="1.0.0" /> <PackageReference Include="Temporalio" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
</ItemGroup> </ItemGroup>

View File

@@ -1,5 +1,5 @@
[tool.poetry] [tool.poetry]
name = "temporal-AI-agent" name = "temporal_AI_agent"
version = "0.1.0" version = "0.1.0"
description = "Temporal AI Agent" description = "Temporal AI Agent"
license = "MIT" license = "MIT"
@@ -13,7 +13,13 @@ packages = [
] ]
[tool.poetry.urls] [tool.poetry.urls]
"Bug Tracker" = "https://github.com/temporalio/samples-python/issues" "Bug Tracker" = "https://github.com/temporal-community/temporal-ai-agent/issues"
[tool.poe.tasks]
format = [{cmd = "black ."}, {cmd = "isort ."}]
lint = [{cmd = "black --check ."}, {cmd = "isort --check-only ."}, {ref = "lint-types" }]
lint-types = "mypy --check-untyped-defs --namespace-packages ."
test = "pytest"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.10,<4.0" python = ">=3.10,<4.0"
@@ -37,9 +43,16 @@ gtfs-kit = "^10.1.1"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pytest = "^7.3" pytest = "^7.3"
pytest-asyncio = "^0.18.3"
black = "^23.7" black = "^23.7"
isort = "^5.12" isort = "^5.12"
[build-system] [build-system]
requires = ["poetry-core>=1.4.0"] requires = ["poetry-core>=1.4.0"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
asyncio_mode = "auto"
log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"

View File

@@ -1,8 +1,8 @@
from tools.search_events import find_events from tools.search_flights import search_flights
import json import json
# Example usage # Example usage
if __name__ == "__main__": if __name__ == "__main__":
search_args = {"city": "Sydney", "month": "July"} search_args = {"city": "Sydney", "month": "July"}
results = find_events(search_args) results = search_flights(search_args)
print(json.dumps(results, indent=2)) print(json.dumps(results, indent=2))

View File

@@ -192,7 +192,7 @@ dotnet run
``` ```
If you're running your train API above on a different host/port then change the API URL in `Program.cs`. Otherwise, be sure to run it using `python thirdparty/train_api.py`. If you're running your train API above on a different host/port then change the API URL in `Program.cs`. Otherwise, be sure to run it using `python thirdparty/train_api.py`.
#### Goals: FIN/Money Movement #### Goals: FIN - Money Movement and Loan Application
Make sure you have the mock users you want (such as yourself) in [the account mock data file](./tools/data/customer_account_data.json). Make sure you have the mock users you want (such as yourself) in [the account mock data file](./tools/data/customer_account_data.json).
- `AGENT_GOAL=goal_fin_move_money` - This scenario _can_ initiate a secondary workflow to move money. Check out [this repo](https://github.com/temporal-sa/temporal-money-transfer-java) - you'll need to get the worker running and connected to the same account as the agentic worker. - `AGENT_GOAL=goal_fin_move_money` - This scenario _can_ initiate a secondary workflow to move money. Check out [this repo](https://github.com/temporal-sa/temporal-money-transfer-java) - you'll need to get the worker running and connected to the same account as the agentic worker.
@@ -200,6 +200,12 @@ By default it will _not_ make a real workflow, it'll just fake it. If you get th
```bash ```bash
FIN_START_REAL_WORKFLOW=FALSE #set this to true to start a real workflow FIN_START_REAL_WORKFLOW=FALSE #set this to true to start a real workflow
``` ```
- `AGENT_GOAL=goal_fin_loan_application` - This scenario _can_ initiate a secondary workflow to apply for a loan. Check out [this repo](https://github.com/temporal-sa/temporal-latency-optimization-scenarios) - you'll need to get the worker running and connected to the same account as the agentic worker.
By default it will _not_ make a real workflow, it'll just fake it. If you get the worker running and want to start a workflow, in your [.env](./.env):
```bash
FIN_START_REAL_WORKFLOW=FALSE #set this to true to start a real workflow
```
#### Goals: HR/PTO #### Goals: HR/PTO
Make sure you have the mock users you want in (such as yourself) in [the PTO mock data file](./tools/data/employee_pto_data.json). Make sure you have the mock users you want in (such as yourself) in [the PTO mock data file](./tools/data/employee_pto_data.json).

0
tests/__init__.py Normal file
View File

View File

@@ -1,53 +0,0 @@
import asyncio
from temporalio.client import Client, WorkflowExecutionStatus
from temporalio.worker import Worker
from temporalio.testing import TestWorkflowEnvironment
from api.main import get_initial_agent_goal
from models.data_types import AgentGoalWorkflowParams, CombinedInput
from workflows.agent_goal_workflow import AgentGoalWorkflow
from activities.tool_activities import ToolActivities, dynamic_tool_activity
async def asyncSetUp(self):
# Set up the test environment
self.env = await TestWorkflowEnvironment.create_local()
async def asyncTearDown(self):
# Clean up after tests
await self.env.shutdown()
async def test_flight_booking(client: Client):
task_queue_name = "agent-ai-workflow"
workflow_id = "agent-workflow"
initial_agent_goal = get_initial_agent_goal()
# Create combined input
combined_input = CombinedInput(
tool_params=AgentGoalWorkflowParams(None, None),
agent_goal=initial_agent_goal,
)
workflow_id = "agent-workflow"
async with Worker(client, task_queue=task_queue_name, workflows=[AgentGoalWorkflow], activities=[ToolActivities.agent_validatePrompt, ToolActivities.agent_toolPlanner, dynamic_tool_activity]):
# todo set goal categories for scenarios
handle = await client.start_workflow(
AgentGoalWorkflow.run, id=workflow_id, task_queue=task_queue_name
)
# todo send signals based on
await handle.signal(AgentGoalWorkflow.user_prompt, "book flights")
await handle.signal(AgentGoalWorkflow.user_prompt, "sydney in september")
assert WorkflowExecutionStatus.RUNNING == (await handle.describe()).status
#assert ["Hello, user1", "Hello, user2"] == await handle.result()
await handle.signal(AgentGoalWorkflow.user_prompt, "I'm all set, end conversation")
assert WorkflowExecutionStatus.COMPLETED == (await handle.describe()).status

55
tests/conftest.py Normal file
View File

@@ -0,0 +1,55 @@
import asyncio
import multiprocessing
import sys
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
# Due to https://github.com/python/cpython/issues/77906, multiprocessing on
# macOS starting with Python 3.8 has changed from "fork" to "spawn". For
# pre-3.8, we are changing it for them.
if sys.version_info < (3, 8) and sys.platform.startswith("darwin"):
multiprocessing.set_start_method("spawn", True)
def pytest_addoption(parser):
parser.addoption(
"--workflow-environment",
default="local",
help="Which workflow environment to use ('local', 'time-skipping', or target to existing server)",
)
@pytest.fixture(scope="session")
def event_loop():
# See https://github.com/pytest-dev/pytest-asyncio/issues/68
# See https://github.com/pytest-dev/pytest-asyncio/issues/257
# Also need ProactorEventLoop on older versions of Python with Windows so
# that asyncio subprocess works properly
if sys.version_info < (3, 8) and sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
else:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest_asyncio.fixture(scope="session")
async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]:
env_type = request.config.getoption("--workflow-environment")
if env_type == "local":
env = await WorkflowEnvironment.start_local()
elif env_type == "time-skipping":
env = await WorkflowEnvironment.start_time_skipping()
else:
env = WorkflowEnvironment.from_client(await Client.connect(env_type))
yield env
await env.shutdown()
@pytest_asyncio.fixture
async def client(env: WorkflowEnvironment) -> Client:
return env.client

View File

@@ -0,0 +1,80 @@
from temporalio.client import Client, WorkflowExecutionStatus
from temporalio.worker import Worker
import concurrent.futures
from temporalio.testing import WorkflowEnvironment
from api.main import get_initial_agent_goal
from models.data_types import AgentGoalWorkflowParams, CombinedInput
from workflows.agent_goal_workflow import AgentGoalWorkflow
from activities.tool_activities import ToolActivities, dynamic_tool_activity
from unittest.mock import patch
from dotenv import load_dotenv
import os
from contextlib import contextmanager
@contextmanager
def my_context():
print("Setup")
yield "some_value" # Value assigned to 'as' variable
print("Cleanup")
async def test_flight_booking(client: Client):
#load_dotenv("test_flights_single.env")
with my_context() as value:
print(f"Working with {value}")
# Create the test environment
#env = await WorkflowEnvironment.start_local()
#client = env.client
task_queue_name = "agent-ai-workflow"
workflow_id = "agent-workflow"
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue=task_queue_name,
workflows=[AgentGoalWorkflow],
activities=[ToolActivities.agent_validatePrompt, ToolActivities.agent_toolPlanner, ToolActivities.get_wf_env_vars, dynamic_tool_activity],
activity_executor=activity_executor,
)
async with worker:
initial_agent_goal = get_initial_agent_goal()
# Create combined input
combined_input = CombinedInput(
tool_params=AgentGoalWorkflowParams(None, None),
agent_goal=initial_agent_goal,
)
prompt="Hello!"
#async with Worker(client, task_queue=task_queue_name, workflows=[AgentGoalWorkflow], activities=[ToolActivities.agent_validatePrompt, ToolActivities.agent_toolPlanner, dynamic_tool_activity]):
# todo set goal categories for scenarios
handle = await client.start_workflow(
AgentGoalWorkflow.run,
combined_input,
id=workflow_id,
task_queue=task_queue_name,
start_signal="user_prompt",
start_signal_args=[prompt],
)
# todo send signals to simulate user input
# await handle.signal(AgentGoalWorkflow.user_prompt, "book flights") # for multi-goal
await handle.signal(AgentGoalWorkflow.user_prompt, "sydney in september")
assert WorkflowExecutionStatus.RUNNING == (await handle.describe()).status
#assert ["Hello, user1", "Hello, user2"] == await handle.result()
await handle.signal(AgentGoalWorkflow.user_prompt, "I'm all set, end conversation")
#assert WorkflowExecutionStatus.COMPLETED == (await handle.describe()).status
result = await handle.result()
#todo dump workflow history for analysis optional
#todo assert result is good

24
todo.md
View File

@@ -1,15 +1,5 @@
# todo list # todo list
[ ] goal change management tweaks <br />
- [x] maybe make the choose_Agent_goal tag not be system/not always included? <br />
- [x] try taking out list-agents as a tool because agent_prompt_generators may do it for you <br />
- [x] make goal selection not be a system tool but be an option in .env, see how that works, includes taking it out of the goal/toolset for all goals <br />
- [x] test single-goal <br />
- [x] test claude and grok<br />
- [x] document in sample env and docs how to control <br />
[ ] expand [tests](./tests/agent_goal_workflow_test.py)<br /> [ ] expand [tests](./tests/agent_goal_workflow_test.py)<br />
[x] try claude-3-7-sonnet-20250219, see [tool_activities.py](./activities/tool_activities.py) <br />
[x] test Grok with changes
[ ] adding fintech goals <br /> [ ] adding fintech goals <br />
- Fraud Detection and Prevention - The AI monitors transactions across accounts, flagging suspicious activities (e.g., unusual spending patterns or login attempts) and autonomously freezing accounts or notifying customers and compliance teams.<br /> - Fraud Detection and Prevention - The AI monitors transactions across accounts, flagging suspicious activities (e.g., unusual spending patterns or login attempts) and autonomously freezing accounts or notifying customers and compliance teams.<br />
@@ -17,7 +7,21 @@
- Portfolio Management and Rebalancing - The AI monitors a customers investment portfolio, rebalancing it automatically based on market trends, risk tolerance, and financial goals (e.g., shifting assets between stocks, bonds, or crypto).<br /> - Portfolio Management and Rebalancing - The AI monitors a customers investment portfolio, rebalancing it automatically based on market trends, risk tolerance, and financial goals (e.g., shifting assets between stocks, bonds, or crypto).<br />
[ ] new loan/fraud check/update with start <br /> [ ] new loan/fraud check/update with start <br />
[ ] financial advise - args being freeform customer input about their financial situation, goals
[ ] tool is maybe a new tool asking the LLM to advise
[ ] for demo simulate failure - add utilities/simulated failures from pipeline demo <br />
[ ] ecommerce goals <br />
- [ ] add to docs <br />
- [ ] decide about api key names with Laine <br />
[ ] LLM failure->autoswitch: <br />
- detect failure in the activity using failurecount <br />
- activity switches to secondary LLM defined in .env
- activity reports switch to workflow
[ ] for demo simulate failure - add utilities/simulated failures from pipeline demo <br />
[ ] ask the ai agent how it did at the end of the conversation, was it efficient? successful? insert a search attribute to document that before return <br /> [ ] ask the ai agent how it did at the end of the conversation, was it efficient? successful? insert a search attribute to document that before return <br />
- Insight into the agents performance <br /> - Insight into the agents performance <br />

View File

@@ -16,6 +16,7 @@ from .hr.checkpaybankstatus import checkpaybankstatus
from .fin.check_account_valid import check_account_valid from .fin.check_account_valid import check_account_valid
from .fin.get_account_balances import get_account_balance from .fin.get_account_balances import get_account_balance
from .fin.move_money import move_money from .fin.move_money import move_money
from .fin.submit_loan_application import submit_loan_application
from .ecommerce.get_order import get_order from .ecommerce.get_order import get_order
from .ecommerce.track_package import track_package from .ecommerce.track_package import track_package
@@ -57,7 +58,9 @@ def get_handler(tool_name: str):
if tool_name == "FinCheckAccountBalance": if tool_name == "FinCheckAccountBalance":
return get_account_balance return get_account_balance
if tool_name == "FinMoveMoneyOrder": if tool_name == "FinMoveMoneyOrder":
return move_money return move_money
if tool_name == "FinCheckAccountSubmitLoanApproval":
return submit_loan_application
if tool_name == "GetOrder": if tool_name == "GetOrder":
return get_order return get_order
if tool_name == "TrackPackage": if tool_name == "TrackPackage":

View File

@@ -31,7 +31,6 @@ class MoneyMovementWorkflowParameterObj:
# this assumes it's a valid account - use check_account_valid() to verify that first # this assumes it's a valid account - use check_account_valid() to verify that first
async def move_money(args: dict) -> dict: async def move_money(args: dict) -> dict:
print("in move_money")
account_key = args.get("accountkey") account_key = args.get("accountkey")
account_type: str = args.get("accounttype") account_type: str = args.get("accounttype")
amount = args.get("amount") amount = args.get("amount")

View File

@@ -0,0 +1,103 @@
from datetime import date, timedelta
import os
from pathlib import Path
import json
from temporalio.client import (
Client,
WithStartWorkflowOperation,
WorkflowHandle,
WorkflowUpdateFailedError,
)
from temporalio import common
from dataclasses import dataclass
from typing import Optional
import asyncio
from temporalio.exceptions import WorkflowAlreadyStartedError
from shared.config import get_temporal_client
# Define data structures to match the Java workflow's expected input/output
# see https://github.com/temporal-sa/temporal-latency-optimization-scenarios for more details
@dataclass
class TransactionRequest:
amount: float
sourceAccount: str
targetAccount: str
@dataclass
class TxResult:
transactionId: str
status: str
#demonstrate starting a workflow and early return pattern while the workflow continues
async def submit_loan_application(args: dict) -> dict:
account_key = args.get("accountkey")
amount = args.get("amount")
loan_status: dict = await start_workflow(amount=amount,account_name=account_key)
if loan_status.get("error") is None:
return {'status': loan_status.get("loan_application_status"), 'detailed_status': loan_status.get("application_details"), 'next_step': loan_status.get("advisement"), 'confirmation_id': loan_status.get("transaction_id")}
else:
print(loan_status)
return loan_status
# Async function to start workflow
async def start_workflow(amount: str, account_name: str, )-> dict:
# Connect to Temporal
client = await get_temporal_client()
start_real_workflow = os.getenv("FIN_START_REAL_WORKFLOW")
if start_real_workflow is not None and start_real_workflow.lower() == "false":
START_REAL_WORKFLOW = False
return {'loan_application_status': "applied", 'application_details': "loan application is submitted and initial validation is complete",'transaction_id': "APPLICATION"+account_name, 'advisement': "You'll receive a confirmation for final approval in three business days", }
else:
START_REAL_WORKFLOW = True
# Define the workflow ID and task queue
workflow_id = "LOAN_APPLICATION-"+account_name+"-"+date.today().strftime('%Y-%m-%d')
task_queue = "LatencyOptimizationTEST"
# Create a TransactionRequest (matching the Java workflow's expected input)
tx_request = TransactionRequest(
amount=float(amount),
targetAccount=account_name,
sourceAccount=account_name,
)
start_op = WithStartWorkflowOperation(
"TransactionWorkflowLocalBeforeUpdate",
tx_request,
id=workflow_id,
id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING,
task_queue=task_queue,
)
try:
print("trying update-with-start")
tx_result = TxResult(
await client.execute_update_with_start_workflow(
"returnInitResult",
start_workflow_operation=start_op,
)
)
except WorkflowUpdateFailedError:
print("aww man got exception WorkflowUpdateFailedError" )
tx_result = None
return_msg = "Loan could not be processed for " + account_name
return {"error": return_msg}
workflow_handle = await start_op.workflow_handle()
print(tx_result)
print(f"Update result: Transaction ID = {tx_result.transactionId}, Message = {tx_result.status}")
# Optionally, wait for the workflow to complete and get the final result
# final_result = await handle.result()
# print(f"Workflow completed with result: {final_result}")
# return {'status': loan_status.get("loan_status"), 'detailed_status': loan_status.get("results"), 'next_step': loan_status.get("advisement"), 'confirmation_id': loan_status.get("workflowID")}
return {'loan_application_status': "applied", 'application_details': "loan application is submitted and initial validation is complete",'transaction_id': tx_result.transactionId, 'advisement': "You'll receive a confirmation for final approval in three business days", }

View File

@@ -305,6 +305,7 @@ goal_fin_check_account_balances = AgentGoal(
) )
# this tool checks account balances, and uses ./data/customer_account_data.json as dummy data # this tool checks account balances, and uses ./data/customer_account_data.json as dummy data
# it also uses a separate workflow/tool, see ./setup.md for details
goal_fin_move_money = AgentGoal( goal_fin_move_money = AgentGoal(
id = "goal_fin_move_money", id = "goal_fin_move_money",
category_tag="fin", category_tag="fin",
@@ -322,7 +323,7 @@ goal_fin_move_money = AgentGoal(
starter_prompt=starter_prompt_generic, starter_prompt=starter_prompt_generic,
example_conversation_history="\n ".join( example_conversation_history="\n ".join(
[ [
"user: I'd like transfer some money", "user: I'd like to transfer some money",
"agent: Sure! I can help you out with that. May I have account number and email address?", "agent: Sure! I can help you out with that. May I have account number and email address?",
"user: account number is 11235813", "user: account number is 11235813",
"user_confirmed_tool_run: <user clicks confirm on FincheckAccountIsValid tool>", "user_confirmed_tool_run: <user clicks confirm on FincheckAccountIsValid tool>",
@@ -343,6 +344,34 @@ goal_fin_move_money = AgentGoal(
), ),
) )
# this starts a loan approval process
# it also uses a separate workflow/tool, see ./setup.md for details #todo
goal_fin_loan_application = AgentGoal(
id = "goal_fin_loan_application",
category_tag="fin",
agent_name="Easy Loan Apply",
agent_friendly_description="Initiate loan application.",
tools=[
tool_registry.financial_check_account_is_valid,
tool_registry.financial_submit_loan_approval, #todo
],
description="The user wants to apply for a loan at the financial institution. To assist with that goal, help the user gather args for these tools in order: "
"1. FinCheckAccountIsValid: validate the user's account is valid"
"2. FinCheckAccountSubmitLoanApproval: submit the loan for approval",
starter_prompt=starter_prompt_generic,
example_conversation_history="\n ".join(
[
"user: I'd like to apply for a loan",
"agent: Sure! I can help you out with that. May I have account number for confirmation?",
"user: account number is 11235813",
"user_confirmed_tool_run: <user clicks confirm on FincheckAccountIsValid tool>",
"tool_result: { 'status': account valid }",
"agent: Great! We've validated your account. What will the loan amount be?",
"user: I'd like a loan for $500",
"user_confirmed_tool_run: <user clicks confirm on FinCheckAccountSubmitLoanApproval tool>",
"tool_result: { 'status': submitted, 'detailed_status': loan application is submitted and initial validation is complete, 'confirmation id': 333421, 'next_step': You'll receive a confirmation for final approval in three business days }",
"agent: I have submitted your loan application process and the initial validation is successful. Your application ID is 333421. You'll receive a notification for final approval from us in three business days. "
# ----- E-Commerce Goals --- # ----- E-Commerce Goals ---
#todo: add goal to list all orders for last X amount of time? #todo: add goal to list all orders for last X amount of time?
# this tool checks account balances, and uses ./data/customer_account_data.json as dummy data # this tool checks account balances, and uses ./data/customer_account_data.json as dummy data
@@ -430,5 +459,7 @@ goal_list.append(goal_hr_check_pto)
goal_list.append(goal_hr_check_paycheck_bank_integration_status) goal_list.append(goal_hr_check_paycheck_bank_integration_status)
goal_list.append(goal_fin_check_account_balances) goal_list.append(goal_fin_check_account_balances)
goal_list.append(goal_fin_move_money) goal_list.append(goal_fin_move_money)
goal_list.append(goal_fin_loan_application)
goal_list.append(goal_ecomm_list_orders) goal_list.append(goal_ecomm_list_orders)
goal_list.append(goal_ecomm_order_status) goal_list.append(goal_ecomm_order_status)

View File

@@ -318,6 +318,22 @@ financial_move_money = ToolDefinition(
], ],
) )
financial_submit_loan_approval = ToolDefinition(
name="FinCheckAccountSubmitLoanApproval",
description="Submit a loan application. "
"Returns the loan status. ",
arguments=[
ToolArgument(
name="accountkey",
type="string",
description="email address or account ID of user",
),
ToolArgument(
name="amount",
type="string",
description="amount requested for the loan",
# ----- ECommerce Use Case Tools ----- # ----- ECommerce Use Case Tools -----
ecomm_list_orders = ToolDefinition( ecomm_list_orders = ToolDefinition(
name="ListOrders", name="ListOrders",

View File

@@ -108,7 +108,7 @@ class AgentGoalWorkflow:
conversation_history=self.conversation_history, conversation_history=self.conversation_history,
agent_goal=self.goal, agent_goal=self.goal,
) )
validation_result = await workflow.execute_activity( validation_result = await workflow.execute_activity_method(
ToolActivities.agent_validatePrompt, ToolActivities.agent_validatePrompt,
args=[validation_input], args=[validation_input],
schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT, schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
@@ -134,7 +134,7 @@ class AgentGoalWorkflow:
prompt_input = ToolPromptInput(prompt=prompt, context_instructions=context_instructions) prompt_input = ToolPromptInput(prompt=prompt, context_instructions=context_instructions)
# connect to LLM and execute to get next steps # connect to LLM and execute to get next steps
tool_data = await workflow.execute_activity( tool_data = await workflow.execute_activity_method(
ToolActivities.agent_toolPlanner, ToolActivities.agent_toolPlanner,
prompt_input, prompt_input,
schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT, schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
@@ -211,7 +211,7 @@ class AgentGoalWorkflow:
#Signal that comes from api/main.py via a post to /confirm #Signal that comes from api/main.py via a post to /confirm
@workflow.signal @workflow.signal
async def confirmed(self) -> None: async def confirm(self) -> None:
"""Signal handler for user confirmation of tool execution.""" """Signal handler for user confirmation of tool execution."""
workflow.logger.info("Received user signal: confirmation") workflow.logger.info("Received user signal: confirmation")
self.confirmed = True self.confirmed = True
@@ -317,8 +317,9 @@ class AgentGoalWorkflow:
async def lookup_wf_env_settings(self, combined_input: CombinedInput)->None: async def lookup_wf_env_settings(self, combined_input: CombinedInput)->None:
env_lookup_input = EnvLookupInput( env_lookup_input = EnvLookupInput(
show_confirm_env_var_name = "SHOW_CONFIRM", show_confirm_env_var_name = "SHOW_CONFIRM",
show_confirm_default = True) show_confirm_default = True,
env_output:EnvLookupOutput = await workflow.execute_activity( )
env_output:EnvLookupOutput = await workflow.execute_activity_method(
ToolActivities.get_wf_env_vars, ToolActivities.get_wf_env_vars,
env_lookup_input, env_lookup_input,
start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT, start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
@@ -363,9 +364,11 @@ class AgentGoalWorkflow:
# also don't forget you can look at the workflow itself and do queries if you want # also don't forget you can look at the workflow itself and do queries if you want
def print_useful_workflow_vars(self, status_or_step:str) -> None: def print_useful_workflow_vars(self, status_or_step:str) -> None:
print(f"***{status_or_step}:***") print(f"***{status_or_step}:***")
print(f"force confirm? {self.tool_data['force_confirm']}") if self.tool_data:
print(f"next step: {self.tool_data.get('next')}") print(f"force confirm? {self.tool_data['force_confirm']}")
print(f"current_tool: {self.tool_data.get('tool')}") print(f"next step: {self.tool_data.get('next')}")
print(f"self.confirm: {self.confirmed}") print(f"current_tool: {self.tool_data.get('tool')}")
print(f"waiting_for_confirm (about to be set to true): {self.waiting_for_confirm}") else:
print("no tool data initialized yet")
print(f"self.confirmed: {self.confirmed}")