first version of weather loader

This commit is contained in:
2025-07-27 14:54:19 +02:00
parent d3d3911609
commit 131912b70d
9 changed files with 309 additions and 7 deletions

View File

@@ -13,6 +13,10 @@ anyio==4.9.0
# watchfiles
asttokens==3.0.0
# via icecream
attrs==25.3.0
# via
# cattrs
# requests-cache
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
@@ -26,10 +30,14 @@ botocore==1.39.14
# boto3
# s3fs
# s3transfer
cattrs==25.1.1
# via requests-cache
certifi==2025.7.14
# via requests
charset-normalizer==3.4.2
# via requests
# via
# niquests
# requests
click==8.1.8
# via
# dagster
@@ -104,6 +112,8 @@ fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
flatbuffers==25.2.10
# via openmeteo-sdk
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
@@ -133,7 +143,9 @@ grpcio==1.74.0
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via uvicorn
# via
# urllib3-future
# uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
@@ -145,7 +157,10 @@ idna==3.10
# anyio
# email-validator
# requests
# url-normalize
# yarl
jh2==5.0.9
# via urllib3-future
jinja2==3.1.6
# via dagster
jmespath==1.0.1
@@ -170,6 +185,8 @@ mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
niquests==3.14.1
# via openmeteo-requests
numpy==2.3.2
# via
# contourpy
@@ -177,6 +194,10 @@ numpy==2.3.2
# matplotlib
# pandas
# seaborn
openmeteo-requests==1.5.0
# via dev (pyproject.toml)
openmeteo-sdk==1.20.1
# via openmeteo-requests
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
@@ -197,6 +218,8 @@ patito==0.8.3
# dagster-polars
pillow==11.3.0
# via matplotlib
platformdirs==4.3.8
# via requests-cache
polars==1.31.0
# via
# dagster-polars
@@ -251,6 +274,8 @@ pyyaml==6.0.2
# dev (pyproject.toml)
# dagster-shared
# uvicorn
qh3==1.5.3
# via urllib3-future
regex==2024.11.6
# via docker-image-py
requests==2.32.4
@@ -261,9 +286,15 @@ requests==2.32.4
# dagster-graphql
# docker
# gql
# requests-cache
# requests-toolbelt
# retry-requests
requests-cache==1.2.1
# via dev (pyproject.toml)
requests-toolbelt==1.0.0
# via gql
retry-requests==2.0.0
# via dev (pyproject.toml)
rich==14.1.0
# via dagster
s3fs==0.4.2
@@ -311,6 +342,7 @@ typing-extensions==4.14.1
# alembic
# anyio
# beautifulsoup4
# cattrs
# dagster-polars
# dagster-shared
# graphene
@@ -330,15 +362,23 @@ universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
url-normalize==2.2.1
# via requests-cache
urllib3==2.5.0
# via
# botocore
# docker
# requests
# requests-cache
# retry-requests
urllib3-future==2.13.900
# via niquests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
wassima==1.2.2
# via niquests
watchdog==5.0.3
# via dagster
watchfiles==1.1.0

View File

@@ -0,0 +1,73 @@
from typing import Any
import requests_cache
from partitions import location_partitions_def
from retry_requests import retry
from utils import parse_coord
import dagster as dg
@dg.asset(
io_manager_key="json_io_manager",
partitions_def=location_partitions_def,
)
def raw_weather(context: dg.AssetExecutionContext) -> Any:
"""Asset to fetch raw weather data for each location."""
partition_key = context.partition_key
lat, lon = parse_coord(partition_key)
context.log.info(
f"Fetching weather data for location ({partition_key}): {lat}, {lon}"
)
# Setup the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession("/cache/open-meteo", expire_after=3600)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
# openmeteo = openmeteo_requests.Client(session=retry_session)
# Make sure all required weather variables are listed here
# The order of variables in hourly or daily is important to assign them correctly below
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": 52.5,
"longitude": 4.5,
"hourly": [
"temperature_2m",
"shortwave_radiation",
"shortwave_radiation_instant",
"direct_radiation",
"rain",
"precipitation",
"showers",
"weather_code",
"pressure_msl",
"surface_pressure",
"wind_speed_80m",
"wind_direction_80m",
"wind_gusts_10m",
"temperature_80m",
"wind_direction_10m",
"wind_speed_10m",
"surface_temperature",
],
"models": "gfs_global",
"current": [
"wind_speed_10m",
"wind_gusts_10m",
"temperature_2m",
"relative_humidity_2m",
"apparent_temperature",
"precipitation",
"rain",
"snowfall",
],
"forecast_days": 16,
}
# responses = openmeteo.weather_api(url, params=params)
# print(len(responses))
# print(responses)
response = retry_session.get(url, params=params)
response.raise_for_status()
response.json()
return response.json()

View File

@@ -1,27 +1,30 @@
import os
import assets
import sensors
from dagster_polars import PolarsParquetIOManager
from icecream import install
from resoures import JsonIOManager
import dagster as dg
from dagster import load_assets_from_modules
install()
APP = os.environ["APP"]
storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
definitions = dg.Definitions(
assets=[
asset.with_attributes(
group_names_by_key={asset.key: APP},
tags_by_key={asset.key: {"app": APP}},
)
for asset in load_assets_from_modules([assets])
for asset in dg.load_assets_from_modules([assets])
],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
),
"json_io_manager": JsonIOManager(base_dir=storage_dir),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir),
},
sensors=[sensors.list_locations],
)

7
apps/weather/src/jobs.py Normal file
View File

@@ -0,0 +1,7 @@
import assets
import dagster as dg
raw_weather_job = dg.define_asset_job(
name="weather_data_job", selection=[assets.raw_weather.key]
)

View File

@@ -0,0 +1,3 @@
import dagster as dg
location_partitions_def = dg.DynamicPartitionsDefinition(name="locations")

View File

@@ -0,0 +1,65 @@
import json
from typing import TYPE_CHECKING, Any
from pydantic import Field, PrivateAttr
import dagster as dg
if TYPE_CHECKING:
from upath import UPath
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
out = {}
for key, value in config.items():
if (
isinstance(value, dict)
and len(value) == 1
and next(iter(value.keys())) == "env"
):
out[key] = dg.EnvVar(next(iter(value.values()))).get_value()
else:
out[key] = value
return out
class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
"""
An I/O manager that stores data as JSON files. It inherits from
UPathIOManager, which handles path creation for local/cloud storage.
"""
extension: str = ".json" # Automatically adds a .json extension
base_dir: str | None = Field(
default=None, description="Base directory for storing files."
)
cloud_storage_options: dict[str, Any] | None = Field(
default=None,
description="Storage authentication for cloud object store",
alias="storage_options",
)
_base_path = PrivateAttr()
def setup_for_execution(self, context: dg.InitResourceContext) -> None:
from upath import UPath
sp = (
_process_env_vars(self.cloud_storage_options)
if self.cloud_storage_options is not None
else {}
)
self._base_path = (
UPath(self.base_dir, **sp)
if self.base_dir is not None
else UPath(dg._check.not_none(context.instance).storage_directory())
)
def dump_to_path(self, context: dg.OutputContext, obj: object, path: "UPath"):
"""This saves the output of an asset to a file path."""
with path.open("w") as fp:
json.dump(obj, fp, indent=4)
def load_from_path(self, context: dg.InputContext, path: "UPath") -> object:
"""This loads an input for a downstream asset from a file path."""
with path.open("r") as fp:
return json.load(fp)

View File

@@ -0,0 +1,34 @@
import jobs
import numpy as np
from partitions import location_partitions_def
from utils import format_coord
import dagster as dg
@dg.sensor(job=jobs.raw_weather_job)
def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
"""Sensor that emits RunRequests for new 0.25-degree grid locations not yet seen as partitions."""
existing_keys = set(
context.instance.get_dynamic_partitions(location_partitions_def.name)
)
lat_range = np.arange(51.0, 53.01, 0.25, dtype=float)
lon_range = np.arange(3.0, 7.01, 0.25, dtype=float)
locations = [format_coord(lat, lon) for lat in lat_range for lon in lon_range]
new_locations = [
location for location in locations if location not in existing_keys
]
if new_locations:
context.log.info(f"Discovered {len(new_locations)} new locations.")
# Limit to 3 new locations
selected = new_locations[:3]
return dg.SensorResult(
run_requests=[dg.RunRequest(partition_key=loc) for loc in selected],
dynamic_partitions_requests=[
location_partitions_def.build_add_request(selected)
],
)

37
apps/weather/src/tests.py Normal file
View File

@@ -0,0 +1,37 @@
import unittest
from utils import format_coord, parse_coord
class TestCoordinateFormatting(unittest.TestCase):
def test_round_trip(self):
cases = [
(52.25, 4.0, "N5225E0400"),
(-13.5, -122.0, "S1350W12200"),
(0.0, 0.0, "N0000E0000"),
(51.75, 6.5, "N5175E0650"),
(-0.25, 0.25, "S0025E0025"),
]
for lat, lon, expected in cases:
with self.subTest(lat=lat, lon=lon):
self.assertEqual(format_coord(lat, lon), expected)
parsed_lat, parsed_lon = parse_coord(expected)
print(f"Parsed: {parsed_lat}, {parsed_lon} from {expected}")
self.assertAlmostEqual(parsed_lat, lat, places=6)
self.assertAlmostEqual(parsed_lon, lon, places=6)
def test_invalid_length(self):
with self.assertRaises(ValueError):
parse_coord("N52E4") # too short, malformed
def test_negative_coordinates(self):
coord = format_coord(-52.25, -4.0)
self.assertEqual(coord, "S5225W0400")
lat, lon = parse_coord(coord)
self.assertAlmostEqual(lat, -52.25, places=6)
self.assertAlmostEqual(lon, -4.0, places=6)
if __name__ == "__main__":
unittest.main()

40
apps/weather/src/utils.py Normal file
View File

@@ -0,0 +1,40 @@
import re
def format_coord(latitude: float, longitude: float) -> str:
"""Format latitude and longitude into a string with hemisphere indicators."""
def to_str(value: float, pos_letter: str, neg_letter: str) -> str:
"""Convert a coordinate value to a string with hemisphere indicator."""
hemi = pos_letter if value >= 0 else neg_letter
abs_val = abs(value)
degrees = int(abs_val)
fraction = int(round((abs_val - degrees) * 100)) # .25 becomes 25
return f"{hemi}{degrees:02d}{fraction:02d}"
lat_str = to_str(latitude, "N", "S")
lon_str = to_str(longitude, "E", "W")
return f"{lat_str}{lon_str}"
def parse_coord(coord: str) -> tuple[float, float]:
"""
Parse a formatted coordinate string (e.g. 'N5225E0040' or 'S1350W12200')
back into (latitude, longitude) float values.
"""
# Match hemisphere + degrees + fractional part
match = re.match(r"^([NS])(\d{2,3})(\d{2})([EW])(\d{2,3})(\d{2})$", coord)
if not match:
raise ValueError(f"Invalid coordinate format: {coord}")
lat_hemi, lat_deg, lat_frac, lon_hemi, lon_deg, lon_frac = match.groups()
lat = int(lat_deg) + int(lat_frac) / 100.0
lon = int(lon_deg) + int(lon_frac) / 100.0
if lat_hemi == "S":
lat = -lat
if lon_hemi == "W":
lon = -lon
return lat, lon