From b62df55861613aff95c6fbae28851bc86b488562 Mon Sep 17 00:00:00 2001 From: Steve Androulakis Date: Mon, 3 Mar 2025 07:44:55 +0000 Subject: [PATCH] better error handling for workers down. sure steve, ship code the day before the keynote --- api/main.py | 47 +++++++++++++-------- frontend/src/pages/App.jsx | 84 +++++++++++++++++++++++--------------- 2 files changed, 81 insertions(+), 50 deletions(-) diff --git a/api/main.py b/api/main.py index 81ce54d..b381bb6 100644 --- a/api/main.py +++ b/api/main.py @@ -3,7 +3,9 @@ from typing import Optional from temporalio.client import Client from temporalio.exceptions import TemporalError from temporalio.api.enums.v1 import WorkflowExecutionStatus +from fastapi import HTTPException from dotenv import load_dotenv +import asyncio import os from workflows.agent_goal_workflow import AgentGoalWorkflow @@ -18,12 +20,13 @@ temporal_client: Optional[Client] = None # Load environment variables load_dotenv() + def get_agent_goal(): """Get the agent goal from environment variables.""" goal_name = os.getenv("AGENT_GOAL", "goal_match_train_invoice") goals = { "goal_match_train_invoice": goal_match_train_invoice, - "goal_event_flight_invoice": goal_event_flight_invoice + "goal_event_flight_invoice": goal_event_flight_invoice, } return goals.get(goal_name, goal_event_flight_invoice) @@ -76,32 +79,44 @@ async def get_conversation_history(): try: handle = temporal_client.get_workflow_handle("agent-workflow") - status_names = { - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "WORKFLOW_EXECUTION_STATUS_TERMINATED", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "WORKFLOW_EXECUTION_STATUS_CANCELED", - WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "WORKFLOW_EXECUTION_STATUS_FAILED", - } - failed_states = [ WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED, WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED, ] - # Check workflow status first description = await handle.describe() if description.status in failed_states: - status_name = status_names.get(description.status, "UNKNOWN_STATUS") - print(f"Workflow is in {status_name} state. Returning empty history.") + print("Workflow is in a failed state. Returning empty history.") return [] - # Only query if workflow is running - conversation_history = await handle.query("get_conversation_history") - return conversation_history + # Set a timeout for the query + try: + conversation_history = await asyncio.wait_for( + handle.query("get_conversation_history"), + timeout=5, # Timeout after 5 seconds + ) + return conversation_history + except asyncio.TimeoutError: + raise HTTPException( + status_code=404, + detail="Temporal query timed out (worker may be unavailable).", + ) except TemporalError as e: - print(f"Temporal error: {e}") - return [] + error_message = str(e) + print(f"Temporal error: {error_message}") + + # If worker is down or no poller is available, return a 404 + if "no poller seen for task queue recently" in error_message: + raise HTTPException( + status_code=404, detail="Workflow worker unavailable or not found." + ) + + # For other Temporal errors, return a 500 + raise HTTPException( + status_code=500, detail="Internal server error while querying workflow." + ) @app.post("/send-prompt") @@ -155,7 +170,7 @@ async def end_chat(): async def start_workflow(): # Get the configured goal agent_goal = get_agent_goal() - + # Create combined input combined_input = CombinedInput( tool_params=AgentGoalWorkflowParams(None, None), diff --git a/frontend/src/pages/App.jsx b/frontend/src/pages/App.jsx index 4fde2ae..636d7e1 100644 --- a/frontend/src/pages/App.jsx +++ b/frontend/src/pages/App.jsx @@ -38,68 +38,84 @@ export default function App() { const debouncedUserInput = useDebounce(userInput, DEBOUNCE_DELAY); - // Error handling utility with auto-dismiss + const errorTimerRef = useRef(null); + const handleError = useCallback((error, context) => { console.error(`${context}:`, error); - const errorMessage = error.status === 400 - ? error.message - : `Error ${context.toLowerCase()}. Please try again.`; - - setError({ - visible: true, - message: errorMessage - }); - const timer = setTimeout(() => setError(INITIAL_ERROR_STATE), 3000); - return () => clearTimeout(timer); + const isConversationFetchError = error.status === 404; + const errorMessage = isConversationFetchError + ? "Error fetching conversation. Retrying..." // Updated message + : `Error ${context.toLowerCase()}. Please try again.`; + + setError(prevError => { + // If the same 404 error is already being displayed, don't reset state (prevents flickering) + if (prevError.visible && prevError.message === errorMessage) { + return prevError; + } + return { visible: true, message: errorMessage }; + }); + + // Clear any existing timeout + if (errorTimerRef.current) { + clearTimeout(errorTimerRef.current); + } + + // Only auto-dismiss non-404 errors after 3 seconds + if (!isConversationFetchError) { + errorTimerRef.current = setTimeout(() => setError(INITIAL_ERROR_STATE), 3000); + } }, []); - + + + const clearErrorOnSuccess = useCallback(() => { + if (errorTimerRef.current) { + clearTimeout(errorTimerRef.current); + } + setError(INITIAL_ERROR_STATE); + }, []); + const fetchConversationHistory = useCallback(async () => { try { const data = await apiService.getConversationHistory(); const newConversation = data.messages || []; - setConversation(prevConversation => { - // Only update if there are actual changes - if (JSON.stringify(prevConversation) !== JSON.stringify(newConversation)) { - return newConversation; - } - return prevConversation; - }); - + setConversation(prevConversation => + JSON.stringify(prevConversation) !== JSON.stringify(newConversation) ? newConversation : prevConversation + ); + if (newConversation.length > 0) { const lastMsg = newConversation[newConversation.length - 1]; const isAgentMessage = lastMsg.actor === "agent"; setLoading(!isAgentMessage); setDone(lastMsg.response.next === "done"); - - setLastMessage(prevLastMessage => { - if (!prevLastMessage || lastMsg.response.response !== prevLastMessage.response.response) { - return lastMsg; - } - return prevLastMessage; - }); + + setLastMessage(prevLastMessage => + !prevLastMessage || lastMsg.response.response !== prevLastMessage.response.response + ? lastMsg + : prevLastMessage + ); } else { setLoading(false); setDone(true); setLastMessage(null); } + + // Successfully fetched data, clear any persistent errors + clearErrorOnSuccess(); } catch (err) { handleError(err, "fetching conversation"); } - }, [handleError]); - + }, [handleError, clearErrorOnSuccess]); + // Setup polling with cleanup useEffect(() => { pollingRef.current = setInterval(fetchConversationHistory, POLL_INTERVAL); - return () => { - if (pollingRef.current) { - clearInterval(pollingRef.current); - } - }; + return () => clearInterval(pollingRef.current); }, [fetchConversationHistory]); + const scrollToBottom = useCallback(() => { if (containerRef.current) {