From ddd5a106a7ffe98b8d7b8d16bc34fec1a4bc430b Mon Sep 17 00:00:00 2001
From: Steve Androulakis
Date: Thu, 2 Jan 2025 12:13:03 -0800
Subject: [PATCH] refactored workflow to be event loop

---
 prompts/agent_prompt_generators.py |   3 +-
 workflows/tool_workflow.py         | 186 +++++++++++++----------------
 2 files changed, 83 insertions(+), 106 deletions(-)

diff --git a/prompts/agent_prompt_generators.py b/prompts/agent_prompt_generators.py
index bc2c788..16681e8 100644
--- a/prompts/agent_prompt_generators.py
+++ b/prompts/agent_prompt_generators.py
@@ -124,7 +124,8 @@ def generate_genai_prompt(
     )
 
     prompt_lines.append(
-        "REMINDER: If any required argument is missing, set 'next': 'question' and ask the user for it."
+        "REMINDER: If any required argument is missing, set 'next': 'question' and ask the user for it. "
+        "REMINDER: Use 'next': 'confirm' only if NO arguments are missing. "
     )
     prompt_lines.append(
         """
diff --git a/workflows/tool_workflow.py b/workflows/tool_workflow.py
index bec4908..0fab341 100644
--- a/workflows/tool_workflow.py
+++ b/workflows/tool_workflow.py
@@ -39,152 +39,125 @@ class ToolWorkflow:
         if params and params.prompt_queue:
             self.prompt_queue.extend(params.prompt_queue)
 
+        waiting_for_confirm = False
+        current_tool = None
+
         while True:
-            # 1) Wait for a user prompt or an end-chat
+            # Wait until *any* signal or user prompt arrives:
             await workflow.wait_condition(
-                lambda: bool(self.prompt_queue) or self.chat_ended
+                lambda: bool(self.prompt_queue) or self.chat_ended or self.confirm
             )
 
+            # 1) If chat_ended was signaled, handle end and return
             if self.chat_ended:
-                # Possibly do a summary if multiple turns
+                # possibly do a summary if multiple turns
                 if len(self.conversation_history) > 1:
                     summary_context, summary_prompt = self.prompt_summary_with_history()
                     summary_input = ToolPromptInput(
-                        prompt=summary_prompt,
-                        context_instructions=summary_context,
+                        prompt=summary_prompt, context_instructions=summary_context
                     )
                     self.conversation_summary = await workflow.start_activity_method(
                         ToolActivities.prompt_llm,
                         summary_input,
                         schedule_to_close_timeout=timedelta(seconds=20),
                     )
-
                 workflow.logger.info(
                     "Chat ended. Conversation summary:\n"
                     + f"{self.conversation_summary}"
                 )
                 return f"{self.conversation_history}"
 
-            # 2) Pop the user’s new message from the queue
-            prompt = self.prompt_queue.popleft()
-            self.conversation_history.append(("user", prompt))
-
-            # 3) Call the LLM with the entire conversation + Tools
-            context_instructions = generate_genai_prompt(
-                tools_data, self.format_history(), tool_data
-            )
-            prompt_input = ToolPromptInput(
-                prompt=prompt,
-                context_instructions=context_instructions,
-            )
-            tool_data = await workflow.execute_activity_method(
-                ToolActivities.prompt_llm,
-                prompt_input,
-                schedule_to_close_timeout=timedelta(seconds=60),
-                retry_policy=RetryPolicy(
-                    maximum_attempts=5, initial_interval=timedelta(seconds=12)
-                ),
-            )
-
-            # 5) Store it and show the conversation
-            self.tool_data = tool_data
-            self.conversation_history.append(("response", str(tool_data)))
-
-            # 6) Check for special flags
-            next_step = self.tool_data.get("next")  # e.g. "confirm", "question", "done"
-            current_tool = self.tool_data.get(
-                "tool"
-            )  # e.g. "FindEvents", "SearchFlights", "CreateInvoice"
-
-            if next_step == "confirm" and current_tool:
+            # 2) If we received a confirm signal:
+            if self.confirm and waiting_for_confirm and current_tool:
+                # Clear the confirm flag so we don't repeatedly confirm
                 self.confirm = False
+                waiting_for_confirm = False
 
-                # Wait for a 'confirm' signal
-                await workflow.wait_condition(lambda: self.confirm)
-                workflow.logger.info(
-                    "Confirmed. Proceeding with tool execution: " + current_tool
-                )
-
-                # We have enough info to call the tool
+                # Run the tool
+                workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
                 dynamic_result = await workflow.execute_activity(
                     current_tool,
-                    self.tool_data["args"],  # single argument
+                    self.tool_data["args"],
                     schedule_to_close_timeout=timedelta(seconds=20),
                 )
-
-                # Append tool’s result to the conversation
                 self.conversation_history.append(
                     (f"{current_tool}_result", str(dynamic_result))
                 )
 
-                # Enqueue a follow-up question to the LLM
+                # Enqueue a follow-up prompt for the LLM
                 self.prompt_queue.append(
                     f"The '{current_tool}' tool completed successfully with {dynamic_result}. "
-                    "INSTRUCTIONS: Use this tool result, and the context_instructions (conversation history) to intelligently pre-fill the next tool's arguments. "
-                    "NOTE: If all listed tools have run, then we should generate a done response. Otherwise: What should we do next? "
+                    "INSTRUCTIONS: Use this tool result, and the conversation history to figure out next steps. "
+                    "If all listed tools have run, then produce a done response."
                )
 
-                # The loop continues, and on the next iteration, the workflow sees that new "prompt"
-                # as if the user typed it, calls the LLM, etc.
-
-            elif next_step == "done":
-                # LLM signals no more tools needed
-                workflow.logger.info("All steps completed. Exiting workflow.")
-                return str(self.conversation_history)
-
-            # 7) Optionally handle "continue_as_new" after many turns
-            if len(self.conversation_history) >= self.max_turns_before_continue:
-                summary_context, summary_prompt = self.prompt_summary_with_history()
-                summary_input = ToolPromptInput(
-                    prompt=summary_prompt,
-                    context_instructions=summary_context,
-                )
-                self.conversation_summary = await workflow.start_activity_method(
-                    ToolActivities.prompt_llm,
-                    summary_input,
-                    schedule_to_close_timeout=timedelta(seconds=20),
-                )
-                workflow.logger.info(
-                    f"Continuing as new after {self.max_turns_before_continue} turns."
-                )
-
-                workflow.continue_as_new(
-                    args=[
-                        CombinedInput(
-                            tool_params=ToolWorkflowParams(
-                                conversation_summary=self.conversation_summary,
-                                prompt_queue=self.prompt_queue,
-                            ),
-                            tools_data=tools_data,
-                        )
-                    ]
-                )
-
-            # 8) If "next_step" is "question" or anything else,
-            # we just keep looping, waiting for user prompt or signals.
-
+                # Loop around again
+                continue
 
-            # Handle end of chat
-            if self.chat_ended:
-                if len(self.conversation_history) > 1:
-                    # Summarize conversation
+            # 3) If there's a user prompt waiting, process it (unless we're in some other skipping logic).
+            if self.prompt_queue:
+                prompt = self.prompt_queue.popleft()
+                self.conversation_history.append(("user", prompt))
+
+                # Pass entire conversation + Tools to LLM
+                context_instructions = generate_genai_prompt(
+                    tools_data, self.format_history(), self.tool_data
+                )
+                prompt_input = ToolPromptInput(
+                    prompt=prompt,
+                    context_instructions=context_instructions,
+                )
+                tool_data = await workflow.execute_activity_method(
+                    ToolActivities.prompt_llm,
+                    prompt_input,
+                    schedule_to_close_timeout=timedelta(seconds=60),
+                    retry_policy=RetryPolicy(
+                        maximum_attempts=5, initial_interval=timedelta(seconds=12)
+                    ),
+                )
+                self.tool_data = tool_data
+                self.conversation_history.append(("response", str(tool_data)))
+
+                # Check the next step from LLM
+                next_step = self.tool_data.get("next")
+                current_tool = self.tool_data.get("tool")
+
+                if next_step == "confirm" and current_tool:
+                    waiting_for_confirm = True
+                    self.confirm = False  # Clear any stale confirm
+                    workflow.logger.info("Waiting for user confirm signal...")
+                    # We do NOT do an immediate wait_condition here;
+                    # instead, let the loop continue so we can still handle prompts/end_chat signals.
+
+                elif next_step == "done":
+                    workflow.logger.info("All steps completed. Exiting workflow.")
+                    return str(self.conversation_history)
+
+                # Possibly continue-as-new after many turns
+                # todo ensure this doesn't lose critical context
+                if len(self.conversation_history) >= self.max_turns_before_continue:
                     summary_context, summary_prompt = self.prompt_summary_with_history()
                     summary_input = ToolPromptInput(
-                        prompt=summary_prompt,
-                        context_instructions=summary_context,
+                        prompt=summary_prompt, context_instructions=summary_context
                     )
-
                     self.conversation_summary = await workflow.start_activity_method(
                         ToolActivities.prompt_llm,
                         summary_input,
                         schedule_to_close_timeout=timedelta(seconds=20),
                     )
-
-                workflow.logger.info(
-                    "Chat ended. Conversation summary:\n"
-                    + f"{self.conversation_summary}"
-                )
-                return f"{self.conversation_summary}"
+                    workflow.logger.info(
+                        f"Continuing as new after {self.max_turns_before_continue} turns."
+                    )
+                    workflow.continue_as_new(
+                        args=[
+                            CombinedInput(
+                                tool_params=ToolWorkflowParams(
+                                    conversation_summary=self.conversation_summary,
+                                    prompt_queue=self.prompt_queue,
+                                ),
+                                tools_data=tools_data,
+                            )
+                        ]
+                    )
 
     @workflow.signal
     async def user_prompt(self, prompt: str) -> None:
@@ -206,7 +179,7 @@ class ToolWorkflow:
         return self.conversation_history
 
     @workflow.query
-    def get_summary_from_history(self) -> Optional[str]:
+    def get_summary_from_history(self) -> Optional[dict]:
         return self.conversation_summary
 
     @workflow.query
@@ -232,5 +205,8 @@ class ToolWorkflow:
     def prompt_summary_with_history(self) -> tuple[str, str]:
         history_string = self.format_history()
         context_instructions = f"Here is the conversation history between a user and a chatbot: {history_string}"
-        actual_prompt = "Please produce a two sentence summary of this conversation."
+        actual_prompt = (
+            "Please produce a two sentence summary of this conversation. "
+            'Put the summary in the format { "summary": "" }'
+        )
         return (context_instructions, actual_prompt)
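
Usage sketch (illustration, not part of the patch): a minimal, hypothetical client-side driver for the refactored event loop, using the Temporal Python SDK. The server address and workflow ID are assumptions, and so are the 'confirm' signal and 'get_conversation_history' query names, which are inferred from the self.confirm flag and the query context lines in this diff; only the 'user_prompt' signal appears above.

    import asyncio

    from temporalio.client import Client


    async def main() -> None:
        client = await Client.connect("localhost:7233")  # assumed server address

        # Assumed workflow ID of an already-running ToolWorkflow.
        handle = client.get_workflow_handle("tool-workflow")

        # Lands in prompt_queue and wakes the loop's wait_condition.
        await handle.signal("user_prompt", "Find me flights to Oakland in March")

        # After the LLM answers with next == "confirm", the loop sets
        # waiting_for_confirm and keeps looping; this (assumed) signal
        # sets self.confirm so the next iteration runs the chosen tool.
        await handle.signal("confirm")

        # Queries are served even while the workflow sits in wait_condition.
        history = await handle.query("get_conversation_history")
        print(history)


    if __name__ == "__main__":
        asyncio.run(main())

Because the loop now waits on prompt_queue, chat_ended, and confirm together, a confirm that never arrives no longer blocks user prompts or end-chat the way the old nested wait_condition did.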