mirror of
https://github.com/temporal-community/temporal-ai-agent.git
synced 2026-03-16 22:48:09 +01:00
system context
This commit is contained in:
@@ -2,7 +2,12 @@
|
|||||||
|
|
||||||
Multi-Turn Chat using an Entity Workflow. The workflow runs forever unless explicitly ended. The workflow continues as new after a configurable number of chat turns to keep the prompt size small and the Temporal event history small. Each continued-as-new workflow receives a summary of the conversation history so far for context.
|
Multi-Turn Chat using an Entity Workflow. The workflow runs forever unless explicitly ended. The workflow continues as new after a configurable number of chat turns to keep the prompt size small and the Temporal event history small. Each continued-as-new workflow receives a summary of the conversation history so far for context.
|
||||||
|
|
||||||
To run, first see `samples-python` [README.md](../../README.md), and `ollama` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory:
|
## Setup
|
||||||
|
* Install [Ollama](https://ollama.com) and the Mistral model (`ollama run mistral`).
|
||||||
|
* Install and run the Temporal server by following the instructions in the [Temporal documentation](https://learn.temporal.io/getting_started/python/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli).
|
||||||
|
* Install the dependencies: `poetry install`
|
||||||
|
|
||||||
|
## Running the example
|
||||||
|
|
||||||
1. Run the worker: `poetry run python run_worker.py`
|
1. Run the worker: `poetry run python run_worker.py`
|
||||||
2. In another terminal, run the client with a prompt.
|
2. In another terminal, run the client with a prompt.
|
||||||
|
|||||||
@@ -1,19 +1,26 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
from temporalio import activity
|
from temporalio import activity
|
||||||
from ollama import chat, ChatResponse
|
from ollama import chat, ChatResponse
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class OllamaPromptInput:
|
||||||
|
prompt: str
|
||||||
|
context_instructions: str
|
||||||
|
|
||||||
class OllamaActivities:
|
class OllamaActivities:
|
||||||
@activity.defn
|
@activity.defn
|
||||||
def prompt_ollama(self, prompt: str) -> str:
|
def prompt_ollama(self, input: OllamaPromptInput) -> str:
|
||||||
model_name = 'mistral'
|
model_name = 'mistral'
|
||||||
messages = [
|
messages = [
|
||||||
|
{
|
||||||
|
'role': 'system',
|
||||||
|
'content': input.context_instructions,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
'role': 'user',
|
'role': 'user',
|
||||||
'content': prompt
|
'content': input.prompt,
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
# Call ollama's chat function
|
|
||||||
response: ChatResponse = chat(model=model_name, messages=messages)
|
response: ChatResponse = chat(model=model_name, messages=messages)
|
||||||
|
|
||||||
# Return the model's text response
|
|
||||||
return response.message.content
|
return response.message.content
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ async def main(prompt):
|
|||||||
# Sends a signal to the workflow (and starts it if needed)
|
# Sends a signal to the workflow (and starts it if needed)
|
||||||
await client.start_workflow(
|
await client.start_workflow(
|
||||||
EntityOllamaWorkflow.run,
|
EntityOllamaWorkflow.run,
|
||||||
OllamaParams(None, None),
|
OllamaParams(None, None), # or pass in custom summary/prompt_queue if desired
|
||||||
id=workflow_id,
|
id=workflow_id,
|
||||||
task_queue="ollama-task-queue",
|
task_queue="ollama-task-queue",
|
||||||
start_signal="user_prompt",
|
start_signal="user_prompt",
|
||||||
|
|||||||
103
workflows.py
103
workflows.py
@@ -6,7 +6,8 @@ from typing import Deque, List, Optional, Tuple
|
|||||||
from temporalio import workflow
|
from temporalio import workflow
|
||||||
|
|
||||||
with workflow.unsafe.imports_passed_through():
|
with workflow.unsafe.imports_passed_through():
|
||||||
from activities import OllamaActivities
|
# Import the updated OllamaActivities and the new dataclass
|
||||||
|
from activities import OllamaActivities, OllamaPromptInput
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -18,7 +19,6 @@ class OllamaParams:
|
|||||||
@workflow.defn
|
@workflow.defn
|
||||||
class EntityOllamaWorkflow:
|
class EntityOllamaWorkflow:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
# List to store prompt history
|
|
||||||
self.conversation_history: List[Tuple[str, str]] = []
|
self.conversation_history: List[Tuple[str, str]] = []
|
||||||
self.prompt_queue: Deque[str] = deque()
|
self.prompt_queue: Deque[str] = deque()
|
||||||
self.conversation_summary: Optional[str] = None
|
self.conversation_summary: Optional[str] = None
|
||||||
@@ -26,16 +26,11 @@ class EntityOllamaWorkflow:
|
|||||||
self.chat_ended: bool = False
|
self.chat_ended: bool = False
|
||||||
|
|
||||||
@workflow.run
|
@workflow.run
|
||||||
async def run(
|
async def run(self, params: OllamaParams) -> str:
|
||||||
self,
|
|
||||||
params: OllamaParams,
|
|
||||||
) -> str:
|
|
||||||
|
|
||||||
if params and params.conversation_summary:
|
if params and params.conversation_summary:
|
||||||
self.conversation_history.append(
|
self.conversation_history.append(
|
||||||
("conversation_summary", params.conversation_summary)
|
("conversation_summary", params.conversation_summary)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.conversation_summary = params.conversation_summary
|
self.conversation_summary = params.conversation_summary
|
||||||
|
|
||||||
if params and params.prompt_queue:
|
if params and params.prompt_queue:
|
||||||
@@ -44,71 +39,79 @@ class EntityOllamaWorkflow:
|
|||||||
while True:
|
while True:
|
||||||
workflow.logger.info("Waiting for prompts...")
|
workflow.logger.info("Waiting for prompts...")
|
||||||
|
|
||||||
# Wait for a chat message (signal) or timeout
|
|
||||||
await workflow.wait_condition(
|
await workflow.wait_condition(
|
||||||
lambda: bool(self.prompt_queue) or self.chat_ended
|
lambda: bool(self.prompt_queue) or self.chat_ended
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.prompt_queue:
|
if self.prompt_queue:
|
||||||
# Fetch next user prompt and add to conversation history
|
# Get user's prompt
|
||||||
prompt = self.prompt_queue.popleft()
|
prompt = self.prompt_queue.popleft()
|
||||||
self.conversation_history.append(("user", prompt))
|
self.conversation_history.append(("user", prompt))
|
||||||
|
|
||||||
|
# Build prompt + context
|
||||||
|
context_instructions, actual_prompt = self.prompt_with_history(prompt)
|
||||||
workflow.logger.info("Prompt: " + prompt)
|
workflow.logger.info("Prompt: " + prompt)
|
||||||
|
|
||||||
# Send prompt to Ollama
|
# Pass a single input object
|
||||||
|
prompt_input = OllamaPromptInput(
|
||||||
|
prompt=actual_prompt,
|
||||||
|
context_instructions=context_instructions,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Call activity with one argument
|
||||||
response = await workflow.execute_activity_method(
|
response = await workflow.execute_activity_method(
|
||||||
OllamaActivities.prompt_ollama,
|
OllamaActivities.prompt_ollama,
|
||||||
self.prompt_with_history(prompt),
|
prompt_input,
|
||||||
schedule_to_close_timeout=timedelta(seconds=20),
|
schedule_to_close_timeout=timedelta(seconds=20),
|
||||||
)
|
)
|
||||||
|
|
||||||
workflow.logger.info(f"{response}")
|
workflow.logger.info(f"Ollama response: {response}")
|
||||||
|
|
||||||
# Append the response to the conversation history
|
|
||||||
self.conversation_history.append(("response", response))
|
self.conversation_history.append(("response", response))
|
||||||
|
|
||||||
# Continue as new every x conversational turns to avoid event
|
# Continue as new after X turns
|
||||||
# history size getting too large. This is also to avoid the
|
|
||||||
# prompt (with conversational history) getting too large for
|
|
||||||
# Ollama.
|
|
||||||
|
|
||||||
# We summarize the chat to date and use that as input to the
|
|
||||||
# new workflow
|
|
||||||
if len(self.conversation_history) >= self.continue_as_new_per_turns:
|
if len(self.conversation_history) >= self.continue_as_new_per_turns:
|
||||||
# Summarize the conversation to date using Ollama
|
# Summarize conversation
|
||||||
|
summary_context, summary_prompt = self.prompt_summary_with_history()
|
||||||
|
summary_input = OllamaPromptInput(
|
||||||
|
prompt=summary_prompt,
|
||||||
|
context_instructions=summary_context,
|
||||||
|
)
|
||||||
|
|
||||||
self.conversation_summary = await workflow.start_activity_method(
|
self.conversation_summary = await workflow.start_activity_method(
|
||||||
OllamaActivities.prompt_ollama,
|
OllamaActivities.prompt_ollama,
|
||||||
self.prompt_summary_from_history(),
|
summary_input,
|
||||||
schedule_to_close_timeout=timedelta(seconds=20),
|
schedule_to_close_timeout=timedelta(seconds=20),
|
||||||
)
|
)
|
||||||
|
|
||||||
workflow.logger.info(
|
workflow.logger.info(
|
||||||
"Continuing as new due to %i conversational turns."
|
"Continuing as new after %i turns."
|
||||||
% self.continue_as_new_per_turns,
|
% self.continue_as_new_per_turns,
|
||||||
)
|
)
|
||||||
|
|
||||||
workflow.continue_as_new(
|
workflow.continue_as_new(
|
||||||
args=[
|
args=[
|
||||||
OllamaParams(
|
OllamaParams(
|
||||||
self.conversation_summary,
|
conversation_summary=self.conversation_summary,
|
||||||
self.prompt_queue,
|
prompt_queue=self.prompt_queue,
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If end chat signal was sent
|
# Handle end of chat
|
||||||
if self.chat_ended:
|
if self.chat_ended:
|
||||||
# The workflow might be continued as new without any
|
|
||||||
# chat to summarize, so only call Ollama if there
|
|
||||||
# is more than the previous summary in the history.
|
|
||||||
if len(self.conversation_history) > 1:
|
if len(self.conversation_history) > 1:
|
||||||
# Summarize the conversation to date using Ollama
|
# Summarize conversation
|
||||||
|
summary_context, summary_prompt = self.prompt_summary_with_history()
|
||||||
|
summary_input = OllamaPromptInput(
|
||||||
|
prompt=summary_prompt,
|
||||||
|
context_instructions=summary_context,
|
||||||
|
)
|
||||||
|
|
||||||
self.conversation_summary = await workflow.start_activity_method(
|
self.conversation_summary = await workflow.start_activity_method(
|
||||||
OllamaActivities.prompt_ollama,
|
OllamaActivities.prompt_ollama,
|
||||||
self.prompt_summary_from_history(),
|
summary_input,
|
||||||
schedule_to_close_timeout=timedelta(seconds=20),
|
schedule_to_close_timeout=timedelta(seconds=20),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -116,16 +119,13 @@ class EntityOllamaWorkflow:
|
|||||||
"Chat ended. Conversation summary:\n"
|
"Chat ended. Conversation summary:\n"
|
||||||
+ f"{self.conversation_summary}"
|
+ f"{self.conversation_summary}"
|
||||||
)
|
)
|
||||||
|
|
||||||
return f"{self.conversation_history}"
|
return f"{self.conversation_history}"
|
||||||
|
|
||||||
@workflow.signal
|
@workflow.signal
|
||||||
async def user_prompt(self, prompt: str) -> None:
|
async def user_prompt(self, prompt: str) -> None:
|
||||||
# Chat ended but the workflow is waiting for a chat summary to be generated
|
|
||||||
if self.chat_ended:
|
if self.chat_ended:
|
||||||
workflow.logger.warn(f"Message dropped due to chat closed: {prompt}")
|
workflow.logger.warn(f"Message dropped due to chat closed: {prompt}")
|
||||||
return
|
return
|
||||||
|
|
||||||
self.prompt_queue.append(prompt)
|
self.prompt_queue.append(prompt)
|
||||||
|
|
||||||
@workflow.signal
|
@workflow.signal
|
||||||
@@ -140,25 +140,28 @@ class EntityOllamaWorkflow:
|
|||||||
def get_summary_from_history(self) -> Optional[str]:
|
def get_summary_from_history(self) -> Optional[str]:
|
||||||
return self.conversation_summary
|
return self.conversation_summary
|
||||||
|
|
||||||
# Helper method used in prompts to Ollama
|
# Helper: generate text of the entire conversation so far
|
||||||
def format_history(self) -> str:
|
def format_history(self) -> str:
|
||||||
return " ".join(f"{text}" for _, text in self.conversation_history)
|
return " ".join(f"{text}" for _, text in self.conversation_history)
|
||||||
|
|
||||||
# Create the prompt given to Ollama for each conversational turn
|
# Return (context_instructions, prompt)
|
||||||
def prompt_with_history(self, prompt: str) -> str:
|
def prompt_with_history(self, prompt: str) -> tuple[str, str]:
|
||||||
history_string = self.format_history()
|
history_string = self.format_history()
|
||||||
return (
|
context_instructions = (
|
||||||
f"Here is the conversation history: {history_string} Please add "
|
f"Here is the conversation history: {history_string} "
|
||||||
+ "a few sentence response to the prompt in plain text sentences. "
|
"Please add a few sentence response in plain text sentences. "
|
||||||
+ "Don't editorialize or add metadata like response. Keep the "
|
"Don't editorialize or add metadata. "
|
||||||
+ f"text a plain explanation based on the history. Prompt: {prompt}"
|
"Keep the text a plain explanation based on the history."
|
||||||
)
|
)
|
||||||
|
return (context_instructions, prompt)
|
||||||
|
|
||||||
# Create the prompt to Ollama to summarize the conversation history
|
# Return (context_instructions, prompt) for summarizing the conversation
|
||||||
def prompt_summary_from_history(self) -> str:
|
def prompt_summary_with_history(self) -> tuple[str, str]:
|
||||||
history_string = self.format_history()
|
history_string = self.format_history()
|
||||||
return (
|
context_instructions = (
|
||||||
"Here is the conversation history between a user and a chatbot: "
|
f"Here is the conversation history between a user and a chatbot: {history_string}"
|
||||||
+ f"{history_string} -- Please produce a two sentence summary of "
|
|
||||||
+ "this conversation."
|
|
||||||
)
|
)
|
||||||
|
actual_prompt = (
|
||||||
|
"Please produce a two sentence summary of this conversation."
|
||||||
|
)
|
||||||
|
return (context_instructions, actual_prompt)
|
||||||
|
|||||||
Reference in New Issue
Block a user