Files
temporal-ai-agent/tools/fin/move_money.py
2025-04-21 09:56:58 -04:00

129 lines
5.2 KiB
Python

import os
from pathlib import Path
import json
from temporalio.client import Client
from dataclasses import dataclass
from typing import Optional
import asyncio
from temporalio.exceptions import WorkflowAlreadyStartedError
from shared.config import get_temporal_client
from enum import Enum, auto
#enums for the java enum
# class ExecutionScenarios(Enum):
# HAPPY_PATH = 0
# ADVANCED_VISIBILITY = auto() # 1
# HUMAN_IN_LOOP = auto() # 2
# API_DOWNTIME = auto() # 3
# BUG_IN_WORKFLOW = auto() # 4
# INVALID_ACCOUNT = auto() # 5
# these dataclasses are for calling the Temporal Workflow
# Python equivalent of the workflow we're calling's Java WorkflowParameterObj
@dataclass
class MoneyMovementWorkflowParameterObj:
amount: int # Using snake_case as per Python conventions
scenario: str
# this is made to demonstrate functionality but it could just as durably be an API call
# this assumes it's a valid account - use check_account_valid() to verify that first
async def move_money(args: dict) -> dict:
account_key = args.get("email_address_or_account_ID")
account_type: str = args.get("accounttype")
amount = args.get("amount")
destinationaccount = args.get("destinationaccount")
file_path = Path(__file__).resolve().parent.parent / "data" / "customer_account_data.json"
if not file_path.exists():
return {"error": "Data file not found."}
# todo validate there's enough money in the account
with open(file_path, "r") as file:
data = json.load(file)
account_list = data["accounts"]
for account in account_list:
if account["email"] == account_key or account["account_id"] == account_key:
amount_str: str = str(amount) # LLM+python gets sassy about types but we need it to be str
from_account_combo = account_key + account_type
transfer_workflow_id = await start_workflow(amount_cents=str_dollars_to_cents(amount_str),from_account_name=from_account_combo, to_account_name=destinationaccount)
account_type_key = 'checking_balance'
if(account_type.casefold() == "checking" ):
account_type = "checking"
account_type_key = 'checking_balance'
elif(account_type.casefold() == "savings" ):
account_type = "savings"
account_type_key = 'savings_balance'
else:
raise NotImplementedError("money order for account types other than checking or savings is not implemented.")
new_balance: float = float(str_dollars_to_cents(str(account[account_type_key])))
new_balance = new_balance - float(str_dollars_to_cents(amount_str))
account[account_type_key] = str(new_balance / 100 ) #to dollars
with open(file_path, 'w') as file:
json.dump(data, file, indent=4)
return {'status': "money movement complete", 'confirmation id': transfer_workflow_id, 'new_balance': account[account_type_key]}
return_msg = "Account not found with for " + account_key
return {"error": return_msg}
# Async function to start workflow
async def start_workflow(amount_cents: int, from_account_name: str, to_account_name: str)-> str:
start_real_workflow = os.getenv("FIN_START_REAL_WORKFLOW")
if start_real_workflow is not None and start_real_workflow.lower() == "false":
START_REAL_WORKFLOW = False
else:
START_REAL_WORKFLOW = True
if START_REAL_WORKFLOW:
# Connect to Temporal
client = await get_temporal_client()
# Create the parameter object
params = MoneyMovementWorkflowParameterObj(
amount=amount_cents,
scenario="HAPPY_PATH"
)
workflow_id="TRANSFER-ACCT-" + from_account_name + "-TO-" + to_account_name # business-relevant workflow ID
try:
handle = await client.start_workflow(
"moneyTransferWorkflow", # Workflow name
params, # Workflow parameters
id=workflow_id,
task_queue="MoneyTransferJava" # Task queue name
)
return handle.id
except WorkflowAlreadyStartedError as e:
existing_handle = client.get_workflow_handle(workflow_id=workflow_id)
return existing_handle.id
else:
return "TRANSFER-ACCT-" + from_account_name + "-TO-" + to_account_name + "not-real"
#cleans a string dollar amount description to cents value
def str_dollars_to_cents(dollar_str: str) -> int:
try:
# Remove '$' and any whitespace
cleaned_str = dollar_str.replace('$', '').strip()
# Handle empty string or invalid input
if not cleaned_str:
raise ValueError("Empty amount provided")
# Convert to float and then to cents
amount = float(cleaned_str)
if amount < 0:
raise ValueError("Negative amounts not allowed")
return int(amount * 100)
except ValueError as e:
raise ValueError(f"Invalid dollar amount format: {dollar_str}") from e