From 25cccdb501656b90cce2ad53fb0637d11c4a3354 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 28 Jul 2025 19:37:52 +0200 Subject: [PATCH] towards batch download --- apps/weather/src/assets.py | 27 +++++++++-- apps/weather/src/definitions.py | 5 +- apps/weather/src/jobs.py | 6 ++- apps/weather/src/partitions.py | 2 + apps/weather/src/schedules.py | 11 +++++ apps/weather/src/sensors.py | 82 +++++++++++++++++++++++++++++---- apps/weather/src/tests.py | 12 ++--- apps/weather/src/utils.py | 36 +++++++++------ 8 files changed, 147 insertions(+), 34 deletions(-) create mode 100644 apps/weather/src/schedules.py diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py index b18ec3e..d76ed22 100644 --- a/apps/weather/src/assets.py +++ b/apps/weather/src/assets.py @@ -4,10 +4,14 @@ from typing import Any import requests_cache from config import APP -from partitions import location_partitions_def +from partitions import ( + latitude_partitions_def, + location_partitions_def, + longitude_partitions_def, +) from requests import Request from retry_requests import retry -from utils import parse_coord +from utils import parse_coordinate_str import dagster as dg @@ -22,7 +26,7 @@ asset = partial(dg.asset, key_prefix=APP) def raw_weather(context: dg.AssetExecutionContext) -> Any: """Asset to fetch raw weather data for each location.""" partition_key = context.partition_key - lat, lon = parse_coord(partition_key) + lat, lon = parse_coordinate_str(partition_key) context.log.info( f"Fetching weather data for location ({partition_key}): {lat}, {lon}" ) @@ -96,3 +100,20 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any: "path_suffix": [date_str, time_str], }, ) + + +@asset( + io_manager_key="json_io_manager", + partitions_def=latitude_partitions_def, + name="raw_batch", +) +def raw_weather_batch(context: dg.AssetExecutionContext) -> None: + for partitions_def in [ + location_partitions_def, + latitude_partitions_def, + longitude_partitions_def, + ]: + existing_keys = set( + context.instance.get_dynamic_partitions(partitions_def.name) + ) + ic(partitions_def.name, len(existing_keys), existing_keys) diff --git a/apps/weather/src/definitions.py b/apps/weather/src/definitions.py index c69b971..a451c66 100644 --- a/apps/weather/src/definitions.py +++ b/apps/weather/src/definitions.py @@ -25,5 +25,8 @@ definitions = dg.Definitions( "json_io_manager": JsonIOManager(base_dir=storage_dir), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir), }, - sensors=[sensors.list_locations], + sensors=[sensors.list_locations, sensors.list_latitudes, sensors.retrieve_weather], + schedules=[ + # schedules.raw_weather_schedule + ], ) diff --git a/apps/weather/src/jobs.py b/apps/weather/src/jobs.py index 6b5a3f6..0bad521 100644 --- a/apps/weather/src/jobs.py +++ b/apps/weather/src/jobs.py @@ -3,5 +3,9 @@ import assets import dagster as dg raw_weather_job = dg.define_asset_job( - name="weather_data_job", selection=[assets.raw_weather.key] + name="raw_weather_job", selection=[assets.raw_weather.key] +) + +raw_weather_batch_job = dg.define_asset_job( + name="raw_weather_batch_job", selection=[assets.raw_weather_batch.key] ) diff --git a/apps/weather/src/partitions.py b/apps/weather/src/partitions.py index 2423e91..876664e 100644 --- a/apps/weather/src/partitions.py +++ b/apps/weather/src/partitions.py @@ -1,3 +1,5 @@ import dagster as dg +latitude_partitions_def = dg.DynamicPartitionsDefinition(name="latitudes") +longitude_partitions_def = dg.DynamicPartitionsDefinition(name="longitudes") location_partitions_def = dg.DynamicPartitionsDefinition(name="locations") diff --git a/apps/weather/src/schedules.py b/apps/weather/src/schedules.py new file mode 100644 index 0000000..21f720f --- /dev/null +++ b/apps/weather/src/schedules.py @@ -0,0 +1,11 @@ +# This does not work: +# Tried to build a partitioned schedule from an asset job, but received an invalid partitions definition. The permitted partitions definitions are: +# 1. TimeWindowPartitionsDefinition +# 2. MultiPartitionsDefinition with a single TimeWindowPartitionsDefinition dimension +# 3. StaticPartitionsDefinition +# Instead, use a sensor to trigger materialization on a schedule +# raw_weather_schedule = dg.build_schedule_from_partitioned_job( +# job=raw_weather_job, +# # cron_schedule="0 * * * *", +# default_status=dg.DefaultScheduleStatus.RUNNING, +# ) diff --git a/apps/weather/src/sensors.py b/apps/weather/src/sensors.py index d0d7219..911b59f 100644 --- a/apps/weather/src/sensors.py +++ b/apps/weather/src/sensors.py @@ -1,25 +1,40 @@ import jobs import numpy as np -from partitions import location_partitions_def -from utils import format_coord +from partitions import ( + latitude_partitions_def, + location_partitions_def, + longitude_partitions_def, +) +from utils import coordinate_to_str, latitude_to_str, longitude_to_str import dagster as dg +lat_range = np.arange(51.0, 53.01, 0.25, dtype=float) +lon_range = np.arange(3.0, 7.01, 0.25, dtype=float) + @dg.sensor(job=jobs.raw_weather_job) def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult: """Sensor that emits RunRequests for new 0.25-degree grid locations not yet seen as partitions.""" - existing_keys = set( + + existing_latitudes = set( + context.instance.get_dynamic_partitions(location_partitions_def.name) + ) + existing_longitudes = set( + context.instance.get_dynamic_partitions(location_partitions_def.name) + ) + existing_coordinates = set( context.instance.get_dynamic_partitions(location_partitions_def.name) ) - lat_range = np.arange(51.0, 53.01, 0.25, dtype=float) - lon_range = np.arange(3.0, 7.01, 0.25, dtype=float) - - locations = [format_coord(lat, lon) for lat in lat_range for lon in lon_range] + latitudes = [latitude_to_str(lat) for lat in lat_range] + longitudes = [longitude_to_str(lon) for lon in lon_range] + locations = [coordinate_to_str(lat, lon) for lat in lat_range for lon in lon_range] + new_latitudes = [lat for lat in latitudes if lat not in existing_latitudes] + new_longitudes = [lon for lon in longitudes if lon not in existing_longitudes] new_locations = [ - location for location in locations if location not in existing_keys + location for location in locations if location not in existing_coordinates ] if new_locations: context.log.info(f"Discovered {len(new_locations)} new locations.") @@ -27,8 +42,55 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult: # Limit to 3 new locations selected = new_locations[:3] return dg.SensorResult( - run_requests=[dg.RunRequest(partition_key=loc) for loc in selected], + run_requests=[], # dg.RunRequest(partition_key=location) for location in locations], dynamic_partitions_requests=[ - location_partitions_def.build_add_request(selected) + location_partitions_def.build_add_request(selected), + latitude_partitions_def.build_add_request(new_latitudes), + longitude_partitions_def.build_add_request(new_longitudes), ], ) + + +@dg.sensor(job=jobs.raw_weather_batch_job) +def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult: + existing_latitudes = set( + context.instance.get_dynamic_partitions(location_partitions_def.name) + ) + latitudes = [latitude_to_str(lat) for lat in lat_range] + new_latitudes = [lat for lat in latitudes if lat not in existing_latitudes] + return dg.SensorResult( + run_requests=[ + dg.RunRequest(partition_key=partition_key) + for partition_key in new_latitudes + ], + dynamic_partitions_requests=[ + latitude_partitions_def.build_add_request(new_latitudes) + ], + ) + + +@dg.sensor(job=jobs.raw_weather_job, minimum_interval_seconds=60 * 60) +def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult: + """ + Retrieve weather sensor function. + + This function monitors and retrieves weather data by evaluating the current dynamic + partitions and triggering run requests for each key in the partitions. The function + is executed as a sensor with a defined minimum interval. + + Args: + - context (dg.SensorEvaluationContext): The context provided by the sensor framework, + allowing access to the instance for retrieving dynamic partitions. + + Returns: + The result of the sensor's evaluation, containing run requests for each existing partition key. + """ + existing_keys = set( + context.instance.get_dynamic_partitions(location_partitions_def.name) + ) + return dg.SensorResult( + run_requests=[ + dg.RunRequest(partition_key=partition_key) + for partition_key in existing_keys + ] + ) diff --git a/apps/weather/src/tests.py b/apps/weather/src/tests.py index e1cb04e..c394c77 100644 --- a/apps/weather/src/tests.py +++ b/apps/weather/src/tests.py @@ -1,6 +1,6 @@ import unittest -from utils import format_coord, parse_coord +from utils import coordinate_to_str, parse_coordinate_str class TestCoordinateFormatting(unittest.TestCase): @@ -15,20 +15,20 @@ class TestCoordinateFormatting(unittest.TestCase): ] for lat, lon, expected in cases: with self.subTest(lat=lat, lon=lon): - self.assertEqual(format_coord(lat, lon), expected) - parsed_lat, parsed_lon = parse_coord(expected) + self.assertEqual(coordinate_to_str(lat, lon), expected) + parsed_lat, parsed_lon = parse_coordinate_str(expected) print(f"Parsed: {parsed_lat}, {parsed_lon} from {expected}") self.assertAlmostEqual(parsed_lat, lat, places=6) self.assertAlmostEqual(parsed_lon, lon, places=6) def test_invalid_length(self): with self.assertRaises(ValueError): - parse_coord("N52E4") # too short, malformed + parse_coordinate_str("N52E4") # too short, malformed def test_negative_coordinates(self): - coord = format_coord(-52.25, -4.0) + coord = coordinate_to_str(-52.25, -4.0) self.assertEqual(coord, "S5225W0400") - lat, lon = parse_coord(coord) + lat, lon = parse_coordinate_str(coord) self.assertAlmostEqual(lat, -52.25, places=6) self.assertAlmostEqual(lon, -4.0, places=6) diff --git a/apps/weather/src/utils.py b/apps/weather/src/utils.py index a94faaa..d7ea020 100644 --- a/apps/weather/src/utils.py +++ b/apps/weather/src/utils.py @@ -1,23 +1,33 @@ import re -def format_coord(latitude: float, longitude: float) -> str: +def component_to_str(value: float, pos_letter: str, neg_letter: str) -> str: + """Convert a coordinate value to a string with a hemisphere indicator.""" + hemi = pos_letter if value >= 0 else neg_letter + abs_val = abs(value) + degrees = int(abs_val) + fraction = int(round((abs_val - degrees) * 100)) # .25 becomes 25 + return f"{hemi}{degrees:02d}{fraction:02d}" + + +def latitude_to_str(latitude: float) -> str: + """Convert a latitude value to a string with a hemisphere indicator.""" + return component_to_str(latitude, "N", "S") + + +def longitude_to_str(latitude: float) -> str: + """Convert a longitude value to a string with a hemisphere indicator.""" + return component_to_str(latitude, "E", "W") + + +def coordinate_to_str(latitude: float, longitude: float) -> str: """Format latitude and longitude into a string with hemisphere indicators.""" - - def to_str(value: float, pos_letter: str, neg_letter: str) -> str: - """Convert a coordinate value to a string with hemisphere indicator.""" - hemi = pos_letter if value >= 0 else neg_letter - abs_val = abs(value) - degrees = int(abs_val) - fraction = int(round((abs_val - degrees) * 100)) # .25 becomes 25 - return f"{hemi}{degrees:02d}{fraction:02d}" - - lat_str = to_str(latitude, "N", "S") - lon_str = to_str(longitude, "E", "W") + lat_str = latitude_to_str(latitude) + lon_str = longitude_to_str(longitude) return f"{lat_str}{lon_str}" -def parse_coord(coord: str) -> tuple[float, float]: +def parse_coordinate_str(coord: str) -> tuple[float, float]: """ Parse a formatted coordinate string (e.g. 'N5225E0040' or 'S1350W12200') back into (latitude, longitude) float values.