diff --git a/apps/tesla/src/assets.py b/apps/tesla/src/assets.py index 0a8f3da..ab76c3f 100644 --- a/apps/tesla/src/assets.py +++ b/apps/tesla/src/assets.py @@ -1,5 +1,7 @@ +from datetime import datetime, timezone from functools import partial +import requests from shared.config import APP import dagster as dg @@ -7,5 +9,27 @@ import dagster as dg asset = partial(dg.asset, key_prefix=APP) -@asset -def raw_nearby_charging_sites() -> None: ... +@asset(io_manager_key="json_io_manager") +def raw_nearby_charging_sites() -> dg.Output[str]: + # base_url = 'https://owner-api.teslamotors.com' + + # Proxy that handles authentication + base_url = "http://192.168.2.200:8000" + + # Hardcoded vehicle ID (for now) + vid = "929784335744220" + + response = requests.get(f"{base_url}/api/1/vehicles/{vid}/nearby_charging_sites") + response.raise_for_status() + + now = datetime.now(tz=timezone.utc) + date_str = now.strftime("%Y-%m-%d") + time_str = now.strftime("%H:%M:%S") + + return dg.Output( + response.text, + metadata={ + "date": dg.MetadataValue.timestamp(now), + "path_suffix": [date_str, time_str], + }, + ) diff --git a/apps/tesla/src/definitions.py b/apps/tesla/src/definitions.py index d4c7751..56dbd3b 100644 --- a/apps/tesla/src/definitions.py +++ b/apps/tesla/src/definitions.py @@ -3,6 +3,7 @@ import schedules from dagster_polars import PolarsParquetIOManager from icecream import install from shared.config import APP, STORAGE_DIR +from shared.io_manager import JsonIOManager import dagster as dg from dagster import load_assets_from_modules @@ -18,6 +19,7 @@ definitions = dg.Definitions( for asset in load_assets_from_modules([assets]) ], resources={ + "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), }, schedules=[schedules.raw_schedule], diff --git a/apps/tesla/src/schedules.py b/apps/tesla/src/schedules.py index 9822fdf..e3459ca 100644 --- a/apps/tesla/src/schedules.py +++ b/apps/tesla/src/schedules.py @@ -3,13 +3,7 @@ import assets import dagster as dg raw_schedule = dg.ScheduleDefinition( - name="daily_refresh", + name="raw_nearby_charging_sites", cron_schedule="*/10 * * * *", target=[assets.raw_nearby_charging_sites], ) - -# dg.build_schedule_from_partitioned_job( -# job=raw_nearby_charging_sites_job, -# hour_of_day=9, -# default_status=dg.DefaultScheduleStatus.STOPPED, -# )