From 95d00d86d62858dff770e6f96593a32fafad3d11 Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 13 Feb 2025 13:18:38 +0000 Subject: [PATCH 1/2] Add second task queue to make demo easier. Legacy (initially broken) activity will run on this task queue. --- scripts/run_legacy_worker.py | 32 ++++++++++++++++++++++++++++++++ shared/config.py | 1 + workflows/tool_workflow.py | 5 +++++ 3 files changed, 38 insertions(+) create mode 100644 scripts/run_legacy_worker.py diff --git a/scripts/run_legacy_worker.py b/scripts/run_legacy_worker.py new file mode 100644 index 0000000..501ac65 --- /dev/null +++ b/scripts/run_legacy_worker.py @@ -0,0 +1,32 @@ +import asyncio + +import concurrent.futures + +from temporalio.worker import Worker + +from activities.tool_activities import dynamic_tool_activity + +from shared.config import get_temporal_client, TEMPORAL_LEGACY_TASK_QUEUE + + +async def main(): + # Create the client + client = await get_temporal_client() + + # Run the worker + with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor: + worker = Worker( + client, + task_queue=TEMPORAL_LEGACY_TASK_QUEUE, + activities=[ + dynamic_tool_activity, + ], + activity_executor=activity_executor, + ) + + print(f"Starting legacy worker, connecting to task queue: {TEMPORAL_LEGACY_TASK_QUEUE}") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/shared/config.py b/shared/config.py index c2005ec..282e6d2 100644 --- a/shared/config.py +++ b/shared/config.py @@ -9,6 +9,7 @@ load_dotenv(override=True) TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233") TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default") TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue") +TEMPORAL_LEGACY_TASK_QUEUE = os.getenv("TEMPORAL_LEGACY_TASK_QUEUE", "agent-task-queue-legacy") # Authentication settings TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "") diff --git a/workflows/tool_workflow.py b/workflows/tool_workflow.py index d05691e..4a4aa7b 100644 --- a/workflows/tool_workflow.py +++ b/workflows/tool_workflow.py @@ -13,6 +13,8 @@ with workflow.unsafe.imports_passed_through(): from prompts.agent_prompt_generators import generate_genai_prompt from models.data_types import CombinedInput, ToolWorkflowParams, ToolPromptInput +from shared.config import TEMPORAL_LEGACY_TASK_QUEUE + # Constants MAX_TURNS_BEFORE_CONTINUE = 250 TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=10) @@ -47,10 +49,13 @@ class ToolWorkflow: """Execute a tool after confirmation and handle its result.""" workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}") + task_queue = TEMPORAL_LEGACY_TASK_QUEUE if current_tool == "BookTrain" else None + try: dynamic_result = await workflow.execute_activity( current_tool, tool_data["args"], + task_queue=task_queue, schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT, start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT, retry_policy=RetryPolicy( From 87e22d336057e8e0f56b420b84a6945e8e60b0ee Mon Sep 17 00:00:00 2001 From: Rob Holland Date: Thu, 13 Feb 2025 13:23:06 +0000 Subject: [PATCH 2/2] Use legacy queue for both train APIs. --- workflows/tool_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflows/tool_workflow.py b/workflows/tool_workflow.py index 4a4aa7b..97893e3 100644 --- a/workflows/tool_workflow.py +++ b/workflows/tool_workflow.py @@ -49,7 +49,7 @@ class ToolWorkflow: """Execute a tool after confirmation and handle its result.""" workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}") - task_queue = TEMPORAL_LEGACY_TASK_QUEUE if current_tool == "BookTrain" else None + task_queue = TEMPORAL_LEGACY_TASK_QUEUE if current_tool in ["SearchTrain", "BookTrain"] else None try: dynamic_result = await workflow.execute_activity(