diff --git a/apps/weather/requirements.txt b/apps/weather/requirements.txt index 25aca50..ae2cc04 100644 --- a/apps/weather/requirements.txt +++ b/apps/weather/requirements.txt @@ -13,6 +13,10 @@ anyio==4.9.0 # watchfiles asttokens==3.0.0 # via icecream +attrs==25.3.0 + # via + # cattrs + # requests-cache backoff==2.2.1 # via gql beautifulsoup4==4.13.4 @@ -26,10 +30,14 @@ botocore==1.39.14 # boto3 # s3fs # s3transfer +cattrs==25.1.1 + # via requests-cache certifi==2025.7.14 # via requests charset-normalizer==3.4.2 - # via requests + # via + # niquests + # requests click==8.1.8 # via # dagster @@ -104,6 +112,8 @@ fastparquet==2024.11.0 # via dev (pyproject.toml) filelock==3.18.0 # via dagster +flatbuffers==25.2.10 + # via openmeteo-sdk fonttools==4.59.0 # via matplotlib fsspec==2025.7.0 @@ -133,7 +143,9 @@ grpcio==1.74.0 grpcio-health-checking==1.71.2 # via dagster h11==0.16.0 - # via uvicorn + # via + # urllib3-future + # uvicorn httptools==0.6.4 # via uvicorn humanfriendly==10.0 @@ -145,7 +157,10 @@ idna==3.10 # anyio # email-validator # requests + # url-normalize # yarl +jh2==5.0.9 + # via urllib3-future jinja2==3.1.6 # via dagster jmespath==1.0.1 @@ -170,6 +185,8 @@ mdurl==0.1.2 # via markdown-it-py multidict==6.6.3 # via yarl +niquests==3.14.1 + # via openmeteo-requests numpy==2.3.2 # via # contourpy @@ -177,6 +194,10 @@ numpy==2.3.2 # matplotlib # pandas # seaborn +openmeteo-requests==1.5.0 + # via dev (pyproject.toml) +openmeteo-sdk==1.20.1 + # via openmeteo-requests openpyxl==3.1.5 # via dev (pyproject.toml) packaging==25.0 @@ -197,6 +218,8 @@ patito==0.8.3 # dagster-polars pillow==11.3.0 # via matplotlib +platformdirs==4.3.8 + # via requests-cache polars==1.31.0 # via # dagster-polars @@ -251,6 +274,8 @@ pyyaml==6.0.2 # dev (pyproject.toml) # dagster-shared # uvicorn +qh3==1.5.3 + # via urllib3-future regex==2024.11.6 # via docker-image-py requests==2.32.4 @@ -261,9 +286,15 @@ requests==2.32.4 # dagster-graphql # docker # gql + # requests-cache # requests-toolbelt + # retry-requests +requests-cache==1.2.1 + # via dev (pyproject.toml) requests-toolbelt==1.0.0 # via gql +retry-requests==2.0.0 + # via dev (pyproject.toml) rich==14.1.0 # via dagster s3fs==0.4.2 @@ -311,6 +342,7 @@ typing-extensions==4.14.1 # alembic # anyio # beautifulsoup4 + # cattrs # dagster-polars # dagster-shared # graphene @@ -330,15 +362,23 @@ universal-pathlib==0.2.6 # via # dagster # dagster-polars +url-normalize==2.2.1 + # via requests-cache urllib3==2.5.0 # via # botocore # docker # requests + # requests-cache + # retry-requests +urllib3-future==2.13.900 + # via niquests uvicorn==0.35.0 # via dagster-webserver uvloop==0.21.0 # via uvicorn +wassima==1.2.2 + # via niquests watchdog==5.0.3 # via dagster watchfiles==1.1.0 diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py index e69de29..f3ddbac 100644 --- a/apps/weather/src/assets.py +++ b/apps/weather/src/assets.py @@ -0,0 +1,73 @@ +from typing import Any + +import requests_cache +from partitions import location_partitions_def +from retry_requests import retry +from utils import parse_coord + +import dagster as dg + + +@dg.asset( + io_manager_key="json_io_manager", + partitions_def=location_partitions_def, +) +def raw_weather(context: dg.AssetExecutionContext) -> Any: + """Asset to fetch raw weather data for each location.""" + partition_key = context.partition_key + lat, lon = parse_coord(partition_key) + context.log.info( + f"Fetching weather data for location ({partition_key}): {lat}, {lon}" + ) + + # Setup the Open-Meteo API client with cache and retry on error + cache_session = requests_cache.CachedSession("/cache/open-meteo", expire_after=3600) + retry_session = retry(cache_session, retries=5, backoff_factor=0.2) + # openmeteo = openmeteo_requests.Client(session=retry_session) + + # Make sure all required weather variables are listed here + # The order of variables in hourly or daily is important to assign them correctly below + url = "https://api.open-meteo.com/v1/forecast" + params = { + "latitude": 52.5, + "longitude": 4.5, + "hourly": [ + "temperature_2m", + "shortwave_radiation", + "shortwave_radiation_instant", + "direct_radiation", + "rain", + "precipitation", + "showers", + "weather_code", + "pressure_msl", + "surface_pressure", + "wind_speed_80m", + "wind_direction_80m", + "wind_gusts_10m", + "temperature_80m", + "wind_direction_10m", + "wind_speed_10m", + "surface_temperature", + ], + "models": "gfs_global", + "current": [ + "wind_speed_10m", + "wind_gusts_10m", + "temperature_2m", + "relative_humidity_2m", + "apparent_temperature", + "precipitation", + "rain", + "snowfall", + ], + "forecast_days": 16, + } + # responses = openmeteo.weather_api(url, params=params) + # print(len(responses)) + # print(responses) + + response = retry_session.get(url, params=params) + response.raise_for_status() + response.json() + return response.json() diff --git a/apps/weather/src/definitions.py b/apps/weather/src/definitions.py index 7e0e20b..61e2f52 100644 --- a/apps/weather/src/definitions.py +++ b/apps/weather/src/definitions.py @@ -1,27 +1,30 @@ import os import assets +import sensors from dagster_polars import PolarsParquetIOManager from icecream import install +from resoures import JsonIOManager import dagster as dg -from dagster import load_assets_from_modules install() APP = os.environ["APP"] +storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" + definitions = dg.Definitions( assets=[ asset.with_attributes( group_names_by_key={asset.key: APP}, tags_by_key={asset.key: {"app": APP}}, ) - for asset in load_assets_from_modules([assets]) + for asset in dg.load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager( - base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" - ), + "json_io_manager": JsonIOManager(base_dir=storage_dir), + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir), }, + sensors=[sensors.list_locations], ) diff --git a/apps/weather/src/jobs.py b/apps/weather/src/jobs.py new file mode 100644 index 0000000..6b5a3f6 --- /dev/null +++ b/apps/weather/src/jobs.py @@ -0,0 +1,7 @@ +import assets + +import dagster as dg + +raw_weather_job = dg.define_asset_job( + name="weather_data_job", selection=[assets.raw_weather.key] +) diff --git a/apps/weather/src/partitions.py b/apps/weather/src/partitions.py new file mode 100644 index 0000000..2423e91 --- /dev/null +++ b/apps/weather/src/partitions.py @@ -0,0 +1,3 @@ +import dagster as dg + +location_partitions_def = dg.DynamicPartitionsDefinition(name="locations") diff --git a/apps/weather/src/resoures.py b/apps/weather/src/resoures.py new file mode 100644 index 0000000..d1fd0cb --- /dev/null +++ b/apps/weather/src/resoures.py @@ -0,0 +1,65 @@ +import json +from typing import TYPE_CHECKING, Any + +from pydantic import Field, PrivateAttr + +import dagster as dg + +if TYPE_CHECKING: + from upath import UPath + + +def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: + out = {} + for key, value in config.items(): + if ( + isinstance(value, dict) + and len(value) == 1 + and next(iter(value.keys())) == "env" + ): + out[key] = dg.EnvVar(next(iter(value.values()))).get_value() + else: + out[key] = value + return out + + +class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager): + """ + An I/O manager that stores data as JSON files. It inherits from + UPathIOManager, which handles path creation for local/cloud storage. + """ + + extension: str = ".json" # Automatically adds a .json extension + base_dir: str | None = Field( + default=None, description="Base directory for storing files." + ) + cloud_storage_options: dict[str, Any] | None = Field( + default=None, + description="Storage authentication for cloud object store", + alias="storage_options", + ) + _base_path = PrivateAttr() + + def setup_for_execution(self, context: dg.InitResourceContext) -> None: + from upath import UPath + + sp = ( + _process_env_vars(self.cloud_storage_options) + if self.cloud_storage_options is not None + else {} + ) + self._base_path = ( + UPath(self.base_dir, **sp) + if self.base_dir is not None + else UPath(dg._check.not_none(context.instance).storage_directory()) + ) + + def dump_to_path(self, context: dg.OutputContext, obj: object, path: "UPath"): + """This saves the output of an asset to a file path.""" + with path.open("w") as fp: + json.dump(obj, fp, indent=4) + + def load_from_path(self, context: dg.InputContext, path: "UPath") -> object: + """This loads an input for a downstream asset from a file path.""" + with path.open("r") as fp: + return json.load(fp) diff --git a/apps/weather/src/sensors.py b/apps/weather/src/sensors.py new file mode 100644 index 0000000..d0d7219 --- /dev/null +++ b/apps/weather/src/sensors.py @@ -0,0 +1,34 @@ +import jobs +import numpy as np +from partitions import location_partitions_def +from utils import format_coord + +import dagster as dg + + +@dg.sensor(job=jobs.raw_weather_job) +def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult: + """Sensor that emits RunRequests for new 0.25-degree grid locations not yet seen as partitions.""" + existing_keys = set( + context.instance.get_dynamic_partitions(location_partitions_def.name) + ) + + lat_range = np.arange(51.0, 53.01, 0.25, dtype=float) + lon_range = np.arange(3.0, 7.01, 0.25, dtype=float) + + locations = [format_coord(lat, lon) for lat in lat_range for lon in lon_range] + + new_locations = [ + location for location in locations if location not in existing_keys + ] + if new_locations: + context.log.info(f"Discovered {len(new_locations)} new locations.") + + # Limit to 3 new locations + selected = new_locations[:3] + return dg.SensorResult( + run_requests=[dg.RunRequest(partition_key=loc) for loc in selected], + dynamic_partitions_requests=[ + location_partitions_def.build_add_request(selected) + ], + ) diff --git a/apps/weather/src/tests.py b/apps/weather/src/tests.py new file mode 100644 index 0000000..e1cb04e --- /dev/null +++ b/apps/weather/src/tests.py @@ -0,0 +1,37 @@ +import unittest + +from utils import format_coord, parse_coord + + +class TestCoordinateFormatting(unittest.TestCase): + + def test_round_trip(self): + cases = [ + (52.25, 4.0, "N5225E0400"), + (-13.5, -122.0, "S1350W12200"), + (0.0, 0.0, "N0000E0000"), + (51.75, 6.5, "N5175E0650"), + (-0.25, 0.25, "S0025E0025"), + ] + for lat, lon, expected in cases: + with self.subTest(lat=lat, lon=lon): + self.assertEqual(format_coord(lat, lon), expected) + parsed_lat, parsed_lon = parse_coord(expected) + print(f"Parsed: {parsed_lat}, {parsed_lon} from {expected}") + self.assertAlmostEqual(parsed_lat, lat, places=6) + self.assertAlmostEqual(parsed_lon, lon, places=6) + + def test_invalid_length(self): + with self.assertRaises(ValueError): + parse_coord("N52E4") # too short, malformed + + def test_negative_coordinates(self): + coord = format_coord(-52.25, -4.0) + self.assertEqual(coord, "S5225W0400") + lat, lon = parse_coord(coord) + self.assertAlmostEqual(lat, -52.25, places=6) + self.assertAlmostEqual(lon, -4.0, places=6) + + +if __name__ == "__main__": + unittest.main() diff --git a/apps/weather/src/utils.py b/apps/weather/src/utils.py new file mode 100644 index 0000000..a94faaa --- /dev/null +++ b/apps/weather/src/utils.py @@ -0,0 +1,40 @@ +import re + + +def format_coord(latitude: float, longitude: float) -> str: + """Format latitude and longitude into a string with hemisphere indicators.""" + + def to_str(value: float, pos_letter: str, neg_letter: str) -> str: + """Convert a coordinate value to a string with hemisphere indicator.""" + hemi = pos_letter if value >= 0 else neg_letter + abs_val = abs(value) + degrees = int(abs_val) + fraction = int(round((abs_val - degrees) * 100)) # .25 becomes 25 + return f"{hemi}{degrees:02d}{fraction:02d}" + + lat_str = to_str(latitude, "N", "S") + lon_str = to_str(longitude, "E", "W") + return f"{lat_str}{lon_str}" + + +def parse_coord(coord: str) -> tuple[float, float]: + """ + Parse a formatted coordinate string (e.g. 'N5225E0040' or 'S1350W12200') + back into (latitude, longitude) float values. + """ + # Match hemisphere + degrees + fractional part + match = re.match(r"^([NS])(\d{2,3})(\d{2})([EW])(\d{2,3})(\d{2})$", coord) + if not match: + raise ValueError(f"Invalid coordinate format: {coord}") + + lat_hemi, lat_deg, lat_frac, lon_hemi, lon_deg, lon_frac = match.groups() + + lat = int(lat_deg) + int(lat_frac) / 100.0 + lon = int(lon_deg) + int(lon_frac) / 100.0 + + if lat_hemi == "S": + lat = -lat + if lon_hemi == "W": + lon = -lon + + return lat, lon