mirror of
https://github.com/temporal-community/temporal-ai-agent.git
synced 2026-03-15 22:18:09 +01:00
refactored workflow to be event loop
This commit is contained in:
@@ -39,152 +39,125 @@ class ToolWorkflow:
|
||||
if params and params.prompt_queue:
|
||||
self.prompt_queue.extend(params.prompt_queue)
|
||||
|
||||
waiting_for_confirm = False
|
||||
current_tool = None
|
||||
|
||||
while True:
|
||||
# 1) Wait for a user prompt or an end-chat
|
||||
# Wait until *any* signal or user prompt arrives:
|
||||
await workflow.wait_condition(
|
||||
lambda: bool(self.prompt_queue) or self.chat_ended
|
||||
lambda: bool(self.prompt_queue) or self.chat_ended or self.confirm
|
||||
)
|
||||
|
||||
# 1) If chat_ended was signaled, handle end and return
|
||||
if self.chat_ended:
|
||||
# Possibly do a summary if multiple turns
|
||||
# possibly do a summary if multiple turns
|
||||
if len(self.conversation_history) > 1:
|
||||
summary_context, summary_prompt = self.prompt_summary_with_history()
|
||||
summary_input = ToolPromptInput(
|
||||
prompt=summary_prompt,
|
||||
context_instructions=summary_context,
|
||||
prompt=summary_prompt, context_instructions=summary_context
|
||||
)
|
||||
self.conversation_summary = await workflow.start_activity_method(
|
||||
ToolActivities.prompt_llm,
|
||||
summary_input,
|
||||
schedule_to_close_timeout=timedelta(seconds=20),
|
||||
)
|
||||
|
||||
workflow.logger.info(
|
||||
"Chat ended. Conversation summary:\n"
|
||||
+ f"{self.conversation_summary}"
|
||||
)
|
||||
return f"{self.conversation_history}"
|
||||
|
||||
# 2) Pop the user’s new message from the queue
|
||||
prompt = self.prompt_queue.popleft()
|
||||
self.conversation_history.append(("user", prompt))
|
||||
|
||||
# 3) Call the LLM with the entire conversation + Tools
|
||||
context_instructions = generate_genai_prompt(
|
||||
tools_data, self.format_history(), tool_data
|
||||
)
|
||||
prompt_input = ToolPromptInput(
|
||||
prompt=prompt,
|
||||
context_instructions=context_instructions,
|
||||
)
|
||||
tool_data = await workflow.execute_activity_method(
|
||||
ToolActivities.prompt_llm,
|
||||
prompt_input,
|
||||
schedule_to_close_timeout=timedelta(seconds=60),
|
||||
retry_policy=RetryPolicy(
|
||||
maximum_attempts=5, initial_interval=timedelta(seconds=12)
|
||||
),
|
||||
)
|
||||
|
||||
# 5) Store it and show the conversation
|
||||
self.tool_data = tool_data
|
||||
self.conversation_history.append(("response", str(tool_data)))
|
||||
|
||||
# 6) Check for special flags
|
||||
next_step = self.tool_data.get("next") # e.g. "confirm", "question", "done"
|
||||
current_tool = self.tool_data.get(
|
||||
"tool"
|
||||
) # e.g. "FindEvents", "SearchFlights", "CreateInvoice"
|
||||
|
||||
if next_step == "confirm" and current_tool:
|
||||
# 2) If we received a confirm signal:
|
||||
if self.confirm and waiting_for_confirm and current_tool:
|
||||
# Clear the confirm flag so we don't repeatedly confirm
|
||||
self.confirm = False
|
||||
waiting_for_confirm = False
|
||||
|
||||
# Wait for a 'confirm' signal
|
||||
await workflow.wait_condition(lambda: self.confirm)
|
||||
workflow.logger.info(
|
||||
"Confirmed. Proceeding with tool execution: " + current_tool
|
||||
)
|
||||
|
||||
# We have enough info to call the tool
|
||||
# Run the tool
|
||||
workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
|
||||
dynamic_result = await workflow.execute_activity(
|
||||
current_tool,
|
||||
self.tool_data["args"], # single argument
|
||||
self.tool_data["args"],
|
||||
schedule_to_close_timeout=timedelta(seconds=20),
|
||||
)
|
||||
|
||||
# Append tool’s result to the conversation
|
||||
self.conversation_history.append(
|
||||
(f"{current_tool}_result", str(dynamic_result))
|
||||
)
|
||||
|
||||
# Enqueue a follow-up question to the LLM
|
||||
# Enqueue a follow-up prompt for the LLM
|
||||
self.prompt_queue.append(
|
||||
f"The '{current_tool}' tool completed successfully with {dynamic_result}. "
|
||||
"INSTRUCTIONS: Use this tool result, and the context_instructions (conversation history) to intelligently pre-fill the next tool's arguments. "
|
||||
"NOTE: If all listed tools have run, then we should generate a done response. Otherwise: What should we do next? "
|
||||
"INSTRUCTIONS: Use this tool result, and the conversation history to figure out next steps. "
|
||||
"If all listed tools have run, then produce a done response."
|
||||
)
|
||||
# The loop continues, and on the next iteration, the workflow sees that new "prompt"
|
||||
# as if the user typed it, calls the LLM, etc.
|
||||
|
||||
elif next_step == "done":
|
||||
# LLM signals no more tools needed
|
||||
workflow.logger.info("All steps completed. Exiting workflow.")
|
||||
return str(self.conversation_history)
|
||||
|
||||
# 7) Optionally handle "continue_as_new" after many turns
|
||||
if len(self.conversation_history) >= self.max_turns_before_continue:
|
||||
summary_context, summary_prompt = self.prompt_summary_with_history()
|
||||
summary_input = ToolPromptInput(
|
||||
prompt=summary_prompt,
|
||||
context_instructions=summary_context,
|
||||
)
|
||||
self.conversation_summary = await workflow.start_activity_method(
|
||||
ToolActivities.prompt_llm,
|
||||
summary_input,
|
||||
schedule_to_close_timeout=timedelta(seconds=20),
|
||||
)
|
||||
workflow.logger.info(
|
||||
f"Continuing as new after {self.max_turns_before_continue} turns."
|
||||
)
|
||||
|
||||
workflow.continue_as_new(
|
||||
args=[
|
||||
CombinedInput(
|
||||
tool_params=ToolWorkflowParams(
|
||||
conversation_summary=self.conversation_summary,
|
||||
prompt_queue=self.prompt_queue,
|
||||
),
|
||||
tools_data=tools_data,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
# 8) If "next_step" is "question" or anything else,
|
||||
# we just keep looping, waiting for user prompt or signals.
|
||||
|
||||
# Loop around again
|
||||
continue
|
||||
|
||||
# Handle end of chat
|
||||
if self.chat_ended:
|
||||
if len(self.conversation_history) > 1:
|
||||
# Summarize conversation
|
||||
# 3) If there's a user prompt waiting, process it (unless we're in some other skipping logic).
|
||||
if self.prompt_queue:
|
||||
prompt = self.prompt_queue.popleft()
|
||||
self.conversation_history.append(("user", prompt))
|
||||
|
||||
# Pass entire conversation + Tools to LLM
|
||||
context_instructions = generate_genai_prompt(
|
||||
tools_data, self.format_history(), self.tool_data
|
||||
)
|
||||
prompt_input = ToolPromptInput(
|
||||
prompt=prompt,
|
||||
context_instructions=context_instructions,
|
||||
)
|
||||
tool_data = await workflow.execute_activity_method(
|
||||
ToolActivities.prompt_llm,
|
||||
prompt_input,
|
||||
schedule_to_close_timeout=timedelta(seconds=60),
|
||||
retry_policy=RetryPolicy(
|
||||
maximum_attempts=5, initial_interval=timedelta(seconds=12)
|
||||
),
|
||||
)
|
||||
self.tool_data = tool_data
|
||||
self.conversation_history.append(("response", str(tool_data)))
|
||||
|
||||
# Check the next step from LLM
|
||||
next_step = self.tool_data.get("next")
|
||||
current_tool = self.tool_data.get("tool")
|
||||
|
||||
if next_step == "confirm" and current_tool:
|
||||
waiting_for_confirm = True
|
||||
self.confirm = False # Clear any stale confirm
|
||||
workflow.logger.info("Waiting for user confirm signal...")
|
||||
# We do NOT do an immediate wait_condition here;
|
||||
# instead, let the loop continue so we can still handle prompts/end_chat signals.
|
||||
|
||||
elif next_step == "done":
|
||||
workflow.logger.info("All steps completed. Exiting workflow.")
|
||||
return str(self.conversation_history)
|
||||
|
||||
# Possibly continue-as-new after many turns
|
||||
# todo ensure this doesn't lose critical context
|
||||
if len(self.conversation_history) >= self.max_turns_before_continue:
|
||||
summary_context, summary_prompt = self.prompt_summary_with_history()
|
||||
summary_input = ToolPromptInput(
|
||||
prompt=summary_prompt,
|
||||
context_instructions=summary_context,
|
||||
prompt=summary_prompt, context_instructions=summary_context
|
||||
)
|
||||
|
||||
self.conversation_summary = await workflow.start_activity_method(
|
||||
ToolActivities.prompt_llm,
|
||||
summary_input,
|
||||
schedule_to_close_timeout=timedelta(seconds=20),
|
||||
)
|
||||
|
||||
workflow.logger.info(
|
||||
"Chat ended. Conversation summary:\n"
|
||||
+ f"{self.conversation_summary}"
|
||||
)
|
||||
return f"{self.conversation_summary}"
|
||||
workflow.logger.info(
|
||||
f"Continuing as new after {self.max_turns_before_continue} turns."
|
||||
)
|
||||
workflow.continue_as_new(
|
||||
args=[
|
||||
CombinedInput(
|
||||
tool_params=ToolWorkflowParams(
|
||||
conversation_summary=self.conversation_summary,
|
||||
prompt_queue=self.prompt_queue,
|
||||
),
|
||||
tools_data=tools_data,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
@workflow.signal
|
||||
async def user_prompt(self, prompt: str) -> None:
|
||||
@@ -206,7 +179,7 @@ class ToolWorkflow:
|
||||
return self.conversation_history
|
||||
|
||||
@workflow.query
|
||||
def get_summary_from_history(self) -> Optional[str]:
|
||||
def get_summary_from_history(self) -> Optional[dict]:
|
||||
return self.conversation_summary
|
||||
|
||||
@workflow.query
|
||||
@@ -232,5 +205,8 @@ class ToolWorkflow:
|
||||
def prompt_summary_with_history(self) -> tuple[str, str]:
|
||||
history_string = self.format_history()
|
||||
context_instructions = f"Here is the conversation history between a user and a chatbot: {history_string}"
|
||||
actual_prompt = "Please produce a two sentence summary of this conversation."
|
||||
actual_prompt = (
|
||||
"Please produce a two sentence summary of this conversation. "
|
||||
'Put the summary in the format { "summary": "<plain text>" }'
|
||||
)
|
||||
return (context_instructions, actual_prompt)
|
||||
|
||||
Reference in New Issue
Block a user