From 3f99f354de0a5ca69166180e05e95269f3bbfa53 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Wed, 29 Oct 2025 13:59:21 +0100 Subject: [PATCH] align raw weather output files --- apps/weather/src/assets.py | 4 +++- shared/src/shared/io_manager/base.py | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py index aadf1a6..b2a844b 100644 --- a/apps/weather/src/assets.py +++ b/apps/weather/src/assets.py @@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any: now = datetime.now(tz=timezone.utc) date_str = now.strftime("%Y-%m-%d") time_str = now.strftime("%H:%M:%S") + + latitude_str, longitude_str = partition_key[:5], partition_key[5:] yield dg.Output( data, metadata={ "date": dg.MetadataValue.timestamp(now), "latitude": dg.MetadataValue.float(latitude), "longitude": dg.MetadataValue.float(longitude), - "path_suffix": [date_str, time_str], + "path": [APP, "raw", date_str, latitude_str, longitude_str, time_str], }, ) diff --git a/shared/src/shared/io_manager/base.py b/shared/src/shared/io_manager/base.py index 83f2c90..fe6283f 100644 --- a/shared/src/shared/io_manager/base.py +++ b/shared/src/shared/io_manager/base.py @@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr from upath import UPath import dagster as dg +from dagster import ( + InputContext, + OutputContext, +) def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: @@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC): with path.open("r") as fp: return json.load(fp) + def get_path_for_partition( + self, context: InputContext | OutputContext, path: "UPath", partition: str + ) -> UPath: + """Use path from metadata when provided.""" + ic() + context_metadata = context.output_metadata or {} + ic(context_metadata) + + if "path" in context_metadata: + return UPath(*context_metadata["path"].value) + return super().get_path_for_partition(context) + def get_asset_relative_path( self, context: dg.InputContext | dg.OutputContext ) -> UPath: """Get the relative path for the asset based on context metadata.""" + ic() context_metadata = context.output_metadata or {} ic(context_metadata) + path_prefix = ( context_metadata["path_prefix"].value if "path_prefix" in context_metadata