mirror of
https://github.com/temporal-community/temporal-ai-agent.git
synced 2026-03-15 14:08:08 +01:00
Merge pull request #22 from steveandroulakis/keynote-main
better error handling for workers down.
This commit is contained in:
43
api/main.py
43
api/main.py
@@ -3,7 +3,9 @@ from typing import Optional
|
|||||||
from temporalio.client import Client
|
from temporalio.client import Client
|
||||||
from temporalio.exceptions import TemporalError
|
from temporalio.exceptions import TemporalError
|
||||||
from temporalio.api.enums.v1 import WorkflowExecutionStatus
|
from temporalio.api.enums.v1 import WorkflowExecutionStatus
|
||||||
|
from fastapi import HTTPException
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from workflows.agent_goal_workflow import AgentGoalWorkflow
|
from workflows.agent_goal_workflow import AgentGoalWorkflow
|
||||||
@@ -18,12 +20,13 @@ temporal_client: Optional[Client] = None
|
|||||||
# Load environment variables
|
# Load environment variables
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
def get_agent_goal():
|
def get_agent_goal():
|
||||||
"""Get the agent goal from environment variables."""
|
"""Get the agent goal from environment variables."""
|
||||||
goal_name = os.getenv("AGENT_GOAL", "goal_match_train_invoice")
|
goal_name = os.getenv("AGENT_GOAL", "goal_match_train_invoice")
|
||||||
goals = {
|
goals = {
|
||||||
"goal_match_train_invoice": goal_match_train_invoice,
|
"goal_match_train_invoice": goal_match_train_invoice,
|
||||||
"goal_event_flight_invoice": goal_event_flight_invoice
|
"goal_event_flight_invoice": goal_event_flight_invoice,
|
||||||
}
|
}
|
||||||
return goals.get(goal_name, goal_event_flight_invoice)
|
return goals.get(goal_name, goal_event_flight_invoice)
|
||||||
|
|
||||||
@@ -76,32 +79,44 @@ async def get_conversation_history():
|
|||||||
try:
|
try:
|
||||||
handle = temporal_client.get_workflow_handle("agent-workflow")
|
handle = temporal_client.get_workflow_handle("agent-workflow")
|
||||||
|
|
||||||
status_names = {
|
|
||||||
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED: "WORKFLOW_EXECUTION_STATUS_TERMINATED",
|
|
||||||
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED: "WORKFLOW_EXECUTION_STATUS_CANCELED",
|
|
||||||
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED: "WORKFLOW_EXECUTION_STATUS_FAILED",
|
|
||||||
}
|
|
||||||
|
|
||||||
failed_states = [
|
failed_states = [
|
||||||
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED,
|
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED,
|
||||||
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED,
|
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED,
|
||||||
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED,
|
WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED,
|
||||||
]
|
]
|
||||||
|
|
||||||
# Check workflow status first
|
|
||||||
description = await handle.describe()
|
description = await handle.describe()
|
||||||
if description.status in failed_states:
|
if description.status in failed_states:
|
||||||
status_name = status_names.get(description.status, "UNKNOWN_STATUS")
|
print("Workflow is in a failed state. Returning empty history.")
|
||||||
print(f"Workflow is in {status_name} state. Returning empty history.")
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Only query if workflow is running
|
# Set a timeout for the query
|
||||||
conversation_history = await handle.query("get_conversation_history")
|
try:
|
||||||
|
conversation_history = await asyncio.wait_for(
|
||||||
|
handle.query("get_conversation_history"),
|
||||||
|
timeout=5, # Timeout after 5 seconds
|
||||||
|
)
|
||||||
return conversation_history
|
return conversation_history
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=404,
|
||||||
|
detail="Temporal query timed out (worker may be unavailable).",
|
||||||
|
)
|
||||||
|
|
||||||
except TemporalError as e:
|
except TemporalError as e:
|
||||||
print(f"Temporal error: {e}")
|
error_message = str(e)
|
||||||
return []
|
print(f"Temporal error: {error_message}")
|
||||||
|
|
||||||
|
# If worker is down or no poller is available, return a 404
|
||||||
|
if "no poller seen for task queue recently" in error_message:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=404, detail="Workflow worker unavailable or not found."
|
||||||
|
)
|
||||||
|
|
||||||
|
# For other Temporal errors, return a 500
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500, detail="Internal server error while querying workflow."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.post("/send-prompt")
|
@app.post("/send-prompt")
|
||||||
|
|||||||
@@ -38,20 +38,41 @@ export default function App() {
|
|||||||
|
|
||||||
const debouncedUserInput = useDebounce(userInput, DEBOUNCE_DELAY);
|
const debouncedUserInput = useDebounce(userInput, DEBOUNCE_DELAY);
|
||||||
|
|
||||||
// Error handling utility with auto-dismiss
|
const errorTimerRef = useRef(null);
|
||||||
|
|
||||||
const handleError = useCallback((error, context) => {
|
const handleError = useCallback((error, context) => {
|
||||||
console.error(`${context}:`, error);
|
console.error(`${context}:`, error);
|
||||||
const errorMessage = error.status === 400
|
|
||||||
? error.message
|
const isConversationFetchError = error.status === 404;
|
||||||
|
const errorMessage = isConversationFetchError
|
||||||
|
? "Error fetching conversation. Retrying..." // Updated message
|
||||||
: `Error ${context.toLowerCase()}. Please try again.`;
|
: `Error ${context.toLowerCase()}. Please try again.`;
|
||||||
|
|
||||||
setError({
|
setError(prevError => {
|
||||||
visible: true,
|
// If the same 404 error is already being displayed, don't reset state (prevents flickering)
|
||||||
message: errorMessage
|
if (prevError.visible && prevError.message === errorMessage) {
|
||||||
|
return prevError;
|
||||||
|
}
|
||||||
|
return { visible: true, message: errorMessage };
|
||||||
});
|
});
|
||||||
|
|
||||||
const timer = setTimeout(() => setError(INITIAL_ERROR_STATE), 3000);
|
// Clear any existing timeout
|
||||||
return () => clearTimeout(timer);
|
if (errorTimerRef.current) {
|
||||||
|
clearTimeout(errorTimerRef.current);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only auto-dismiss non-404 errors after 3 seconds
|
||||||
|
if (!isConversationFetchError) {
|
||||||
|
errorTimerRef.current = setTimeout(() => setError(INITIAL_ERROR_STATE), 3000);
|
||||||
|
}
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
|
||||||
|
const clearErrorOnSuccess = useCallback(() => {
|
||||||
|
if (errorTimerRef.current) {
|
||||||
|
clearTimeout(errorTimerRef.current);
|
||||||
|
}
|
||||||
|
setError(INITIAL_ERROR_STATE);
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
const fetchConversationHistory = useCallback(async () => {
|
const fetchConversationHistory = useCallback(async () => {
|
||||||
@@ -59,13 +80,9 @@ export default function App() {
|
|||||||
const data = await apiService.getConversationHistory();
|
const data = await apiService.getConversationHistory();
|
||||||
const newConversation = data.messages || [];
|
const newConversation = data.messages || [];
|
||||||
|
|
||||||
setConversation(prevConversation => {
|
setConversation(prevConversation =>
|
||||||
// Only update if there are actual changes
|
JSON.stringify(prevConversation) !== JSON.stringify(newConversation) ? newConversation : prevConversation
|
||||||
if (JSON.stringify(prevConversation) !== JSON.stringify(newConversation)) {
|
);
|
||||||
return newConversation;
|
|
||||||
}
|
|
||||||
return prevConversation;
|
|
||||||
});
|
|
||||||
|
|
||||||
if (newConversation.length > 0) {
|
if (newConversation.length > 0) {
|
||||||
const lastMsg = newConversation[newConversation.length - 1];
|
const lastMsg = newConversation[newConversation.length - 1];
|
||||||
@@ -74,33 +91,32 @@ export default function App() {
|
|||||||
setLoading(!isAgentMessage);
|
setLoading(!isAgentMessage);
|
||||||
setDone(lastMsg.response.next === "done");
|
setDone(lastMsg.response.next === "done");
|
||||||
|
|
||||||
setLastMessage(prevLastMessage => {
|
setLastMessage(prevLastMessage =>
|
||||||
if (!prevLastMessage || lastMsg.response.response !== prevLastMessage.response.response) {
|
!prevLastMessage || lastMsg.response.response !== prevLastMessage.response.response
|
||||||
return lastMsg;
|
? lastMsg
|
||||||
}
|
: prevLastMessage
|
||||||
return prevLastMessage;
|
);
|
||||||
});
|
|
||||||
} else {
|
} else {
|
||||||
setLoading(false);
|
setLoading(false);
|
||||||
setDone(true);
|
setDone(true);
|
||||||
setLastMessage(null);
|
setLastMessage(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Successfully fetched data, clear any persistent errors
|
||||||
|
clearErrorOnSuccess();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
handleError(err, "fetching conversation");
|
handleError(err, "fetching conversation");
|
||||||
}
|
}
|
||||||
}, [handleError]);
|
}, [handleError, clearErrorOnSuccess]);
|
||||||
|
|
||||||
// Setup polling with cleanup
|
// Setup polling with cleanup
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
pollingRef.current = setInterval(fetchConversationHistory, POLL_INTERVAL);
|
pollingRef.current = setInterval(fetchConversationHistory, POLL_INTERVAL);
|
||||||
|
|
||||||
return () => {
|
return () => clearInterval(pollingRef.current);
|
||||||
if (pollingRef.current) {
|
|
||||||
clearInterval(pollingRef.current);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}, [fetchConversationHistory]);
|
}, [fetchConversationHistory]);
|
||||||
|
|
||||||
|
|
||||||
const scrollToBottom = useCallback(() => {
|
const scrollToBottom = useCallback(() => {
|
||||||
if (containerRef.current) {
|
if (containerRef.current) {
|
||||||
if (scrollTimeoutRef.current) {
|
if (scrollTimeoutRef.current) {
|
||||||
|
|||||||
Reference in New Issue
Block a user