policy tuning (fast activity retries)

This commit is contained in:
Steve Androulakis
2025-02-04 15:05:54 -08:00
parent 3b07a4c96b
commit 2f8a7e9222

View File

@@ -15,8 +15,10 @@ with workflow.unsafe.imports_passed_through():
# Constants # Constants
MAX_TURNS_BEFORE_CONTINUE = 250 MAX_TURNS_BEFORE_CONTINUE = 250
TOOL_ACTIVITY_TIMEOUT = timedelta(seconds=20) TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=10)
LLM_ACTIVITY_TIMEOUT = timedelta(minutes=30) TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT = timedelta(minutes=30)
LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT = timedelta(seconds=5)
LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT = timedelta(minutes=30)
class ToolData(TypedDict, total=False): class ToolData(TypedDict, total=False):
@@ -44,20 +46,23 @@ class ToolWorkflow:
) -> None: ) -> None:
"""Execute a tool after confirmation and handle its result.""" """Execute a tool after confirmation and handle its result."""
workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}") workflow.logger.info(f"Confirmed. Proceeding with tool: {current_tool}")
try: try:
dynamic_result = await workflow.execute_activity( dynamic_result = await workflow.execute_activity(
current_tool, current_tool,
tool_data["args"], tool_data["args"],
schedule_to_close_timeout=TOOL_ACTIVITY_TIMEOUT, schedule_to_close_timeout=TOOL_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(maximum_attempts=3) start_to_close_timeout=TOOL_ACTIVITY_START_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5), backoff_coefficient=1
),
) )
dynamic_result["tool"] = current_tool dynamic_result["tool"] = current_tool
self.tool_results.append(dynamic_result) self.tool_results.append(dynamic_result)
except ActivityError as e: except ActivityError as e:
workflow.logger.error(f"Tool execution failed: {str(e)}") workflow.logger.error(f"Tool execution failed: {str(e)}")
dynamic_result = {"error": str(e), "tool": current_tool} dynamic_result = {"error": str(e), "tool": current_tool}
self.add_message("tool_result", dynamic_result) self.add_message("tool_result", dynamic_result)
self.prompt_queue.append( self.prompt_queue.append(
@@ -99,7 +104,7 @@ class ToolWorkflow:
self.conversation_summary = await workflow.start_activity_method( self.conversation_summary = await workflow.start_activity_method(
ToolActivities.prompt_llm, ToolActivities.prompt_llm,
summary_input, summary_input,
schedule_to_close_timeout=TOOL_ACTIVITY_TIMEOUT, schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
) )
workflow.logger.info( workflow.logger.info(
f"Continuing as new after {MAX_TURNS_BEFORE_CONTINUE} turns." f"Continuing as new after {MAX_TURNS_BEFORE_CONTINUE} turns."
@@ -156,7 +161,7 @@ class ToolWorkflow:
prompt = self.prompt_queue.popleft() prompt = self.prompt_queue.popleft()
if not prompt.startswith("###"): if not prompt.startswith("###"):
self.add_message("user", prompt) self.add_message("user", prompt)
# Validate the prompt before proceeding # Validate the prompt before proceeding
validation_input = ValidationInput( validation_input = ValidationInput(
prompt=prompt, prompt=prompt,
@@ -166,15 +171,20 @@ class ToolWorkflow:
validation_result = await workflow.execute_activity( validation_result = await workflow.execute_activity(
ToolActivities.validate_llm_prompt, ToolActivities.validate_llm_prompt,
args=[validation_input], args=[validation_input],
schedule_to_close_timeout=LLM_ACTIVITY_TIMEOUT, schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(initial_interval=timedelta(seconds=5)), start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=5), backoff_coefficient=1
),
) )
if not validation_result.validationResult: if not validation_result.validationResult:
workflow.logger.warning( workflow.logger.warning(
f"Prompt validation failed: {validation_result.validationFailedReason}" f"Prompt validation failed: {validation_result.validationFailedReason}"
) )
self.add_message("agent", validation_result.validationFailedReason) self.add_message(
"agent", validation_result.validationFailedReason
)
continue continue
# Proceed with generating the context and prompt # Proceed with generating the context and prompt
@@ -191,9 +201,10 @@ class ToolWorkflow:
tool_data = await workflow.execute_activity( tool_data = await workflow.execute_activity(
ToolActivities.prompt_llm, ToolActivities.prompt_llm,
prompt_input, prompt_input,
schedule_to_close_timeout=LLM_ACTIVITY_TIMEOUT, schedule_to_close_timeout=LLM_ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
start_to_close_timeout=LLM_ACTIVITY_START_TO_CLOSE_TIMEOUT,
retry_policy=RetryPolicy( retry_policy=RetryPolicy(
maximum_attempts=5, initial_interval=timedelta(seconds=15) initial_interval=timedelta(seconds=5), backoff_coefficient=1
), ),
) )
self.tool_data = tool_data self.tool_data = tool_data
@@ -299,7 +310,7 @@ class ToolWorkflow:
workflow.logger.debug(f"Adding {actor} message: {response_str[:100]}...") workflow.logger.debug(f"Adding {actor} message: {response_str[:100]}...")
else: else:
workflow.logger.debug(f"Adding {actor} message: {response[:100]}...") workflow.logger.debug(f"Adding {actor} message: {response[:100]}...")
self.conversation_history["messages"].append( self.conversation_history["messages"].append(
{"actor": actor, "response": response} {"actor": actor, "response": response}
) )