Files
temporal-ai-agent/workflows/agent_goal_workflow.py
Steve Androulakis 5d55a9fe80 Model Context Protocol (MCP) support with new use case (#42)
* initial mcp

* food ordering with mcp

* prompt eng

* splitting out goals and updating docs

* a diff so I can get tests from codex

* a diff so I can get tests from codex

* oops, missing files

* tests, file formatting

* readme and setup updates

* setup.md link fixes

* readme change

* readme change

* readme change

* stripe food setup script

* single agent mode default

* prompt engineering for better multi agent performance

* performance should be greatly improved

* Update goals/finance.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update activities/tool_activities.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Copilot PR review suggested this change; it is now fixed

* stronger wording around json format response

* formatting

* moved docs to dir

* moved image assets under docs

* cleanup env example, stripe guidance

* cleanup

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-06-09 16:39:57 -07:00

449 lines
19 KiB
Python

from collections import deque
from datetime import timedelta
from typing import Any, Deque, Dict, List, Optional, TypedDict, Union
from temporalio import workflow
from temporalio.common import RetryPolicy
from models.data_types import (
ConversationHistory,
EnvLookupInput,
EnvLookupOutput,
NextStep,
ValidationInput,
)
from models.tool_definitions import AgentGoal
from workflows import workflow_helpers as helpers
from workflows.workflow_helpers import (
LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
)
with workflow.unsafe.imports_passed_through():
from activities.tool_activities import ToolActivities, mcp_list_tools
from goals import goal_list
from models.data_types import CombinedInput, ToolPromptInput
from prompts.agent_prompt_generators import generate_genai_prompt
from tools.tool_registry import create_mcp_tool_definitions
# Constants
# Maximum number of conversation turns before the workflow calls
# continue-as-new (via workflow_helpers) to keep event history bounded.
MAX_TURNS_BEFORE_CONTINUE = 250
# ToolData as part of the workflow is what's accessible to the UI - see LLMResponse.jsx for example
class ToolData(TypedDict, total=False):
    """LLM planner response surfaced to the UI and queries.

    All keys are optional (``total=False``); the planner may return any
    subset, and ``force_confirm`` is stamped on by the workflow after the
    planner responds.
    """

    next: NextStep  # what to do next: e.g. "confirm", "pick-new-goal", "done"
    tool: str  # name of the tool the planner selected, if any
    args: Dict[str, Any]  # arguments gathered for that tool
    response: str  # natural-language reply to show the user
    # NOTE: PEP 589 forbids right-hand-side defaults in a TypedDict; the
    # previous "= True" was ignored by type checkers. The workflow assigns
    # this at runtime from SHOW_CONFIRM / debug signals.
    force_confirm: bool
@workflow.defn
class AgentGoalWorkflow:
    """Workflow that manages tool execution with user confirmation and conversation history."""

    def __init__(self) -> None:
        # Full transcript exposed to the UI via the get_conversation_history query.
        self.conversation_history: ConversationHistory = {"messages": []}
        # FIFO of pending prompts delivered by the user_prompt signal.
        self.prompt_queue: Deque[str] = deque()
        # Summary carried across continue-as-new (see get_summary_from_history).
        self.conversation_summary: Optional[str] = None
        # Set by the end_chat signal; ends the main loop.
        self.chat_ended: bool = False
        # Latest planner response; exposed via get_latest_tool_data.
        self.tool_data: Optional[ToolData] = None
        self.confirmed: bool = (
            False  # indicates that we have confirmation to proceed to run tool
        )
        # Results of every executed tool, newest last; inspected by execute_tool.
        self.tool_results: List[Dict[str, Any]] = []
        # Active goal; replaced from CombinedInput in run() and by change_goal().
        self.goal: AgentGoal = {"tools": []}
        self.show_tool_args_confirmation: bool = (
            True  # set from env file in activity lookup_wf_env_settings
        )
        self.multi_goal_mode: bool = (
            False  # set from env file in activity lookup_wf_env_settings
        )
        self.mcp_tools_info: Optional[dict] = None  # stores complete MCP tools result
    # see ../api/main.py#temporal_client.start_workflow() for how the input parameters are set
    @workflow.run
    async def run(self, combined_input: CombinedInput) -> str:
        """Main workflow execution method.

        Loops on signal-driven input (user prompts, confirmations, end-chat),
        consults the LLM planner for next steps, and executes tools until the
        conversation ends. Returns the stringified conversation history.
        """
        # setup phase, starts with blank tool_params and agent_goal prompt as defined in tools/goal_registry.py
        params = combined_input.tool_params
        self.goal = combined_input.agent_goal
        await self.lookup_wf_env_settings(combined_input)

        # If the goal has an MCP server definition, dynamically load MCP tools
        if self.goal.mcp_server_definition:
            await self.load_mcp_tools()

        # add message from sample conversation provided in tools/goal_registry.py, if it exists
        if params and params.conversation_summary:
            self.add_message("conversation_summary", params.conversation_summary)
            self.conversation_summary = params.conversation_summary

        # carry over prompts (e.g. from a continued-as-new run)
        if params and params.prompt_queue:
            self.prompt_queue.extend(params.prompt_queue)

        # True while a planned tool run is awaiting a confirm signal
        waiting_for_confirm: bool = False
        # name of the tool the planner most recently selected, if any
        current_tool: Optional[str] = None

        # This is the main interactive loop. Main responsibilities:
        # - Selecting and changing goals as directed by the user
        # - reacting to user input (from signals)
        # - validating user input to make sure it makes sense with the current goal and tools
        # - calling the LLM through activities to determine next steps and prompts
        # - executing the selected tools via activities
        while True:
            # wait indefinitely for input from signals - user_prompt, end_chat, or confirm as defined below
            await workflow.wait_condition(
                lambda: bool(self.prompt_queue) or self.chat_ended or self.confirmed
            )

            # handle chat should end. When chat ends, push conversation history to workflow results.
            if self.chat_should_end():
                return f"{self.conversation_history}"

            # Execute the tool (only when confirmed and a tool is staged)
            if self.ready_for_tool_execution(waiting_for_confirm, current_tool):
                waiting_for_confirm = await self.execute_tool(current_tool)
                continue

            # process forward on the prompt queue if any
            if self.prompt_queue:
                # get most recent prompt
                prompt = self.prompt_queue.popleft()
                workflow.logger.info(
                    f"workflow step: processing message on the prompt queue, message is {prompt}"
                )

                # Validate user-provided prompts (LLM-tagged "###" prompts skip validation)
                if self.is_user_prompt(prompt):
                    self.add_message("user", prompt)

                    # Validate the prompt before proceeding
                    validation_input = ValidationInput(
                        prompt=prompt,
                        conversation_history=self.conversation_history,
                        agent_goal=self.goal,
                    )
                    validation_result = await workflow.execute_activity_method(
                        ToolActivities.agent_validatePrompt,
                        args=[validation_input],
                        schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
                        start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
                        retry_policy=RetryPolicy(
                            initial_interval=timedelta(seconds=5), backoff_coefficient=1
                        ),
                    )

                    # If validation fails, provide that feedback to the user - i.e., "your words make no sense, puny human" end this iteration of processing
                    if not validation_result.validationResult:
                        workflow.logger.warning(
                            f"Prompt validation failed: {validation_result.validationFailedReason}"
                        )
                        self.add_message(
                            "agent", validation_result.validationFailedReason
                        )
                        continue

                # If valid, proceed with generating the context and prompt
                context_instructions = generate_genai_prompt(
                    agent_goal=self.goal,
                    conversation_history=self.conversation_history,
                    multi_goal_mode=self.multi_goal_mode,
                    raw_json=self.tool_data,
                    mcp_tools_info=self.mcp_tools_info,
                )
                prompt_input = ToolPromptInput(
                    prompt=prompt, context_instructions=context_instructions
                )

                # connect to LLM and execute to get next steps
                tool_data = await workflow.execute_activity_method(
                    ToolActivities.agent_toolPlanner,
                    prompt_input,
                    schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
                    start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=5), backoff_coefficient=1
                    ),
                )

                # stamp on the env/debug-driven confirmation flag for the UI
                tool_data["force_confirm"] = self.show_tool_args_confirmation
                self.tool_data = tool_data

                # process the tool as dictated by the prompt response - what to do next, and with which tool
                next_step = tool_data.get("next")
                current_tool = tool_data.get("tool")
                workflow.logger.info(
                    f"next_step: {next_step}, current tool is {current_tool}"
                )

                # make sure we're ready to run the tool & have everything we need
                if next_step == "confirm" and current_tool:
                    args = tool_data.get("args", {})
                    # if we're missing arguments, ask for them
                    if await helpers.handle_missing_args(
                        current_tool, args, tool_data, self.prompt_queue
                    ):
                        continue

                    waiting_for_confirm = True
                    # We have needed arguments, if we want to force the user to confirm, set that up
                    if self.show_tool_args_confirmation:
                        self.confirmed = False  # set that we're not confirmed
                        workflow.logger.info("Waiting for user confirm signal...")
                    # if we have all needed arguments (handled above) and not holding for a debugging confirm, proceed:
                    else:
                        self.confirmed = True
                # else if the next step is to pick a new goal, set that to be the goal
                elif next_step == "pick-new-goal":
                    workflow.logger.info("All steps completed. Resetting goal.")
                    self.change_goal("goal_choose_agent_type")
                # else if the next step is to be done with the conversation such as if the user requests it via asking to "end conversation"
                elif next_step == "done":
                    self.add_message("agent", tool_data)
                    # here we could send conversation to AI for analysis
                    # end the workflow
                    return str(self.conversation_history)

                self.add_message("agent", tool_data)
                # roll over to a fresh run if history has grown too long
                await helpers.continue_as_new_if_needed(
                    self.conversation_history,
                    self.prompt_queue,
                    self.goal,
                    MAX_TURNS_BEFORE_CONTINUE,
                    self.add_message,
                )
# Signal that comes from api/main.py via a post to /send-prompt
@workflow.signal
async def user_prompt(self, prompt: str) -> None:
"""Signal handler for receiving user prompts."""
workflow.logger.info(f"signal received: user_prompt, prompt is {prompt}")
if self.chat_ended:
workflow.logger.info(f"Message dropped due to chat closed: {prompt}")
return
self.prompt_queue.append(prompt)
# Signal that comes from api/main.py via a post to /confirm
@workflow.signal
async def confirm(self) -> None:
"""Signal handler for user confirmation of tool execution."""
workflow.logger.info("Received user signal: confirmation")
self.confirmed = True
# Signal that comes from api/main.py via a post to /end-chat
@workflow.signal
async def end_chat(self) -> None:
"""Signal handler for ending the chat session."""
workflow.logger.info("signal received: end_chat")
self.chat_ended = True
# Signal that can be sent from Temporal Workflow UI to enable debugging confirm and override .env setting
@workflow.signal
async def enable_debugging_confirm(self) -> None:
"""Signal handler for enabling debugging confirm UI & associated logic."""
workflow.logger.info("signal received: enable_debugging_confirm")
self.enable_debugging_confirm = True
# Signal that can be sent from Temporal Workflow UI to disable debugging confirm and override .env setting
@workflow.signal
async def disable_debugging_confirm(self) -> None:
"""Signal handler for disabling debugging confirm UI & associated logic."""
workflow.logger.info("signal received: disable_debugging_confirm")
self.enable_debugging_confirm = False
    @workflow.query
    def get_conversation_history(self) -> ConversationHistory:
        """Query handler to retrieve the full conversation history."""
        return self.conversation_history

    @workflow.query
    def get_agent_goal(self) -> AgentGoal:
        """Query handler to retrieve the current goal of the agent."""
        return self.goal

    @workflow.query
    def get_summary_from_history(self) -> Optional[str]:
        """Query handler to retrieve the conversation summary if available.

        Used only for continue as new of the workflow."""
        return self.conversation_summary

    @workflow.query
    def get_latest_tool_data(self) -> Optional[ToolData]:
        """Query handler to retrieve the latest tool data response if available."""
        return self.tool_data
def add_message(self, actor: str, response: Union[str, Dict[str, Any]]) -> None:
"""Add a message to the conversation history.
Args:
actor: The entity that generated the message (e.g., "user", "agent")
response: The message content, either as a string or structured data
"""
if isinstance(response, dict):
response_str = str(response)
workflow.logger.debug(f"Adding {actor} message: {response_str[:100]}...")
else:
workflow.logger.debug(f"Adding {actor} message: {response[:100]}...")
self.conversation_history["messages"].append(
{"actor": actor, "response": response}
)
def change_goal(self, goal: str) -> None:
"""Change the goal (usually on request of the user).
Args:
goal: goal to change to)
"""
if goal is not None:
for listed_goal in goal_list:
if listed_goal.id == goal:
self.goal = listed_goal
workflow.logger.info("Changed goal to " + goal)
if goal is None:
workflow.logger.warning(
"Goal not set after goal reset, probably bad."
) # if this happens, there's probably a problem with the goal list
# workflow function that defines if chat should end
def chat_should_end(self) -> bool:
if self.chat_ended:
workflow.logger.info("Chat-end signal received. Chat ending.")
return True
else:
return False
# define if we're ready for tool execution
def ready_for_tool_execution(
self, waiting_for_confirm: bool, current_tool: Any
) -> bool:
if self.confirmed and waiting_for_confirm and current_tool and self.tool_data:
return True
else:
return False
# LLM-tagged prompts start with "###"
# all others are from the user
def is_user_prompt(self, prompt) -> bool:
if prompt.startswith("###"):
return False
else:
return True
# look up env settings in an activity so they're part of history
async def lookup_wf_env_settings(self, combined_input: CombinedInput) -> None:
env_lookup_input = EnvLookupInput(
show_confirm_env_var_name="SHOW_CONFIRM",
show_confirm_default=True,
)
env_output: EnvLookupOutput = await workflow.execute_activity_method(
ToolActivities.get_wf_env_vars,
env_lookup_input,
start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5), backoff_coefficient=1
),
)
self.show_tool_args_confirmation = env_output.show_confirm
self.multi_goal_mode = env_output.multi_goal_mode
# execute the tool - return False if we're not waiting for confirm anymore (always the case if it works successfully)
#
async def execute_tool(self, current_tool: str) -> bool:
workflow.logger.info(
f"workflow step: user has confirmed, executing the tool {current_tool}"
)
self.confirmed = False
waiting_for_confirm = False
confirmed_tool_data = self.tool_data.copy()
confirmed_tool_data["next"] = "user_confirmed_tool_run"
self.add_message("user_confirmed_tool_run", confirmed_tool_data)
# execute the tool by key as defined in tools/__init__.py
await helpers.handle_tool_execution(
current_tool,
self.tool_data,
self.tool_results,
self.add_message,
self.prompt_queue,
self.goal,
)
# set new goal if we should
if len(self.tool_results) > 0:
if (
"ChangeGoal" in self.tool_results[-1].values()
and "new_goal" in self.tool_results[-1].keys()
):
new_goal = self.tool_results[-1].get("new_goal")
self.change_goal(new_goal)
elif (
"ListAgents" in self.tool_results[-1].values()
and self.goal.id != "goal_choose_agent_type"
):
self.change_goal("goal_choose_agent_type")
return waiting_for_confirm
# debugging helper - drop this in various places in the workflow to get status
# also don't forget you can look at the workflow itself and do queries if you want
def print_useful_workflow_vars(self, status_or_step: str) -> None:
print(f"***{status_or_step}:***")
if self.goal:
print(f"current goal: {self.goal.id}")
if self.tool_data:
print(f"force confirm? {self.tool_data['force_confirm']}")
print(f"next step: {self.tool_data.get('next')}")
print(f"current_tool: {self.tool_data.get('tool')}")
else:
print("no tool data initialized yet")
print(f"self.confirmed: {self.confirmed}")
async def load_mcp_tools(self) -> None:
"""Load MCP tools dynamically from the server definition"""
if not self.goal.mcp_server_definition:
return
workflow.logger.info(
f"Loading MCP tools from server: {self.goal.mcp_server_definition.name}"
)
# Get the list of tools to include (if specified)
include_tools = self.goal.mcp_server_definition.included_tools
# Call the MCP list tools activity
mcp_tools_result = await workflow.execute_activity(
mcp_list_tools,
args=[self.goal.mcp_server_definition, include_tools],
start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5), backoff_coefficient=1
),
summary=f"{self.goal.mcp_server_definition.name}",
)
if mcp_tools_result.get("success", False):
tools_info = mcp_tools_result.get("tools", {})
workflow.logger.info(f"Successfully loaded {len(tools_info)} MCP tools")
# Store complete MCP tools result for use in prompt generation
self.mcp_tools_info = mcp_tools_result
# Convert MCP tools to ToolDefinition objects and add to goal
mcp_tool_definitions = create_mcp_tool_definitions(tools_info)
self.goal.tools.extend(mcp_tool_definitions)
workflow.logger.info(f"Added {len(mcp_tool_definitions)} MCP tools to goal")
else:
error_msg = mcp_tools_result.get("error", "Unknown error")
workflow.logger.error(f"Failed to load MCP tools: {error_msg}")
# Continue execution without MCP tools