From c623029c9d51a3f70d7dcf661d73a781d942fd9b Mon Sep 17 00:00:00 2001 From: Steve Androulakis Date: Fri, 10 Jan 2025 14:13:32 -0800 Subject: [PATCH] Temporal Cloud support --- .env.example | 10 ++++- README.md | 95 +++++++++++++++++++++++----------------- api/main.py | 27 +++++++----- scripts/end_chat.py | 2 +- scripts/get_history.py | 17 ++----- scripts/get_tool_data.py | 23 ---------- scripts/run_worker.py | 21 ++++----- scripts/send_confirm.py | 8 ++-- shared/config.py | 56 +++++++++++++++++++++++ 9 files changed, 155 insertions(+), 104 deletions(-) delete mode 100644 scripts/get_tool_data.py create mode 100644 shared/config.py diff --git a/.env.example b/.env.example index 971a8cd..0b66b6f 100644 --- a/.env.example +++ b/.env.example @@ -3,4 +3,12 @@ OPENAI_API_KEY=sk-proj-... RAPIDAPI_KEY=9df2cb5... RAPIDAPI_HOST=sky-scrapper.p.rapidapi.com -STRIPE_API_KEY=sk_test_51J... \ No newline at end of file +STRIPE_API_KEY=sk_test_51J... + +# uncomment and unset these environment variables to connect to the local dev server +TEMPORAL_ADDRESS=namespace.acct.tmprl.cloud:7233 +TEMPORAL_NAMESPACE=default +TEMPORAL_TASK_QUEUE=agent-task-queue +TEMPORAL_TLS_CERT='path/to/cert.pem' +TEMPORAL_TLS_KEY='path/to/key.pem' +# TEMPORAL_API_KEY=abcdef1234567890 # Uncomment not using mTLS \ No newline at end of file diff --git a/README.md b/README.md index fa709b5..064f9f9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# AI Agent execution using Temporal +# Temporal AI Agent This demo shows a multi-turn conversation with an AI agent running inside a Temporal workflow. The goal is to collect information towards a goal. There's a simple DSL input for collecting information (currently set up to use mock functions to search for events, book flights around those events then create an invoice for those flights). The AI will respond with clarifications and ask for any missing information to that goal. It uses ChatGPT 4o but can be made to use a local LLM via [Ollama](https://ollama.com) (see the deprecated section below). @@ -6,18 +6,51 @@ This demo shows a multi-turn conversation with an AI agent running inside a Temp [![Watch the demo](./agent-youtube-screenshot.jpeg)](https://www.youtube.com/watch?v=GEXllEH2XiQ) -## Setup -* See `.env_example` for the required environment variables and copy to `.env` in the root directory. -* Requires an OpenAI key for the gpt-4o model. Set this in the `OPENAI_API_KEY` environment variable in .env +## Configuration + +This application uses `.env` files for configuration. Copy the `.env.example` file to `.env` and update the values: + +```bash +cp .env.example .env +``` + +The agent requires an OpenAI key for the gpt-4o model. Set this in the `OPENAI_API_KEY` environment variable in .env + +#### Using a local LLM instead of ChatGPT 4o +* Install [Ollama](https://ollama.com) and the [Qwen2.5 14B](https://ollama.com/library/qwen2.5) model (`ollama run qwen2.5:14b`). (note this model is about 9GB to download). + * Local LLM is disabled as ChatGPT 4o was better for this use case. To use Ollama, examine `./activities/tool_activities.py` and rename the functions. + +## Agent Tools * Requires a Rapidapi key for sky-scrapper (how we find flights). Set this in the `RAPIDAPI_KEY` environment variable in .env * It's free to sign up and get a key at [RapidAPI](https://rapidapi.com/apiheya/api/sky-scrapper) * If you're lazy go to `tools/search_flights.py` and replace the `get_flights` function with the mock `search_flights_example` that exists in the same file. * Requires a Stripe key for the `create_invoice` tool. Set this in the `STRIPE_API_KEY` environment variable in .env * It's free to sign up and get a key at [Stripe](https://stripe.com/) * If you're lazy go to `tools/create_invoice.py` and replace the `create_invoice` function with the mock `create_invoice_example` that exists in the same file. -* Install and run Temporal. Follow the instructions in the [Temporal documentation](https://learn.temporal.io/getting_started/python/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli) to install and run the Temporal server. -### Python Environment +## Configuring Temporal Connection + +By default, this application will connect to a local Temporal server (`localhost:7233`) in the default namespace, using the `agent-task-queue` task queue. You can override these settings in your `.env` file. + +### Use Temporal Cloud + +See `.env.example` for details on connecting to Temporal Cloud using mTLS or API key authentication. + +[Sign up for Temporal Cloud](https://temporal.io/get-cloud) + +### Use a local Temporal Dev Server + +On a Mac +```bash +brew install temporal +temporal server start-dev +``` +See the [Temporal documentation](https://learn.temporal.io/getting_started/python/dev_environment/) for other platforms. + + +## Running the Application + +### Python Backend Requires [Poetry](https://python-poetry.org/) to manage dependencies. @@ -27,44 +60,29 @@ Requires [Poetry](https://python-poetry.org/) to manage dependencies. 3. `poetry install` -### React UI -- `cd frontend` -- `npm install` to install the dependencies. +Run the following commands in separate terminal windows: - -#### Deprecated: -* Install [Ollama](https://ollama.com) and the [Qwen2.5 14B](https://ollama.com/library/qwen2.5) model (`ollama run qwen2.5:14b`). (note this model is about 9GB to download). - * Local LLM is disabled as ChatGPT 4o was better for this use case. To use Ollama, examine `./activities/tool_activities.py` and rename the functions. - - -## Running the demo - -### Run a Temporal Dev Server - -On a Mac +1. Start the Temporal worker: ```bash -brew install temporal -temporal server start-dev +poetry run python scripts/run_worker.py ``` -See the [Temporal documentation](https://learn.temporal.io/getting_started/python/dev_environment/) for other platforms. -### Run a Temporal Worker +2. Start the API server: +```bash +poetry run uvicorn api.main:app --reload +``` +Access the API at `/docs` to see the available endpoints. -From the `/scripts` directory: +### React UI +Start the frontend: +```bash +cd frontend +npm install +npm run dev +``` +Access the UI at `http://localhost:5173` -- Run the worker: `poetry run python run_worker.py` - -Then run the API and UI using the instructions below. - -### API -- `poetry run uvicorn api.main:app --reload` to start the API server. -- Access the API at `/docs` to see the available endpoints. - -### UI -- `npm run dev` to start the dev server. -- Access the UI at `http://localhost:5173` - -## Customizing the agent +## Customizing the Agent - `tool_registry.py` contains the mapping of tool names to tool definitions (so the AI understands how to use them) - `goal_registry.py` contains descriptions of goals and the tools used to achieve them - The tools themselves are defined in their own files in `/tools` @@ -74,4 +92,3 @@ Then run the API and UI using the instructions below. ## TODO - I should prove this out with other tool definitions outside of the event/flight search case (take advantage of my nice DSL). - Currently hardcoded to the Temporal dev server at localhost:7233. Need to support options incl Temporal Cloud. -- UI: Make prettier \ No newline at end of file diff --git a/api/main.py b/api/main.py index da51617..0ed92e4 100644 --- a/api/main.py +++ b/api/main.py @@ -1,12 +1,21 @@ from fastapi import FastAPI +from typing import Optional from temporalio.client import Client + from workflows.tool_workflow import ToolWorkflow from models.data_types import CombinedInput, ToolWorkflowParams from tools.goal_registry import goal_event_flight_invoice from temporalio.exceptions import TemporalError from fastapi.middleware.cors import CORSMiddleware +from shared.config import get_temporal_client, TEMPORAL_TASK_QUEUE app = FastAPI() +temporal_client: Optional[Client] = None + +@app.on_event("startup") +async def startup_event(): + global temporal_client + temporal_client = await get_temporal_client() app.add_middleware( CORSMiddleware, @@ -25,10 +34,9 @@ def root(): @app.get("/tool-data") async def get_tool_data(): """Calls the workflow's 'get_tool_data' query.""" - client = await Client.connect("localhost:7233") try: # Get workflow handle - handle = client.get_workflow_handle("agent-workflow") + handle = temporal_client.get_workflow_handle("agent-workflow") # Check if the workflow is completed workflow_status = await handle.describe() @@ -48,9 +56,8 @@ async def get_tool_data(): @app.get("/get-conversation-history") async def get_conversation_history(): """Calls the workflow's 'get_conversation_history' query.""" - client = await Client.connect("localhost:7233") try: - handle = client.get_workflow_handle("agent-workflow") + handle = temporal_client.get_workflow_handle("agent-workflow") conversation_history = await handle.query("get_conversation_history") return conversation_history @@ -61,8 +68,6 @@ async def get_conversation_history(): @app.post("/send-prompt") async def send_prompt(prompt: str): - client = await Client.connect("localhost:7233") - # Create combined input combined_input = CombinedInput( tool_params=ToolWorkflowParams(None, None), @@ -72,11 +77,11 @@ async def send_prompt(prompt: str): workflow_id = "agent-workflow" # Start (or signal) the workflow - await client.start_workflow( + await temporal_client.start_workflow( ToolWorkflow.run, combined_input, id=workflow_id, - task_queue="agent-task-queue", + task_queue=TEMPORAL_TASK_QUEUE, start_signal="user_prompt", start_signal_args=[prompt], ) @@ -87,9 +92,8 @@ async def send_prompt(prompt: str): @app.post("/confirm") async def send_confirm(): """Sends a 'confirm' signal to the workflow.""" - client = await Client.connect("localhost:7233") workflow_id = "agent-workflow" - handle = client.get_workflow_handle(workflow_id) + handle = temporal_client.get_workflow_handle(workflow_id) await handle.signal("confirm") return {"message": "Confirm signal sent."} @@ -97,11 +101,10 @@ async def send_confirm(): @app.post("/end-chat") async def end_chat(): """Sends a 'end_chat' signal to the workflow.""" - client = await Client.connect("localhost:7233") workflow_id = "agent-workflow" try: - handle = client.get_workflow_handle(workflow_id) + handle = temporal_client.get_workflow_handle(workflow_id) await handle.signal("end_chat") return {"message": "End chat signal sent."} except TemporalError as e: diff --git a/scripts/end_chat.py b/scripts/end_chat.py index ee4381e..c379eeb 100644 --- a/scripts/end_chat.py +++ b/scripts/end_chat.py @@ -1,6 +1,6 @@ import asyncio -from temporalio.client import Client + from workflows.tool_workflow import ToolWorkflow diff --git a/scripts/get_history.py b/scripts/get_history.py index 97fdf8c..1015023 100644 --- a/scripts/get_history.py +++ b/scripts/get_history.py @@ -1,12 +1,12 @@ import asyncio -from temporalio.client import Client -from workflows import ToolWorkflow +from shared.config import get_temporal_client +from workflows.tool_workflow import ToolWorkflow async def main(): # Create client connected to server at the given address - client = await Client.connect("localhost:7233") + client = await get_temporal_client() workflow_id = "agent-workflow" handle = client.get_workflow_handle(workflow_id) @@ -15,16 +15,7 @@ async def main(): history = await handle.query(ToolWorkflow.get_conversation_history) print("Conversation History") - print( - *(f"{speaker.title()}: {message}\n" for speaker, message in history), sep="\n" - ) - - # Queries the workflow for the conversation summary - summary = await handle.query(ToolWorkflow.get_summary_from_history) - - if summary is not None: - print("Conversation Summary:") - print(summary) + print(history) if __name__ == "__main__": diff --git a/scripts/get_tool_data.py b/scripts/get_tool_data.py deleted file mode 100644 index e969f41..0000000 --- a/scripts/get_tool_data.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio -import json - -from temporalio.client import Client -from workflows.tool_workflow import ToolWorkflow - - -async def main(): - # Create client connected to server at the given address - client = await Client.connect("localhost:7233") - workflow_id = "agent-workflow" - - handle = client.get_workflow_handle(workflow_id) - - # Queries the workflow for the conversation history - tool_data = await handle.query(ToolWorkflow.get_tool_data) - - # pretty print - print(json.dumps(tool_data, indent=4)) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/scripts/run_worker.py b/scripts/run_worker.py index a048476..0f9bd5a 100644 --- a/scripts/run_worker.py +++ b/scripts/run_worker.py @@ -1,27 +1,26 @@ import asyncio -import concurrent.futures -import logging -from temporalio.client import Client +import concurrent.futures + from temporalio.worker import Worker from activities.tool_activities import ToolActivities, dynamic_tool_activity from workflows.tool_workflow import ToolWorkflow -from dotenv import load_dotenv -load_dotenv() +from shared.config import get_temporal_client, TEMPORAL_TASK_QUEUE async def main(): - # Create client connected to server at the given address - client = await Client.connect("localhost:7233") + # Create the client + client = await get_temporal_client() + activities = ToolActivities() # Run the worker with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor: worker = Worker( client, - task_queue="agent-task-queue", + task_queue=TEMPORAL_TASK_QUEUE, workflows=[ToolWorkflow], activities=[ activities.prompt_llm, @@ -29,12 +28,10 @@ async def main(): ], activity_executor=activity_executor, ) + + print(f"Starting worker, connecting to task queue: {TEMPORAL_TASK_QUEUE}") await worker.run() if __name__ == "__main__": - print("Starting worker") - - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/scripts/send_confirm.py b/scripts/send_confirm.py index 22be926..ed99adf 100644 --- a/scripts/send_confirm.py +++ b/scripts/send_confirm.py @@ -1,12 +1,14 @@ import asyncio import sys -from temporalio.client import Client + +from shared.config import get_temporal_client +from workflows.tool_workflow import ToolWorkflow async def main(): - # 1) Connect to Temporal and signal the workflow - client = await Client.connect("localhost:7233") + # Connect to Temporal and signal the workflow + client = await get_temporal_client() workflow_id = "agent-workflow" diff --git a/shared/config.py b/shared/config.py new file mode 100644 index 0000000..9291863 --- /dev/null +++ b/shared/config.py @@ -0,0 +1,56 @@ +import os +from dotenv import load_dotenv +from temporalio.client import Client +from temporalio.service import TLSConfig + +load_dotenv() + +# Temporal connection settings +TEMPORAL_ADDRESS = os.getenv("TEMPORAL_ADDRESS", "localhost:7233") +TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default") +TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "agent-task-queue") + +# Authentication settings +TEMPORAL_TLS_CERT = os.getenv("TEMPORAL_TLS_CERT", "") +TEMPORAL_TLS_KEY = os.getenv("TEMPORAL_TLS_KEY", "") +TEMPORAL_API_KEY = os.getenv("TEMPORAL_API_KEY", "") + +async def get_temporal_client() -> Client: + """ + Creates a Temporal client based on environment configuration. + Supports local server, mTLS, and API key authentication methods. + """ + # Default to no TLS for local development + tls_config = False + print(f"Address: {TEMPORAL_ADDRESS}, Namespace {TEMPORAL_NAMESPACE}") + print("(If unset, then will try to connect to local server)") + + # Configure mTLS if certificate and key are provided + if TEMPORAL_TLS_CERT and TEMPORAL_TLS_KEY: + print(f"TLS cert: {TEMPORAL_TLS_CERT}") + print(f"TLS key: {TEMPORAL_TLS_KEY}") + with open(TEMPORAL_TLS_CERT, "rb") as f: + client_cert = f.read() + with open(TEMPORAL_TLS_KEY, "rb") as f: + client_key = f.read() + tls_config = TLSConfig( + client_cert=client_cert, + client_private_key=client_key, + ) + + # Use API key authentication if provided + if TEMPORAL_API_KEY: + print(f"API key: {TEMPORAL_API_KEY}") + return await Client.connect( + TEMPORAL_ADDRESS, + namespace=TEMPORAL_NAMESPACE, + api_key=TEMPORAL_API_KEY, + tls=True, # Always use TLS with API key + ) + + # Use mTLS or local connection + return await Client.connect( + TEMPORAL_ADDRESS, + namespace=TEMPORAL_NAMESPACE, + tls=tls_config, + ) \ No newline at end of file