use batch weather fetcher

2025-08-04 15:15:53 +02:00
parent d6737cc327
commit 4a44d4c3cd
4 changed files with 153 additions and 79 deletions

View File

@@ -18,6 +18,67 @@ import dagster as dg
 asset = partial(dg.asset, key_prefix=APP)
+
+
+class WeatherFetcher:
+    def __init__(self) -> None:
+        # Setup the Open-Meteo API client with cache and retry on error
+        self.cache_session = requests_cache.CachedSession(
+            "/cache/open-meteo", expire_after=3600
+        )
+        self.retry_session = retry(self.cache_session, retries=5, backoff_factor=0.2)
+        # openmeteo = openmeteo_requests.Client(session=retry_session)
+
+    def fetch(self, latitude: float, longitude: float) -> Any:
+        url = "https://api.open-meteo.com/v1/forecast"
+        params = {
+            "latitude": latitude,
+            "longitude": longitude,
+            "hourly": [
+                "temperature_2m",
+                "shortwave_radiation",
+                "shortwave_radiation_instant",
+                "direct_radiation",
+                "rain",
+                "precipitation",
+                "showers",
+                "weather_code",
+                "pressure_msl",
+                "surface_pressure",
+                "wind_speed_80m",
+                "wind_direction_80m",
+                "wind_gusts_10m",
+                "temperature_80m",
+                "wind_direction_10m",
+                "wind_speed_10m",
+                "surface_temperature",
+            ],
+            "models": "gfs_global",
+            "current": [
+                "wind_speed_10m",
+                "wind_gusts_10m",
+                "temperature_2m",
+                "relative_humidity_2m",
+                "apparent_temperature",
+                "precipitation",
+                "rain",
+                "snowfall",
+            ],
+            "forecast_days": 16,
+        }
+        request = self.retry_session.prepare_request(Request("GET", url, params=params))
+        cache_key = requests_cache.create_key(request)
+        response = self.retry_session.send(request)
+        response.raise_for_status()
+        data = response.json()
+        if "error" in data:
+            requests_cache.delete(cache_key)
+            raise ValueError(
f"Error (data['error']) fetching weather data: {data.get('reason', '')}"
+            )
+        return data
+
+
 @asset(
     io_manager_key="json_io_manager",
     partitions_def=location_partitions_def,
@@ -26,69 +87,13 @@ asset = partial(dg.asset, key_prefix=APP)
 def raw_weather(context: dg.AssetExecutionContext) -> Any:
     """Asset to fetch raw weather data for each location."""
     partition_key = context.partition_key
-    lat, lon = parse_coordinate_str(partition_key)
+    latitude, longitude = parse_coordinate_str(partition_key)
     context.log.info(
-        f"Fetching weather data for location ({partition_key}): {lat}, {lon}"
+        f"Fetching weather data for location ({partition_key}): {latitude}, {longitude}"
     )
-    # Setup the Open-Meteo API client with cache and retry on error
-    cache_session = requests_cache.CachedSession("/cache/open-meteo", expire_after=3600)
-    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
-    # openmeteo = openmeteo_requests.Client(session=retry_session)
-
-    # Make sure all required weather variables are listed here
-    # The order of variables in hourly or daily is important to assign them correctly below
-    url = "https://api.open-meteo.com/v1/forecast"
-    params = {
-        "latitude": 52.5,
-        "longitude": 4.5,
-        "hourly": [
-            "temperature_2m",
-            "shortwave_radiation",
-            "shortwave_radiation_instant",
-            "direct_radiation",
-            "rain",
-            "precipitation",
-            "showers",
-            "weather_code",
-            "pressure_msl",
-            "surface_pressure",
-            "wind_speed_80m",
-            "wind_direction_80m",
-            "wind_gusts_10m",
-            "temperature_80m",
-            "wind_direction_10m",
-            "wind_speed_10m",
-            "surface_temperature",
-        ],
-        "models": "gfs_global",
-        "current": [
-            "wind_speed_10m",
-            "wind_gusts_10m",
-            "temperature_2m",
-            "relative_humidity_2m",
-            "apparent_temperature",
-            "precipitation",
-            "rain",
-            "snowfall",
-        ],
-        "forecast_days": 16,
-    }
-    # responses = openmeteo.weather_api(url, params=params)
-    # print(len(responses))
-    # print(responses)
-    request = retry_session.prepare_request(Request("GET", url, params=params))
-    cache_key = requests_cache.create_key(request)
-    response = retry_session.send(request)
-    response.raise_for_status()
-    data = response.json()
-    if "error" in data:
-        requests_cache.delete(cache_key)
-        raise ValueError(
-            f"Error (data['error']) fetching weather data: {data.get('reason', '')}"
-        )
+    fetcher = WeatherFetcher()
+    data = fetcher.fetch(latitude=latitude, longitude=longitude)

     now = datetime.now(tz=timezone.utc)
     date_str = now.strftime("%Y-%m-%d")
@@ -97,23 +102,65 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
         data,
         metadata={
             "date": dg.MetadataValue.timestamp(now),
+            "latitude": dg.MetadataValue.float(latitude),
+            "longitude": dg.MetadataValue.float(longitude),
             "path_suffix": [date_str, time_str],
         },
     )
+
+
+@asset(
+    partitions_def=longitude_partitions_def,
+)
+def raw_weather_batch_longitude() -> None:
+    raise NotImplementedError
+
+
 @asset(
     io_manager_key="json_io_manager",
     partitions_def=latitude_partitions_def,
+    name="raw_batch",
 )
-def raw_weather_batch(context: dg.AssetExecutionContext) -> None:
-    for partitions_def in [
-        location_partitions_def,
-        latitude_partitions_def,
-        longitude_partitions_def,
-    ]:
-        existing_keys = set(
-            context.instance.get_dynamic_partitions(partitions_def.name)
-        )
-        ic(partitions_def.name, len(existing_keys), existing_keys)
+def raw_weather_batch_latitude(context: dg.AssetExecutionContext):
+    ic(context.resources._asdict().keys())  # contains json_io_manager
+    ic(context.partition_key)
+    ic(context.op.outputs())
+    latitude_str = context.partition_key
+
+    locations = context.instance.get_dynamic_partitions(location_partitions_def.name)
+    ic(locations)
+    longitudes = context.instance.get_dynamic_partitions(longitude_partitions_def.name)
+    ic(longitudes)
+
+    for longitude_str in longitudes:
+        location = f"{latitude_str}{longitude_str}"
+        if location in locations:
+            context.log.info("Fetching weather data for location %s", location)
+            fetcher = WeatherFetcher()
+            latitude, longitude = parse_coordinate_str(location)
+            data = fetcher.fetch(latitude=latitude, longitude=longitude)
+
+            now = datetime.now(tz=timezone.utc)
+            date_str = now.strftime("%Y-%m-%d")
+            time_str = now.strftime("%H:%M:%S")
+            output_context = dg.build_output_context(
+                asset_key=dg.AssetKey(
+                    [APP, "raw", date_str, latitude_str, longitude_str, time_str]
+                ),
+                resources=context.resources.original_resource_dict,
+            )
+            context.resources.json_io_manager.handle_output(output_context, data)
+
+            context.log_event(
+                dg.AssetMaterialization(
+                    asset_key=raw_weather.key,
+                    partition=location,
+                    metadata={
+                        "date": dg.MetadataValue.timestamp(now),
+                        "latitude": dg.MetadataValue.float(latitude),
+                        "longitude": dg.MetadataValue.float(longitude),
+                    },
+                )
+            )
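
For context, the WeatherFetcher class added above can also be exercised on its own. The following is a minimal, hypothetical usage sketch (not part of the commit); it assumes the class is importable from the project's assets module (the import path below is invented) and that the Open-Meteo response keeps its usual JSON shape:

# Hypothetical standalone usage of the new WeatherFetcher (illustrative only).
from weather.assets import WeatherFetcher  # assumed import path

fetcher = WeatherFetcher()
data = fetcher.fetch(latitude=52.5, longitude=4.5)

# The forecast response is a plain dict: "hourly" holds one list per requested
# variable, "current" holds the latest values (assumption based on the params above).
print(sorted(data.keys()))
print(len(data["hourly"]["temperature_2m"]))
print(data["current"]["wind_speed_10m"])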

View File

@@ -21,8 +21,11 @@ definitions = dg.Definitions(
"json_io_manager": JsonIOManager(base_dir=STORAGE_DIR), "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
}, },
sensors=[sensors.list_locations, sensors.list_latitudes, sensors.retrieve_weather], sensors=[
schedules=[ sensors.list_locations,
# schedules.raw_weather_schedule sensors.list_latitudes,
sensors.list_longitudes,
sensors.retrieve_weather,
sensors.retrieve_weather,
], ],
) )
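
The io_manager_key strings used on the assets in the first file resolve against this resources mapping. A self-contained sketch of that wiring, with placeholder names (everything prefixed demo_ is invented; only the Dagster API calls are real), assuming Dagster's public API:

import dagster as dg

demo_partitions = dg.DynamicPartitionsDefinition(name="demo_locations")

@dg.asset(partitions_def=demo_partitions, io_manager_key="json_io_manager")
def demo_raw(context: dg.AssetExecutionContext) -> dict:
    # Whatever this returns is handed to the resource bound to "json_io_manager".
    return {"partition": context.partition_key}

definitions = dg.Definitions(
    assets=[demo_raw],
    resources={
        # Stand-in for the project's JsonIOManager; any IOManager works here.
        "json_io_manager": dg.FilesystemIOManager(base_dir="/tmp/demo-storage"),
    },
)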

View File

@@ -6,6 +6,12 @@ raw_weather_job = dg.define_asset_job(
name="raw_weather_job", selection=[assets.raw_weather.key] name="raw_weather_job", selection=[assets.raw_weather.key]
) )
raw_weather_batch_job = dg.define_asset_job( raw_weather_batch_latitude_job = dg.define_asset_job(
name="raw_weather_batch_job", selection=[assets.raw_weather_batch.key] name="raw_weather_batch_latitude_job",
selection=[assets.raw_weather_batch_latitude.key],
)
raw_weather_batch_longitude_job = dg.define_asset_job(
name="raw_weather_batch_longitude_job",
selection=[assets.raw_weather_batch_longitude.key],
) )

View File

@@ -51,7 +51,7 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     )


-@dg.sensor(job=jobs.raw_weather_batch_job)
+@dg.sensor(job=jobs.raw_weather_batch_latitude_job)
 def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     existing_latitudes = set(
         context.instance.get_dynamic_partitions(location_partitions_def.name)
@@ -69,7 +69,25 @@ def list_latitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     )


-@dg.sensor(job=jobs.raw_weather_job, minimum_interval_seconds=60 * 60)
+@dg.sensor(job=jobs.raw_weather_batch_longitude_job)
+def list_longitudes(context: dg.SensorEvaluationContext) -> dg.SensorResult:
+    existing_longitudes = set(
+        context.instance.get_dynamic_partitions(location_partitions_def.name)
+    )
+    longitudes = [longitude_to_str(lon) for lon in lon_range]
+    new_longitudes = [lon for lon in longitudes if lon not in existing_longitudes]
+    return dg.SensorResult(
+        run_requests=[
+            dg.RunRequest(partition_key=partition_key)
+            for partition_key in new_longitudes
+        ],
+        dynamic_partitions_requests=[
+            longitude_partitions_def.build_add_request(new_longitudes)
+        ],
+    )
+
+
+@dg.sensor(job=jobs.raw_weather_batch_latitude_job, minimum_interval_seconds=60 * 60)
 def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     """
     Retrieve weather sensor function.
@@ -86,7 +104,7 @@ def retrieve_weather(context: dg.SensorEvaluationContext) -> dg.SensorResult:
         The result of the sensor's evaluation, containing run requests for each existing partition key.
     """
     existing_keys = set(
-        context.instance.get_dynamic_partitions(location_partitions_def.name)
+        context.instance.get_dynamic_partitions(latitude_partitions_def.name)
     )
     return dg.SensorResult(
         run_requests=[
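
The sensors in this file all follow the same dynamic-partition recipe: compare the keys that should exist against what the instance already knows, register the missing ones, and request one run per new key. A compact, self-contained sketch of that recipe (illustrative only; every demo_ name is invented, the Dagster calls are the public API used in the diff):

import dagster as dg

demo_partitions = dg.DynamicPartitionsDefinition(name="demo_keys")

@dg.asset(partitions_def=demo_partitions)
def demo_asset(context: dg.AssetExecutionContext) -> str:
    return context.partition_key

demo_job = dg.define_asset_job(name="demo_job", selection=[demo_asset.key])

@dg.sensor(job=demo_job, minimum_interval_seconds=60)
def demo_discover(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    wanted = ["lat52.5", "lat52.6"]  # keys that should exist (placeholder values)
    existing = set(context.instance.get_dynamic_partitions(demo_partitions.name))
    new_keys = [key for key in wanted if key not in existing]
    return dg.SensorResult(
        # one run per newly discovered partition key
        run_requests=[dg.RunRequest(partition_key=key) for key in new_keys],
        # register the keys so those runs can target them
        dynamic_partitions_requests=[demo_partitions.build_add_request(new_keys)],
    )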