towards batch download

This commit is contained in:
2025-07-28 19:37:52 +02:00
parent 78de29e930
commit 25cccdb501
8 changed files with 147 additions and 34 deletions

View File

@@ -4,10 +4,14 @@ from typing import Any
import requests_cache import requests_cache
from config import APP from config import APP
from partitions import location_partitions_def from partitions import (
latitude_partitions_def,
location_partitions_def,
longitude_partitions_def,
)
from requests import Request from requests import Request
from retry_requests import retry from retry_requests import retry
from utils import parse_coord from utils import parse_coordinate_str
import dagster as dg import dagster as dg
@@ -22,7 +26,7 @@ asset = partial(dg.asset, key_prefix=APP)
def raw_weather(context: dg.AssetExecutionContext) -> Any: def raw_weather(context: dg.AssetExecutionContext) -> Any:
"""Asset to fetch raw weather data for each location.""" """Asset to fetch raw weather data for each location."""
partition_key = context.partition_key partition_key = context.partition_key
lat, lon = parse_coord(partition_key) lat, lon = parse_coordinate_str(partition_key)
context.log.info( context.log.info(
f"Fetching weather data for location ({partition_key}): {lat}, {lon}" f"Fetching weather data for location ({partition_key}): {lat}, {lon}"
) )
@@ -96,3 +100,20 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
"path_suffix": [date_str, time_str], "path_suffix": [date_str, time_str],
}, },
) )
@asset(
io_manager_key="json_io_manager",
partitions_def=latitude_partitions_def,
name="raw_batch",
)
def raw_weather_batch(context: dg.AssetExecutionContext) -> None:
for partitions_def in [
location_partitions_def,
latitude_partitions_def,
longitude_partitions_def,
]:
existing_keys = set(
context.instance.get_dynamic_partitions(partitions_def.name)
)
ic(partitions_def.name, len(existing_keys), existing_keys)

View File

@@ -25,5 +25,8 @@ definitions = dg.Definitions(
"json_io_manager": JsonIOManager(base_dir=storage_dir), "json_io_manager": JsonIOManager(base_dir=storage_dir),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir),
}, },
sensors=[sensors.list_locations], sensors=[sensors.list_locations, sensors.list_latitudes, sensors.retrieve_weather],
schedules=[
# schedules.raw_weather_schedule
],
) )

View File

@@ -3,5 +3,9 @@ import assets
import dagster as dg import dagster as dg
raw_weather_job = dg.define_asset_job( raw_weather_job = dg.define_asset_job(
name="weather_data_job", selection=[assets.raw_weather.key] name="raw_weather_job", selection=[assets.raw_weather.key]
)
raw_weather_batch_job = dg.define_asset_job(
name="raw_weather_batch_job", selection=[assets.raw_weather_batch.key]
) )

View File

@@ -1,3 +1,5 @@
import dagster as dg import dagster as dg
latitude_partitions_def = dg.DynamicPartitionsDefinition(name="latitudes")
longitude_partitions_def = dg.DynamicPartitionsDefinition(name="longitudes")
location_partitions_def = dg.DynamicPartitionsDefinition(name="locations") location_partitions_def = dg.DynamicPartitionsDefinition(name="locations")

View File

@@ -0,0 +1,11 @@
# This does not work:
# Tried to build a partitioned schedule from an asset job, but received an invalid partitions definition. The permitted partitions definitions are:
# 1. TimeWindowPartitionsDefinition
# 2. MultiPartitionsDefinition with a single TimeWindowPartitionsDefinition dimension
# 3. StaticPartitionsDefinition
# Instead, use a sensor to trigger materialization on a schedule
# raw_weather_schedule = dg.build_schedule_from_partitioned_job(
# job=raw_weather_job,
# # cron_schedule="0 * * * *",
# default_status=dg.DefaultScheduleStatus.RUNNING,
# )

View File

@@ -1,25 +1,40 @@
import jobs import jobs
import numpy as np import numpy as np
from partitions import location_partitions_def from partitions import (
from utils import format_coord latitude_partitions_def,
location_partitions_def,
longitude_partitions_def,
)
from utils import coordinate_to_str, latitude_to_str, longitude_to_str
import dagster as dg import dagster as dg
lat_range = np.arange(51.0, 53.01, 0.25, dtype=float)
lon_range = np.arange(3.0, 7.01, 0.25, dtype=float)
@dg.sensor(job=jobs.raw_weather_job) @dg.sensor(job=jobs.raw_weather_job)
def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult: def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
"""Sensor that emits RunRequests for new 0.25-degree grid locations not yet seen as partitions.""" """Sensor that emits RunRequests for new 0.25-degree grid locations not yet seen as partitions."""
existing_keys = set(
existing_latitudes = set(
context.instance.get_dynamic_partitions(location_partitions_def.name)
)
existing_longitudes = set(
context.instance.get_dynamic_partitions(location_partitions_def.name)
)
existing_coordinates = set(
context.instance.get_dynamic_partitions(location_partitions_def.name) context.instance.get_dynamic_partitions(location_partitions_def.name)
) )
lat_range = np.arange(51.0, 53.01, 0.25, dtype=float) latitudes = [latitude_to_str(lat) for lat in lat_range]
lon_range = np.arange(3.0, 7.01, 0.25, dtype=float) longitudes = [longitude_to_str(lon) for lon in lon_range]
locations = [coordinate_to_str(lat, lon) for lat in lat_range for lon in lon_range]
locations = [format_coord(lat, lon) for lat in lat_range for lon in lon_range]
new_latitudes = [lat for lat in latitudes if lat not in existing_latitudes]
new_longitudes = [lon for lon in longitudes if lon not in existing_longitudes]
new_locations = [ new_locations = [
location for location in locations if location not in existing_keys location for location in locations if location not in existing_coordinates
] ]
if new_locations: if new_locations:
context.log.info(f"Discovered {len(new_locations)} new locations.") context.log.info(f"Discovered {len(new_locations)} new locations.")
@@ -27,8 +42,55 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
# Limit to 3 new locations # Limit to 3 new locations
selected = new_locations[:3] selected = new_locations[:3]
return dg.SensorResult( return dg.SensorResult(
run_requests=[dg.RunRequest(partition_key=loc) for loc in selected], run_requests=[], # dg.RunRequest(partition_key=location) for location in locations],
dynamic_partitions_requests=[ dynamic_partitions_requests=[
location_partitions_def.build_add_request(selected) location_partitions_def.build_add_request(selected),
latitude_partitions_def.build_add_request(new_latitudes),
longitude_partitions_def.build_add_request(new_longitudes),
], ],
) )
@dg.sensor(job=jobs.raw_weather_batch_job)
def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult:
existing_latitudes = set(
context.instance.get_dynamic_partitions(location_partitions_def.name)
)
latitudes = [latitude_to_str(lat) for lat in lat_range]
new_latitudes = [lat for lat in latitudes if lat not in existing_latitudes]
return dg.SensorResult(
run_requests=[
dg.RunRequest(partition_key=partition_key)
for partition_key in new_latitudes
],
dynamic_partitions_requests=[
latitude_partitions_def.build_add_request(new_latitudes)
],
)
@dg.sensor(job=jobs.raw_weather_job, minimum_interval_seconds=60 * 60)
def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult:
"""
Retrieve weather sensor function.
This function monitors and retrieves weather data by evaluating the current dynamic
partitions and triggering run requests for each key in the partitions. The function
is executed as a sensor with a defined minimum interval.
Args:
- context (dg.SensorEvaluationContext): The context provided by the sensor framework,
allowing access to the instance for retrieving dynamic partitions.
Returns:
The result of the sensor's evaluation, containing run requests for each existing partition key.
"""
existing_keys = set(
context.instance.get_dynamic_partitions(location_partitions_def.name)
)
return dg.SensorResult(
run_requests=[
dg.RunRequest(partition_key=partition_key)
for partition_key in existing_keys
]
)

View File

@@ -1,6 +1,6 @@
import unittest import unittest
from utils import format_coord, parse_coord from utils import coordinate_to_str, parse_coordinate_str
class TestCoordinateFormatting(unittest.TestCase): class TestCoordinateFormatting(unittest.TestCase):
@@ -15,20 +15,20 @@ class TestCoordinateFormatting(unittest.TestCase):
] ]
for lat, lon, expected in cases: for lat, lon, expected in cases:
with self.subTest(lat=lat, lon=lon): with self.subTest(lat=lat, lon=lon):
self.assertEqual(format_coord(lat, lon), expected) self.assertEqual(coordinate_to_str(lat, lon), expected)
parsed_lat, parsed_lon = parse_coord(expected) parsed_lat, parsed_lon = parse_coordinate_str(expected)
print(f"Parsed: {parsed_lat}, {parsed_lon} from {expected}") print(f"Parsed: {parsed_lat}, {parsed_lon} from {expected}")
self.assertAlmostEqual(parsed_lat, lat, places=6) self.assertAlmostEqual(parsed_lat, lat, places=6)
self.assertAlmostEqual(parsed_lon, lon, places=6) self.assertAlmostEqual(parsed_lon, lon, places=6)
def test_invalid_length(self): def test_invalid_length(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
parse_coord("N52E4") # too short, malformed parse_coordinate_str("N52E4") # too short, malformed
def test_negative_coordinates(self): def test_negative_coordinates(self):
coord = format_coord(-52.25, -4.0) coord = coordinate_to_str(-52.25, -4.0)
self.assertEqual(coord, "S5225W0400") self.assertEqual(coord, "S5225W0400")
lat, lon = parse_coord(coord) lat, lon = parse_coordinate_str(coord)
self.assertAlmostEqual(lat, -52.25, places=6) self.assertAlmostEqual(lat, -52.25, places=6)
self.assertAlmostEqual(lon, -4.0, places=6) self.assertAlmostEqual(lon, -4.0, places=6)

View File

@@ -1,23 +1,33 @@
import re import re
def format_coord(latitude: float, longitude: float) -> str: def component_to_str(value: float, pos_letter: str, neg_letter: str) -> str:
"""Convert a coordinate value to a string with a hemisphere indicator."""
hemi = pos_letter if value >= 0 else neg_letter
abs_val = abs(value)
degrees = int(abs_val)
fraction = int(round((abs_val - degrees) * 100)) # .25 becomes 25
return f"{hemi}{degrees:02d}{fraction:02d}"
def latitude_to_str(latitude: float) -> str:
"""Convert a latitude value to a string with a hemisphere indicator."""
return component_to_str(latitude, "N", "S")
def longitude_to_str(latitude: float) -> str:
"""Convert a longitude value to a string with a hemisphere indicator."""
return component_to_str(latitude, "E", "W")
def coordinate_to_str(latitude: float, longitude: float) -> str:
"""Format latitude and longitude into a string with hemisphere indicators.""" """Format latitude and longitude into a string with hemisphere indicators."""
lat_str = latitude_to_str(latitude)
def to_str(value: float, pos_letter: str, neg_letter: str) -> str: lon_str = longitude_to_str(longitude)
"""Convert a coordinate value to a string with hemisphere indicator."""
hemi = pos_letter if value >= 0 else neg_letter
abs_val = abs(value)
degrees = int(abs_val)
fraction = int(round((abs_val - degrees) * 100)) # .25 becomes 25
return f"{hemi}{degrees:02d}{fraction:02d}"
lat_str = to_str(latitude, "N", "S")
lon_str = to_str(longitude, "E", "W")
return f"{lat_str}{lon_str}" return f"{lat_str}{lon_str}"
def parse_coord(coord: str) -> tuple[float, float]: def parse_coordinate_str(coord: str) -> tuple[float, float]:
""" """
Parse a formatted coordinate string (e.g. 'N5225E0040' or 'S1350W12200') Parse a formatted coordinate string (e.g. 'N5225E0040' or 'S1350W12200')
back into (latitude, longitude) float values. back into (latitude, longitude) float values.