Add second task queue to make demo easier.

Legacy (initially broken) activity will run on this task queue.
This commit is contained in:
Rob Holland
2025-02-13 13:18:38 +00:00
parent aeffe75a0a
commit 95d00d86d6
3 changed files with 38 additions and 0 deletions

View File

@@ -0,0 +1,32 @@
import asyncio
import concurrent.futures
from temporalio.worker import Worker
from activities.tool_activities import dynamic_tool_activity
from shared.config import get_temporal_client, TEMPORAL_LEGACY_TASK_QUEUE
async def main():
# Create the client
client = await get_temporal_client()
# Run the worker
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue=TEMPORAL_LEGACY_TASK_QUEUE,
activities=[
dynamic_tool_activity,
],
activity_executor=activity_executor,
)
print(f"Starting legacy worker, connecting to task queue: {TEMPORAL_LEGACY_TASK_QUEUE}")
await worker.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -9,6 +9,7 @@ load_dotenv(override=True)
TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue")
TEMPORAL_LEGACY_TASK_QUEUE = os.getenv("TEMPORAL_LEGACY_TASK_QUEUE", "agent-task-queue-legacy")
# Authentication settings
TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "")

View File

@@ -13,6 +13,8 @@ with workflow.unsafe.imports_passed_through():
from prompts.agent_prompt_generators import generate_genai_prompt
from models.data_types import CombinedInput, ToolWorkflowParams, ToolPromptInput
from shared.config import TEMPORAL_LEGACY_TASK_QUEUE
# Constants
MAX_TURNS_BEFORE_CONTINUE = 250
TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=10)
@@ -47,10 +49,13 @@ class ToolWorkflow:
"""Execute a tool after confirmation and handle its result."""
workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
task_queue = TEMPORAL_LEGACY_TASK_QUEUE if current_tool == "BookTrain" else None
try:
dynamic_result = await workflow.execute_activity(
current_tool,
tool_data["args"],
task_queue=task_queue,
schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(