From 4a44d4c3cd0ca174a2c137feb7988910a9bf8f83 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 4 Aug 2025 15:15:53 +0200 Subject: [PATCH] use batch weather fetcher --- apps/weather/src/assets.py | 189 ++++++++++++++++++++------------ apps/weather/src/definitions.py | 9 +- apps/weather/src/jobs.py | 10 +- apps/weather/src/sensors.py | 24 +++- 4 files changed, 153 insertions(+), 79 deletions(-) diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py index d76ed22..72ab6b2 100644 --- a/apps/weather/src/assets.py +++ b/apps/weather/src/assets.py @@ -18,6 +18,67 @@ import dagster as dg asset = partial(dg.asset, key_prefix=APP) +class WeatherFetcher: + def __init__(self) -> None: + # Setup the Open-Meteo API client with cache and retry on error + self.cache_session = requests_cache.CachedSession( + "/cache/open-meteo", expire_after=3600 + ) + self.retry_session = retry(self.cache_session, retries=5, backoff_factor=0.2) + # openmeteo = openmeteo_requests.Client(session=retry_session) + + def fetch(self, latitude: float, longitude: float) -> Any: + url = "https://api.open-meteo.com/v1/forecast" + params = { + "latitude": latitude, + "longitude": longitude, + "hourly": [ + "temperature_2m", + "shortwave_radiation", + "shortwave_radiation_instant", + "direct_radiation", + "rain", + "precipitation", + "showers", + "weather_code", + "pressure_msl", + "surface_pressure", + "wind_speed_80m", + "wind_direction_80m", + "wind_gusts_10m", + "temperature_80m", + "wind_direction_10m", + "wind_speed_10m", + "surface_temperature", + ], + "models": "gfs_global", + "current": [ + "wind_speed_10m", + "wind_gusts_10m", + "temperature_2m", + "relative_humidity_2m", + "apparent_temperature", + "precipitation", + "rain", + "snowfall", + ], + "forecast_days": 16, + } + + request = self.retry_session.prepare_request(Request("GET", url, params=params)) + cache_key = requests_cache.create_key(request) + response = self.retry_session.send(request) + response.raise_for_status() + + data = response.json() + if "error" in data: + requests_cache.delete(cache_key) + raise ValueError( + f"Error (data['error']) fetching weather data: {data.get('reason', '')}" + ) + return data + + @asset( io_manager_key="json_io_manager", partitions_def=location_partitions_def, @@ -26,69 +87,13 @@ asset = partial(dg.asset, key_prefix=APP) def raw_weather(context: dg.AssetExecutionContext) -> Any: """Asset to fetch raw weather data for each location.""" partition_key = context.partition_key - lat, lon = parse_coordinate_str(partition_key) + latitude, longitude = parse_coordinate_str(partition_key) context.log.info( - f"Fetching weather data for location ({partition_key}): {lat}, {lon}" + f"Fetching weather data for location ({partition_key}): {latitude}, {longitude}" ) - # Setup the Open-Meteo API client with cache and retry on error - cache_session = requests_cache.CachedSession("/cache/open-meteo", expire_after=3600) - retry_session = retry(cache_session, retries=5, backoff_factor=0.2) - # openmeteo = openmeteo_requests.Client(session=retry_session) - - # Make sure all required weather variables are listed here - # The order of variables in hourly or daily is important to assign them correctly below - url = "https://api.open-meteo.com/v1/forecast" - params = { - "latitude": 52.5, - "longitude": 4.5, - "hourly": [ - "temperature_2m", - "shortwave_radiation", - "shortwave_radiation_instant", - "direct_radiation", - "rain", - "precipitation", - "showers", - "weather_code", - "pressure_msl", - "surface_pressure", - "wind_speed_80m", - "wind_direction_80m", - "wind_gusts_10m", - "temperature_80m", - "wind_direction_10m", - "wind_speed_10m", - "surface_temperature", - ], - "models": "gfs_global", - "current": [ - "wind_speed_10m", - "wind_gusts_10m", - "temperature_2m", - "relative_humidity_2m", - "apparent_temperature", - "precipitation", - "rain", - "snowfall", - ], - "forecast_days": 16, - } - # responses = openmeteo.weather_api(url, params=params) - # print(len(responses)) - # print(responses) - - request = retry_session.prepare_request(Request("GET", url, params=params)) - cache_key = requests_cache.create_key(request) - response = retry_session.send(request) - response.raise_for_status() - - data = response.json() - if "error" in data: - requests_cache.delete(cache_key) - raise ValueError( - f"Error (data['error']) fetching weather data: {data.get('reason', '')}" - ) + fetcher = WeatherFetcher() + data = fetcher.fetch(latitude=latitude, longitude=longitude) now = datetime.now(tz=timezone.utc) date_str = now.strftime("%Y-%m-%d") @@ -97,23 +102,65 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any: data, metadata={ "date": dg.MetadataValue.timestamp(now), + "latitude": dg.MetadataValue.float(latitude), + "longitude": dg.MetadataValue.float(longitude), "path_suffix": [date_str, time_str], }, ) +@asset( + partitions_def=longitude_partitions_def, +) +def raw_weather_batch_longitude() -> None: + raise NotImplementedError + + @asset( io_manager_key="json_io_manager", partitions_def=latitude_partitions_def, - name="raw_batch", ) -def raw_weather_batch(context: dg.AssetExecutionContext) -> None: - for partitions_def in [ - location_partitions_def, - latitude_partitions_def, - longitude_partitions_def, - ]: - existing_keys = set( - context.instance.get_dynamic_partitions(partitions_def.name) - ) - ic(partitions_def.name, len(existing_keys), existing_keys) +def raw_weather_batch_latitude(context: dg.AssetExecutionContext): + ic(context.resources._asdict().keys()) # contains json_io_manager + ic(context.partition_key) + + ic(context.op.outputs()) + latitude_str = context.partition_key + + locations = context.instance.get_dynamic_partitions(location_partitions_def.name) + ic(locations) + + longitudes = context.instance.get_dynamic_partitions(longitude_partitions_def.name) + ic(longitudes) + + for longitude_str in longitudes: + location = f"{latitude_str}{longitude_str}" + if location in locations: + context.log.info("Fetching weather data for location %s", location) + + fetcher = WeatherFetcher() + latitude, longitude = parse_coordinate_str(location) + data = fetcher.fetch(latitude=latitude, longitude=longitude) + + now = datetime.now(tz=timezone.utc) + date_str = now.strftime("%Y-%m-%d") + time_str = now.strftime("%H:%M:%S") + + output_context = dg.build_output_context( + asset_key=dg.AssetKey( + [APP, "raw", date_str, latitude_str, longitude_str, time_str] + ), + resources=context.resources.original_resource_dict, + ) + context.resources.json_io_manager.handle_output(output_context, data) + context.log_event( + dg.AssetMaterialization( + asset_key=raw_weather.key, + partition=location, + metadata={ + "date": dg.MetadataValue.timestamp(now), + "latitude": dg.MetadataValue.float(latitude), + "longitude": dg.MetadataValue.float(longitude), + }, + ) + ) diff --git a/apps/weather/src/definitions.py b/apps/weather/src/definitions.py index 15e0f74..2b3eb3e 100644 --- a/apps/weather/src/definitions.py +++ b/apps/weather/src/definitions.py @@ -21,8 +21,11 @@ definitions = dg.Definitions( "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), }, - sensors=[sensors.list_locations, sensors.list_latitudes, sensors.retrieve_weather], - schedules=[ - # schedules.raw_weather_schedule + sensors=[ + sensors.list_locations, + sensors.list_latitudes, + sensors.list_longitudes, + sensors.retrieve_weather, + sensors.retrieve_weather, ], ) diff --git a/apps/weather/src/jobs.py b/apps/weather/src/jobs.py index 0bad521..ff2b2a6 100644 --- a/apps/weather/src/jobs.py +++ b/apps/weather/src/jobs.py @@ -6,6 +6,12 @@ raw_weather_job = dg.define_asset_job( name="raw_weather_job", selection=[assets.raw_weather.key] ) -raw_weather_batch_job = dg.define_asset_job( - name="raw_weather_batch_job", selection=[assets.raw_weather_batch.key] +raw_weather_batch_latitude_job = dg.define_asset_job( + name="raw_weather_batch_latitude_job", + selection=[assets.raw_weather_batch_latitude.key], +) + +raw_weather_batch_longitude_job = dg.define_asset_job( + name="raw_weather_batch_longitude_job", + selection=[assets.raw_weather_batch_longitude.key], ) diff --git a/apps/weather/src/sensors.py b/apps/weather/src/sensors.py index 911b59f..84f2cdf 100644 --- a/apps/weather/src/sensors.py +++ b/apps/weather/src/sensors.py @@ -51,7 +51,7 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult: ) -@dg.sensor(job=jobs.raw_weather_batch_job) +@dg.sensor(job=jobs.raw_weather_batch_latitude_job) def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult: existing_latitudes = set( context.instance.get_dynamic_partitions(location_partitions_def.name) @@ -69,7 +69,25 @@ def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult: ) -@dg.sensor(job=jobs.raw_weather_job, minimum_interval_seconds=60 * 60) +@dg.sensor(job=jobs.raw_weather_batch_longitude_job) +def list_longitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult: + existing_longitudes = set( + context.instance.get_dynamic_partitions(location_partitions_def.name) + ) + longitudes = [longitude_to_str(lon) for lon in lon_range] + new_longitudes = [lon for lon in longitudes if lon not in existing_longitudes] + return dg.SensorResult( + run_requests=[ + dg.RunRequest(partition_key=partition_key) + for partition_key in new_longitudes + ], + dynamic_partitions_requests=[ + longitude_partitions_def.build_add_request(new_longitudes) + ], + ) + + +@dg.sensor(job=jobs.raw_weather_batch_latitude_job, minimum_interval_seconds=60 * 60) def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult: """ Retrieve weather sensor function. @@ -86,7 +104,7 @@ def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult: The result of the sensor's evaluation, containing run requests for each existing partition key. """ existing_keys = set( - context.instance.get_dynamic_partitions(location_partitions_def.name) + context.instance.get_dynamic_partitions(latitude_partitions_def.name) ) return dg.SensorResult( run_requests=[