diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py index 72ab6b2..3d4f5ad 100644 --- a/apps/weather/src/assets.py +++ b/apps/weather/src/assets.py @@ -1,10 +1,14 @@ +from collections.abc import Iterator from datetime import datetime, timezone from functools import partial +from pathlib import Path from typing import Any +import polars as pl import requests_cache from config import APP from partitions import ( + daily_partitions_def, latitude_partitions_def, location_partitions_def, longitude_partitions_def, @@ -120,7 +124,7 @@ def raw_weather_batch_longitude() -> None: io_manager_key="json_io_manager", partitions_def=latitude_partitions_def, ) -def raw_weather_batch_latitude(context: dg.AssetExecutionContext): +def raw_weather_batch_latitude(context: dg.AssetExecutionContext) -> None: ic(context.resources._asdict().keys()) # contains json_io_manager ic(context.partition_key) @@ -164,3 +168,40 @@ def raw_weather_batch_latitude(context: dg.AssetExecutionContext): }, ) ) + + +@asset( + name="parsed", + deps=[raw_weather], + io_manager_key="polars_parquet_io_manager", + partitions_def=daily_partitions_def, + output_required=False, +) +def parsed_weather( + context: dg.AssetExecutionContext, +) -> Iterator[dg.Output[pl.DataFrame]]: + base = ( + Path(context.resources.polars_parquet_io_manager.base_dir).joinpath( + *raw_weather.key.path + ) + / context.partition_key + ) + + dfs = [] + ic(base) + for path in Path(base).rglob("*.json"): + df = pl.read_json(path) + df = df.select( + *( + pl.lit(v).alias(k) + for k, v in zip( + ("_latitude", "_longitude", "_time"), + path.relative_to(base).with_suffix("").parts, + ) + ), + *df.columns, + ) + dfs.append(df) + + if dfs: + yield dg.Output(pl.concat(dfs)) diff --git a/apps/weather/src/partitions.py b/apps/weather/src/partitions.py index 876664e..3efac76 100644 --- a/apps/weather/src/partitions.py +++ b/apps/weather/src/partitions.py @@ -1,5 +1,10 @@ +import os + import dagster as dg latitude_partitions_def = dg.DynamicPartitionsDefinition(name="latitudes") longitude_partitions_def = dg.DynamicPartitionsDefinition(name="longitudes") location_partitions_def = dg.DynamicPartitionsDefinition(name="locations") +daily_partitions_def = dg.DailyPartitionsDefinition( + start_date="2025-08-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") +)