From 4286be2e5db530fc1a1b9563c848a85485de29d6 Mon Sep 17 00:00:00 2001 From: Steve Androulakis Date: Tue, 31 Dec 2024 12:19:53 -0800 Subject: [PATCH] system context --- README.md | 7 +++- activities.py | 17 +++++--- send_message.py | 2 +- workflows.py | 103 +++++++++++++++++++++++++----------------------- 4 files changed, 72 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index f51a6db..6ad86a0 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,12 @@ Multi-Turn Chat using an Entity Workflow. The workflow runs forever unless explicitly ended. The workflow continues as new after a configurable number of chat turns to keep the prompt size small and the Temporal event history small. Each continued-as-new workflow receives a summary of the conversation history so far for context. -To run, first see `samples-python` [README.md](../../README.md), and `ollama` [README.md](../README.md) for prerequisites specific to this sample. Once set up, run the following from this directory: +## Setup +* Install [Ollama](https://ollama.com) and the Mistral model (`ollama run mistral`). +* Install and run Temporal. Follow the instructions in the [Temporal documentation](https://learn.temporal.io/getting_started/python/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli) to install and run the Temporal server. +* Install the dependencies: `poetry install` + +## Running the example 1. Run the worker: `poetry run python run_worker.py` 2. In another terminal run the client with a prompt. diff --git a/activities.py b/activities.py index 7a52b22..30c3b9a 100644 --- a/activities.py +++ b/activities.py @@ -1,19 +1,26 @@ +from dataclasses import dataclass from temporalio import activity from ollama import chat, ChatResponse +@dataclass +class OllamaPromptInput: + prompt: str + context_instructions: str + class OllamaActivities: @activity.defn - def prompt_ollama(self, prompt: str) -> str: + def prompt_ollama(self, input: OllamaPromptInput) -> str: model_name = 'mistral' messages = [ + { + 'role': 'system', + 'content': input.context_instructions, + }, { 'role': 'user', - 'content': prompt + 'content': input.prompt, } ] - # Call ollama's chat function response: ChatResponse = chat(model=model_name, messages=messages) - - # Return the model's text response return response.message.content diff --git a/send_message.py b/send_message.py index 93fef85..3645236 100644 --- a/send_message.py +++ b/send_message.py @@ -14,7 +14,7 @@ async def main(prompt): # Sends a signal to the workflow (and starts it if needed) await client.start_workflow( EntityOllamaWorkflow.run, - OllamaParams(None, None), + OllamaParams(None, None), # or pass in custom summary/prompt_queue if desired id=workflow_id, task_queue="ollama-task-queue", start_signal="user_prompt", diff --git a/workflows.py b/workflows.py index 095fc02..a427f65 100644 --- a/workflows.py +++ b/workflows.py @@ -6,7 +6,8 @@ from typing import Deque, List, Optional, Tuple from temporalio import workflow with workflow.unsafe.imports_passed_through(): - from activities import OllamaActivities + # Import the updated OllamaActivities and the new dataclass + from activities import OllamaActivities, OllamaPromptInput @dataclass @@ -18,7 +19,6 @@ class OllamaParams: @workflow.defn class EntityOllamaWorkflow: def __init__(self) -> None: - # List to store prompt history self.conversation_history: List[Tuple[str, str]] = [] self.prompt_queue: Deque[str] = deque() self.conversation_summary: Optional[str] = None @@ -26,16 +26,11 @@ class EntityOllamaWorkflow: self.chat_ended: bool = False @workflow.run - async def run( - self, - params: OllamaParams, - ) -> str: - + async def run(self, params: OllamaParams) -> str: if params and params.conversation_summary: self.conversation_history.append( ("conversation_summary", params.conversation_summary) ) - self.conversation_summary = params.conversation_summary if params and params.prompt_queue: @@ -44,71 +39,79 @@ class EntityOllamaWorkflow: while True: workflow.logger.info("Waiting for prompts...") - # Wait for a chat message (signal) or timeout await workflow.wait_condition( lambda: bool(self.prompt_queue) or self.chat_ended ) if self.prompt_queue: - # Fetch next user prompt and add to conversation history + # Get user's prompt prompt = self.prompt_queue.popleft() self.conversation_history.append(("user", prompt)) + # Build prompt + context + context_instructions, actual_prompt = self.prompt_with_history(prompt) workflow.logger.info("Prompt: " + prompt) - # Send prompt to Ollama + # Pass a single input object + prompt_input = OllamaPromptInput( + prompt=actual_prompt, + context_instructions=context_instructions, + ) + + # Call activity with one argument response = await workflow.execute_activity_method( OllamaActivities.prompt_ollama, - self.prompt_with_history(prompt), + prompt_input, schedule_to_close_timeout=timedelta(seconds=20), ) - workflow.logger.info(f"{response}") - - # Append the response to the conversation history + workflow.logger.info(f"Ollama response: {response}") self.conversation_history.append(("response", response)) - # Continue as new every x conversational turns to avoid event - # history size getting too large. This is also to avoid the - # prompt (with conversational history) getting too large for - # AWS Ollama. - - # We summarize the chat to date and use that as input to the - # new workflow + # Continue as new after X turns if len(self.conversation_history) >= self.continue_as_new_per_turns: - # Summarize the conversation to date using Ollama + # Summarize conversation + summary_context, summary_prompt = self.prompt_summary_with_history() + summary_input = OllamaPromptInput( + prompt=summary_prompt, + context_instructions=summary_context, + ) + self.conversation_summary = await workflow.start_activity_method( OllamaActivities.prompt_ollama, - self.prompt_summary_from_history(), + summary_input, schedule_to_close_timeout=timedelta(seconds=20), ) workflow.logger.info( - "Continuing as new due to %i conversational turns." + "Continuing as new after %i turns." % self.continue_as_new_per_turns, ) workflow.continue_as_new( args=[ OllamaParams( - self.conversation_summary, - self.prompt_queue, + conversation_summary=self.conversation_summary, + prompt_queue=self.prompt_queue, ) ] ) continue - # If end chat signal was sent + # Handle end of chat if self.chat_ended: - # The workflow might be continued as new without any - # chat to summarize, so only call Ollama if there - # is more than the previous summary in the history. if len(self.conversation_history) > 1: - # Summarize the conversation to date using Ollama + # Summarize conversation + summary_context, summary_prompt = self.prompt_summary_with_history() + summary_input = OllamaPromptInput( + prompt=summary_prompt, + context_instructions=summary_context, + ) + self.conversation_summary = await workflow.start_activity_method( OllamaActivities.prompt_ollama, - self.prompt_summary_from_history(), + summary_input, schedule_to_close_timeout=timedelta(seconds=20), ) @@ -116,16 +119,13 @@ class EntityOllamaWorkflow: "Chat ended. Conversation summary:\n" + f"{self.conversation_summary}" ) - return f"{self.conversation_history}" @workflow.signal async def user_prompt(self, prompt: str) -> None: - # Chat ended but the workflow is waiting for a chat summary to be generated if self.chat_ended: workflow.logger.warn(f"Message dropped due to chat closed: {prompt}") return - self.prompt_queue.append(prompt) @workflow.signal @@ -140,25 +140,28 @@ class EntityOllamaWorkflow: def get_summary_from_history(self) -> Optional[str]: return self.conversation_summary - # Helper method used in prompts to Ollama + # Helper: generate text of the entire conversation so far def format_history(self) -> str: return " ".join(f"{text}" for _, text in self.conversation_history) - # Create the prompt given to Ollama for each conversational turn - def prompt_with_history(self, prompt: str) -> str: + # Return (context_instructions, prompt) + def prompt_with_history(self, prompt: str) -> tuple[str, str]: history_string = self.format_history() - return ( - f"Here is the conversation history: {history_string} Please add " - + "a few sentence response to the prompt in plain text sentences. " - + "Don't editorialize or add metadata like response. Keep the " - + f"text a plain explanation based on the history. Prompt: {prompt}" + context_instructions = ( + f"Here is the conversation history: {history_string} " + "Please add a few sentence response in plain text sentences. " + "Don't editorialize or add metadata. " + "Keep the text a plain explanation based on the history." ) + return (context_instructions, prompt) - # Create the prompt to Ollama to summarize the conversation history - def prompt_summary_from_history(self) -> str: + # Return (context_instructions, prompt) for summarizing the conversation + def prompt_summary_with_history(self) -> tuple[str, str]: history_string = self.format_history() - return ( - "Here is the conversation history between a user and a chatbot: " - + f"{history_string} -- Please produce a two sentence summary of " - + "this conversation." + context_instructions = ( + f"Here is the conversation history between a user and a chatbot: {history_string}" ) + actual_prompt = ( + "Please produce a two sentence summary of this conversation." + ) + return (context_instructions, actual_prompt)