36 lines
903 B
Python
36 lines
903 B
Python
from datetime import datetime, timezone
|
|
from functools import partial
|
|
|
|
import requests
|
|
from shared.config import APP
|
|
|
|
import dagster as dg
|
|
|
|
asset = partial(dg.asset, key_prefix=APP)
|
|
|
|
|
|
@asset(io_manager_key="json_io_manager")
|
|
def raw_nearby_charging_sites() -> dg.Output[str]:
|
|
# base_url = 'https://owner-api.teslamotors.com'
|
|
|
|
# Proxy that handles authentication
|
|
base_url = "http://192.168.2.200:8000"
|
|
|
|
# Hardcoded vehicle ID (for now)
|
|
vid = "929784335744220"
|
|
|
|
response = requests.get(f"{base_url}/api/1/vehicles/{vid}/nearby_charging_sites")
|
|
response.raise_for_status()
|
|
|
|
now = datetime.now(tz=timezone.utc)
|
|
date_str = now.strftime("%Y-%m-%d")
|
|
time_str = now.strftime("%H:%M:%S")
|
|
|
|
return dg.Output(
|
|
response.text,
|
|
metadata={
|
|
"date": dg.MetadataValue.timestamp(now),
|
|
"path_suffix": [date_str, time_str],
|
|
},
|
|
)
|