from collections.abc import Iterator
from datetime import datetime, timezone
from functools import partial
from pathlib import Path
from typing import Any

import polars as pl
import requests_cache
from config import APP
from partitions import (
    daily_partitions_def,
    latitude_partitions_def,
    location_partitions_def,
    longitude_partitions_def,
)
from requests import Request
from retry_requests import retry
from utils import parse_coordinate_str

import dagster as dg

# Every asset declared in this module gets the application name as key prefix.
asset = partial(dg.asset, key_prefix=APP)


class WeatherFetcher:
    """Fetch Open-Meteo forecast data through a cached, retrying HTTP session."""

    def __init__(self) -> None:
        """Create the cached session and enable retries on it."""
        # Responses are cached under /cache/open-meteo with expire_after=3600
        # (presumably seconds, i.e. one hour -- confirm against requests-cache).
        self.cache_session = requests_cache.CachedSession(
            "/cache/open-meteo", expire_after=3600
        )
        # Retry failed requests up to 5 times with a 0.2 backoff factor.
        self.retry_session = retry(self.cache_session, retries=5, backoff_factor=0.2)

    def fetch(self, latitude: float, longitude: float) -> Any:
        """Request the 16-day GFS forecast for one coordinate pair.

        Args:
            latitude: Latitude sent as the ``latitude`` query parameter.
            longitude: Longitude sent as the ``longitude`` query parameter.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.HTTPError: If the response has an error status code.
            ValueError: If the decoded body contains an ``"error"`` key.
        """
        url = "https://api.open-meteo.com/v1/forecast"
        params = {
            "latitude": latitude,
            "longitude": longitude,
            "hourly": [
                "temperature_2m",
                "shortwave_radiation",
                "shortwave_radiation_instant",
                "direct_radiation",
                "rain",
                "precipitation",
                "showers",
                "weather_code",
                "pressure_msl",
                "surface_pressure",
                "wind_speed_80m",
                "wind_direction_80m",
                "wind_gusts_10m",
                "temperature_80m",
                "wind_direction_10m",
                "wind_speed_10m",
                "surface_temperature",
            ],
            "models": "gfs_global",
            "current": [
                "wind_speed_10m",
                "wind_gusts_10m",
                "temperature_2m",
                "relative_humidity_2m",
                "apparent_temperature",
                "precipitation",
                "rain",
                "snowfall",
            ],
            "forecast_days": 16,
        }
        request = self.retry_session.prepare_request(Request("GET", url, params=params))

        # Compute the key with the session's own cache so it matches the key the
        # session stores the response under, and so it can be evicted below.
        cache = self.cache_session.cache
        cache_key = cache.create_key(request)

        response = self.retry_session.send(request)
        response.raise_for_status()
        data = response.json()

        if "error" in data:
            # Do not keep an error payload cached: evict it from *this* session's
            # cache (the module-level ``requests_cache.delete`` only affects a
            # globally installed cache).
            cache.delete(cache_key)
            raise ValueError(
                f"Error ({data['error']}) fetching weather data: {data.get('reason', '')}"
            )
        return data


@asset(
    io_manager_key="json_io_manager",
    partitions_def=location_partitions_def,
    name="raw",
)
def raw_weather(context: dg.AssetExecutionContext) -> Iterator[dg.Output[Any]]:
    """Asset to fetch raw weather data for each location.

    The partition key is parsed into a latitude/longitude pair, the forecast is
    fetched, and the payload is yielded together with the fetch timestamp,
    the coordinates and a ``path_suffix`` of ``[date, time]`` as metadata.
    """
    partition_key = context.partition_key
    latitude, longitude = parse_coordinate_str(partition_key)
    context.log.info(
        f"Fetching weather data for location ({partition_key}): {latitude}, {longitude}"
    )

    fetcher = WeatherFetcher()
    data = fetcher.fetch(latitude=latitude, longitude=longitude)

    now = datetime.now(tz=timezone.utc)
    date_str = now.strftime("%Y-%m-%d")
    time_str = now.strftime("%H:%M:%S")

    yield dg.Output(
        data,
        metadata={
            "date": dg.MetadataValue.timestamp(now),
            "latitude": dg.MetadataValue.float(latitude),
            "longitude": dg.MetadataValue.float(longitude),
            "path_suffix": [date_str, time_str],
        },
    )


@asset(
    partitions_def=longitude_partitions_def,
)
def raw_weather_batch_longitude() -> None:
    """Placeholder for a per-longitude batch fetch; not implemented yet."""
    raise NotImplementedError


@asset(
    io_manager_key="json_io_manager",
    partitions_def=latitude_partitions_def,
)
def raw_weather_batch_latitude(context: dg.AssetExecutionContext) -> None:
    """Fetch raw weather data for every known location on one latitude.

    The partition key is the latitude string. It is combined with each
    registered longitude partition; combinations that are registered location
    partitions are fetched, written through ``json_io_manager`` and reported
    as materializations of the matching ``raw_weather`` partition.
    """
    latitude_str = context.partition_key
    context.log.debug("Latitude partition: %s", latitude_str)

    location_keys = context.instance.get_dynamic_partitions(
        location_partitions_def.name
    )
    context.log.debug("Known locations: %s", location_keys)
    longitudes = context.instance.get_dynamic_partitions(longitude_partitions_def.name)
    context.log.debug("Known longitudes: %s", longitudes)

    # Set membership instead of scanning the list once per longitude.
    locations = set(location_keys)
    # One fetcher (and thus one cached session) is enough for the whole batch.
    fetcher = WeatherFetcher()

    for longitude_str in longitudes:
        location = f"{latitude_str}{longitude_str}"
        if location not in locations:
            continue

        context.log.info("Fetching weather data for location %s", location)
        latitude, longitude = parse_coordinate_str(location)
        data = fetcher.fetch(latitude=latitude, longitude=longitude)

        now = datetime.now(tz=timezone.utc)
        date_str = now.strftime("%Y-%m-%d")
        time_str = now.strftime("%H:%M:%S")

        # The output is written manually under an ad-hoc key of the form
        # <APP>/raw/<date>/<latitude>/<longitude>/<time>.
        output_context = dg.build_output_context(
            asset_key=dg.AssetKey(
                [APP, "raw", date_str, latitude_str, longitude_str, time_str]
            ),
            resources=context.resources.original_resource_dict,
        )
        # context.resources contains json_io_manager (declared as io_manager_key).
        context.resources.json_io_manager.handle_output(output_context, data)

        context.log_event(
            dg.AssetMaterialization(
                asset_key=raw_weather.key,
                partition=location,
                metadata={
                    "date": dg.MetadataValue.timestamp(now),
                    "latitude": dg.MetadataValue.float(latitude),
                    "longitude": dg.MetadataValue.float(longitude),
                },
            )
        )


@asset(
    name="parsed",
    deps=[raw_weather],
    io_manager_key="polars_parquet_io_manager",
    partitions_def=daily_partitions_def,
    output_required=False,
)
def parsed_weather(
    context: dg.AssetExecutionContext,
) -> Iterator[dg.Output[pl.DataFrame]]:
    """Combine one day's raw JSON files into a single DataFrame.

    All ``*.json`` files below ``<base_dir>/<raw_weather key path>/<partition>``
    are read. Each frame is prefixed with literal ``_latitude``, ``_longitude``
    and ``_time`` columns taken from the file's path relative to that folder.
    Nothing is yielded when no file is found (``output_required=False``).
    """
    base = (
        Path(context.resources.polars_parquet_io_manager.base_dir).joinpath(
            *raw_weather.key.path
        )
        / context.partition_key
    )
    context.log.debug("Reading raw weather files below %s", base)

    dfs = []
    # Sorted so the row order of the result does not depend on directory order.
    for path in sorted(base.rglob("*.json")):
        df = pl.read_json(path)
        # NOTE(review): assumes files are laid out as
        # <latitude>/<longitude>/<time>.json below ``base``; zip() silently
        # truncates if a path has more or fewer components -- confirm layout.
        path_parts = path.relative_to(base).with_suffix("").parts
        df = df.select(
            *(
                pl.lit(value).alias(column)
                for column, value in zip(
                    ("_latitude", "_longitude", "_time"),
                    path_parts,
                )
            ),
            *df.columns,
        )
        dfs.append(df)

    if dfs:
        yield dg.Output(pl.concat(dfs))