diff --git a/scripts/run_legacy_worker.py b/scripts/run_legacy_worker.py new file mode 100644 index 0000000..501ac65 --- /dev/null +++ b/scripts/run_legacy_worker.py @@ -0,0 +1,32 @@ +import asyncio + +import concurrent.futures + +from temporalio.worker import Worker + +from activities.tool_activities import dynamic_tool_activity + +from shared.config import get_temporal_client, TEMPORAL_LEGACY_TASK_QUEUE + + +async def main(): + # Create the client + client = await get_temporal_client() + + # Run the worker + with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor: + worker = Worker( + client, + task_queue=TEMPORAL_LEGACY_TASK_QUEUE, + activities=[ + dynamic_tool_activity, + ], + activity_executor=activity_executor, + ) + + print(f"Starting legacy worker, connecting to task queue: {TEMPORAL_LEGACY_TASK_QUEUE}") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/shared/config.py b/shared/config.py index c2005ec..282e6d2 100644 --- a/shared/config.py +++ b/shared/config.py @@ -9,6 +9,7 @@ load_dotenv(override=True) TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233") TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default") TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue") +TEMPORAL_LEGACY_TASK_QUEUE = os.getenv("TEMPORAL_LEGACY_TASK_QUEUE", "agent-task-queue-legacy") # Authentication settings TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "") diff --git a/workflows/tool_workflow.py b/workflows/tool_workflow.py index d05691e..4a4aa7b 100644 --- a/workflows/tool_workflow.py +++ b/workflows/tool_workflow.py @@ -13,6 +13,8 @@ with workflow.unsafe.imports_passed_through(): from prompts.agent_prompt_generators import generate_genai_prompt from models.data_types import CombinedInput, ToolWorkflowParams, ToolPromptInput +from shared.config import TEMPORAL_LEGACY_TASK_QUEUE + # Constants MAX_TURNS_BEFORE_CONTINUE = 250 TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=10) @@ -47,10 +49,13 @@ class ToolWorkflow: """Execute a tool after confirmation and handle its result.""" workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}") + task_queue = TEMPORAL_LEGACY_TASK_QUEUE if current_tool == "BookTrain" else None + try: dynamic_result = await workflow.execute_activity( current_tool, tool_data["args"], + task_queue=task_queue, schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT, start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT, retry_policy=RetryPolicy(