LLM planner, not perfect but ok

This commit is contained in:
Steve Androulakis
2025-01-01 16:57:08 -08:00
parent 33af355363
commit 245d64fca9
8 changed files with 275 additions and 123 deletions

View File

@@ -25,7 +25,6 @@ class ToolWorkflow:
@workflow.run
async def run(self, combined_input: CombinedInput) -> str:
params = combined_input.tool_params
tools_data = combined_input.tools_data
@@ -39,79 +38,124 @@ class ToolWorkflow:
self.prompt_queue.extend(params.prompt_queue)
while True:
# 1) Wait for a user prompt or an end-chat
await workflow.wait_condition(
lambda: bool(self.prompt_queue) or self.chat_ended
)
if self.prompt_queue:
# 1) Get the user prompt -> call initial LLM
prompt = self.prompt_queue.popleft()
self.conversation_history.append(("user", prompt))
context_instructions = generate_genai_prompt_from_tools_data(
tools_data, self.format_history()
)
prompt_input = ToolPromptInput(
prompt=prompt,
context_instructions=context_instructions,
)
responsePrechecked = await workflow.execute_activity_method(
ToolActivities.prompt_llm,
prompt_input,
schedule_to_close_timeout=timedelta(seconds=20),
)
# 2) Validate + parse in one shot
tool_data = await workflow.execute_activity_method(
ToolActivities.validate_and_parse_json,
args=[responsePrechecked, tools_data, self.format_history()],
schedule_to_close_timeout=timedelta(seconds=40),
retry_policy=RetryPolicy(initial_interval=timedelta(seconds=10)),
)
# store it
self.tool_data = tool_data
self.conversation_history.append(("response", str(tool_data)))
if self.tool_data.get("next") == "confirm":
dynamic_result = await workflow.execute_activity(
self.tool_data["tool"], # dynamic activity name
self.tool_data["args"], # single argument to pass
schedule_to_close_timeout=timedelta(seconds=20),
)
return dynamic_result
# Continue as new after X turns
if len(self.conversation_history) >= self.max_turns_before_continue:
# Summarize conversation
if self.chat_ended:
# Possibly do a summary if multiple turns
if len(self.conversation_history) > 1:
summary_context, summary_prompt = self.prompt_summary_with_history()
summary_input = ToolPromptInput(
prompt=summary_prompt,
context_instructions=summary_context,
)
self.conversation_summary = await workflow.start_activity_method(
ToolActivities.prompt_llm,
summary_input,
schedule_to_close_timeout=timedelta(seconds=20),
)
workflow.logger.info(
"Continuing as new after %i turns."
% self.max_turns_before_continue,
)
workflow.logger.info(
"Chat ended. Conversation summary:\n"
+ f"{self.conversation_summary}"
)
return f"{self.conversation_history}"
workflow.continue_as_new(
args=[
CombinedInput(
tool_params=ToolWorkflowParams(
conversation_summary=self.conversation_summary,
prompt_queue=self.prompt_queue,
),
tools_data=tools_data,
)
]
)
# 2) Pop the users new message from the queue
prompt = self.prompt_queue.popleft()
self.conversation_history.append(("user", prompt))
# 3) Call the LLM with the entire conversation + Tools
context_instructions = generate_genai_prompt_from_tools_data(
tools_data, self.format_history()
)
prompt_input = ToolPromptInput(
prompt=prompt,
context_instructions=context_instructions,
)
responsePrechecked = await workflow.execute_activity_method(
ToolActivities.prompt_llm,
prompt_input,
schedule_to_close_timeout=timedelta(seconds=20),
)
# 4) Validate + parse in one shot
tool_data = await workflow.execute_activity_method(
ToolActivities.validate_and_parse_json,
args=[responsePrechecked, tools_data, self.format_history()],
schedule_to_close_timeout=timedelta(seconds=40),
retry_policy=RetryPolicy(initial_interval=timedelta(seconds=10)),
)
# 5) Store it and show the conversation
self.tool_data = tool_data
self.conversation_history.append(("response", str(tool_data)))
# 6) Check for special flags
next_step = self.tool_data.get("next") # e.g. "confirm", "question", "done"
current_tool = self.tool_data.get(
"tool"
) # e.g. "FindEvents", "SearchFlights", "CreateInvoice"
if next_step == "confirm" and current_tool:
# We have enough info to call the tool
dynamic_result = await workflow.execute_activity(
current_tool,
self.tool_data["args"], # single argument
schedule_to_close_timeout=timedelta(seconds=20),
)
# Append tools result to the conversation
self.conversation_history.append(
(f"{current_tool}_result", str(dynamic_result))
)
# Enqueue a follow-up question to the LLM
self.prompt_queue.append(
f"The '{current_tool}' tool completed successfully with {dynamic_result}. "
"INSTRUCTIONS: Use this tool result, and the context_instructions (conversation history) to intelligently pre-fill the next tool's arguments. "
"What should we do next? "
)
# The loop continues, and on the next iteration, the workflow sees that new "prompt"
# as if the user typed it, calls the LLM, etc.
elif next_step == "done":
# LLM signals no more tools needed
workflow.logger.info("All steps completed. Exiting workflow.")
return str(self.conversation_history)
# 7) Optionally handle "continue_as_new" after many turns
if len(self.conversation_history) >= self.max_turns_before_continue:
summary_context, summary_prompt = self.prompt_summary_with_history()
summary_input = ToolPromptInput(
prompt=summary_prompt,
context_instructions=summary_context,
)
self.conversation_summary = await workflow.start_activity_method(
ToolActivities.prompt_llm,
summary_input,
schedule_to_close_timeout=timedelta(seconds=20),
)
workflow.logger.info(
f"Continuing as new after {self.max_turns_before_continue} turns."
)
workflow.continue_as_new(
args=[
CombinedInput(
tool_params=ToolWorkflowParams(
conversation_summary=self.conversation_summary,
prompt_queue=self.prompt_queue,
),
tools_data=tools_data,
)
]
)
# 8) If "next_step" is "question" or anything else,
# we just keep looping, waiting for user prompt or signals.
continue