mirror of
https://github.com/temporal-community/temporal-ai-agent.git
synced 2026-03-15 14:08:08 +01:00
133 lines
5.2 KiB
Python
133 lines
5.2 KiB
Python
import os
|
|
from pathlib import Path
|
|
import json
|
|
from temporalio.client import Client
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
import asyncio
|
|
from temporalio.exceptions import WorkflowAlreadyStartedError
|
|
from shared.config import get_temporal_client
|
|
|
|
|
|
from enum import Enum, auto
|
|
|
|
#enums for the java enum
|
|
# class ExecutionScenarios(Enum):
|
|
# HAPPY_PATH = 0
|
|
# ADVANCED_VISIBILITY = auto() # 1
|
|
# HUMAN_IN_LOOP = auto() # 2
|
|
# API_DOWNTIME = auto() # 3
|
|
# BUG_IN_WORKFLOW = auto() # 4
|
|
# INVALID_ACCOUNT = auto() # 5
|
|
|
|
# these dataclasses are for calling the Temporal Workflow
|
|
# Python equivalent of the workflow we're calling's Java WorkflowParameterObj
|
|
@dataclass
|
|
class MoneyMovementWorkflowParameterObj:
|
|
amount: int # Using snake_case as per Python conventions
|
|
scenario: str
|
|
|
|
# this is made to demonstrate functionality but it could just as durably be an API call
|
|
# this assumes it's a valid account - use check_account_valid() to verify that first
|
|
async def move_money(args: dict) -> dict:
|
|
|
|
print("in move_money")
|
|
account_key = args.get("accountkey")
|
|
account_type: str = args.get("accounttype")
|
|
amount = args.get("amount")
|
|
destinationaccount = args.get("destinationaccount")
|
|
|
|
file_path = Path(__file__).resolve().parent.parent / "data" / "customer_account_data.json"
|
|
if not file_path.exists():
|
|
return {"error": "Data file not found."}
|
|
|
|
# todo validate there's enough money in the account
|
|
with open(file_path, "r") as file:
|
|
data = json.load(file)
|
|
account_list = data["accounts"]
|
|
|
|
for account in account_list:
|
|
if account["email"] == account_key or account["account_id"] == account_key:
|
|
amount_str: str = str(amount) # LLM+python gets sassy about types but we need it to be str
|
|
from_account_combo = account_key + account_type
|
|
|
|
transfer_workflow_id = await start_workflow(amount_cents=str_dollars_to_cents(amount_str),from_account_name=from_account_combo, to_account_name=destinationaccount)
|
|
|
|
account_type_key = 'checking_balance'
|
|
if(account_type.casefold() == "checking" ):
|
|
account_type = "checking"
|
|
account_type_key = 'checking_balance'
|
|
|
|
elif(account_type.casefold() == "savings" ):
|
|
account_type = "savings"
|
|
account_type_key = 'savings_balance'
|
|
else:
|
|
raise NotImplementedError("money order for account types other than checking or savings is not implemented.")
|
|
|
|
new_balance: float = float(str_dollars_to_cents(str(account[account_type_key])))
|
|
new_balance = new_balance - float(str_dollars_to_cents(amount_str))
|
|
account[account_type_key] = str(new_balance / 100 ) #to dollars
|
|
with open(file_path, 'w') as file:
|
|
json.dump(data, file, indent=4)
|
|
|
|
return {'status': "money movement complete", 'confirmation id': transfer_workflow_id, 'new_balance': account[account_type_key]}
|
|
|
|
return_msg = "Account not found with for " + account_key
|
|
return {"error": return_msg}
|
|
|
|
# Async function to start workflow
|
|
async def start_workflow(amount_cents: int, from_account_name: str, to_account_name: str)-> str:
|
|
|
|
|
|
# Connect to Temporal
|
|
|
|
|
|
client = await get_temporal_client()
|
|
start_real_workflow = os.getenv("FIN_START_REAL_WORKFLOW")
|
|
if start_real_workflow is not None and start_real_workflow.lower() == "false":
|
|
START_REAL_WORKFLOW = False
|
|
else:
|
|
START_REAL_WORKFLOW = True
|
|
|
|
if START_REAL_WORKFLOW:
|
|
# Create the parameter object
|
|
params = MoneyMovementWorkflowParameterObj(
|
|
amount=amount_cents,
|
|
scenario="HAPPY_PATH"
|
|
)
|
|
|
|
workflow_id="TRANSFER-ACCT-" + from_account_name + "-TO-" + to_account_name # business-relevant workflow ID
|
|
|
|
try:
|
|
handle = await client.start_workflow(
|
|
"moneyTransferWorkflow", # Workflow name
|
|
params, # Workflow parameters
|
|
id=workflow_id,
|
|
task_queue="MoneyTransferJava" # Task queue name
|
|
)
|
|
return handle.id
|
|
except WorkflowAlreadyStartedError as e:
|
|
existing_handle = client.get_workflow_handle(workflow_id=workflow_id)
|
|
return existing_handle.id
|
|
else:
|
|
return "TRANSFER-ACCT-" + from_account_name + "-TO-" + to_account_name + "not-real"
|
|
|
|
|
|
#cleans a string dollar amount description to cents value
|
|
def str_dollars_to_cents(dollar_str: str) -> int:
|
|
try:
|
|
# Remove '$' and any whitespace
|
|
cleaned_str = dollar_str.replace('$', '').strip()
|
|
|
|
# Handle empty string or invalid input
|
|
if not cleaned_str:
|
|
raise ValueError("Empty amount provided")
|
|
|
|
# Convert to float and then to cents
|
|
amount = float(cleaned_str)
|
|
if amount < 0:
|
|
raise ValueError("Negative amounts not allowed")
|
|
|
|
return int(amount * 100)
|
|
except ValueError as e:
|
|
raise ValueError(f"Invalid dollar amount format: {dollar_str}") from e |