mirror of
https://github.com/temporal-community/temporal-ai-agent.git
synced 2026-03-15 14:08:08 +01:00
Merge pull request #7 from robholland/rh-legacy-worker
Add second task queue to make demo easier.
This commit is contained in:
32
scripts/run_legacy_worker.py
Normal file
32
scripts/run_legacy_worker.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
|
from temporalio.worker import Worker
|
||||||
|
|
||||||
|
from activities.tool_activities import dynamic_tool_activity
|
||||||
|
|
||||||
|
from shared.config import get_temporal_client, TEMPORAL_LEGACY_TASK_QUEUE
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
# Create the client
|
||||||
|
client = await get_temporal_client()
|
||||||
|
|
||||||
|
# Run the worker
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
|
||||||
|
worker = Worker(
|
||||||
|
client,
|
||||||
|
task_queue=TEMPORAL_LEGACY_TASK_QUEUE,
|
||||||
|
activities=[
|
||||||
|
dynamic_tool_activity,
|
||||||
|
],
|
||||||
|
activity_executor=activity_executor,
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f"Starting legacy worker, connecting to task queue: {TEMPORAL_LEGACY_TASK_QUEUE}")
|
||||||
|
await worker.run()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
@@ -9,6 +9,7 @@ load_dotenv(override=True)
|
|||||||
TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
|
TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
|
||||||
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
|
TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
|
||||||
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue")
|
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue")
|
||||||
|
TEMPORAL_LEGACY_TASK_QUEUE = os.getenv("TEMPORAL_LEGACY_TASK_QUEUE", "agent-task-queue-legacy")
|
||||||
|
|
||||||
# Authentication settings
|
# Authentication settings
|
||||||
TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "")
|
TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "")
|
||||||
|
|||||||
@@ -13,6 +13,8 @@ with workflow.unsafe.imports_passed_through():
|
|||||||
from prompts.agent_prompt_generators import generate_genai_prompt
|
from prompts.agent_prompt_generators import generate_genai_prompt
|
||||||
from models.data_types import CombinedInput, ToolWorkflowParams, ToolPromptInput
|
from models.data_types import CombinedInput, ToolWorkflowParams, ToolPromptInput
|
||||||
|
|
||||||
|
from shared.config import TEMPORAL_LEGACY_TASK_QUEUE
|
||||||
|
|
||||||
# Constants
|
# Constants
|
||||||
MAX_TURNS_BEFORE_CONTINUE = 250
|
MAX_TURNS_BEFORE_CONTINUE = 250
|
||||||
TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=10)
|
TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=10)
|
||||||
@@ -47,10 +49,13 @@ class ToolWorkflow:
|
|||||||
"""Execute a tool after confirmation and handle its result."""
|
"""Execute a tool after confirmation and handle its result."""
|
||||||
workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
|
workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
|
||||||
|
|
||||||
|
task_queue = TEMPORAL_LEGACY_TASK_QUEUE if current_tool in ["SearchTrain", "BookTrain"] else None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
dynamic_result = await workflow.execute_activity(
|
dynamic_result = await workflow.execute_activity(
|
||||||
current_tool,
|
current_tool,
|
||||||
tool_data["args"],
|
tool_data["args"],
|
||||||
|
task_queue=task_queue,
|
||||||
schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
|
schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
|
||||||
start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT,
|
start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT,
|
||||||
retry_policy=RetryPolicy(
|
retry_policy=RetryPolicy(
|
||||||
|
|||||||
Reference in New Issue
Block a user