align raw weather output files

This commit is contained in:
2025-10-29 13:59:21 +01:00
parent 204565118b
commit 3f99f354de
2 changed files with 21 additions and 1 deletions

View File

@@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
now = datetime.now(tz=timezone.utc) now = datetime.now(tz=timezone.utc)
date_str = now.strftime("%Y-%m-%d") date_str = now.strftime("%Y-%m-%d")
time_str = now.strftime("%H:%M:%S") time_str = now.strftime("%H:%M:%S")
latitude_str, longitude_str = partition_key[:5], partition_key[5:]
yield dg.Output( yield dg.Output(
data, data,
metadata={ metadata={
"date": dg.MetadataValue.timestamp(now), "date": dg.MetadataValue.timestamp(now),
"latitude": dg.MetadataValue.float(latitude), "latitude": dg.MetadataValue.float(latitude),
"longitude": dg.MetadataValue.float(longitude), "longitude": dg.MetadataValue.float(longitude),
"path_suffix": [date_str, time_str], "path": [APP, "raw", date_str, latitude_str, longitude_str, time_str],
}, },
) )

View File

@@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr
from upath import UPath from upath import UPath
import dagster as dg import dagster as dg
from dagster import (
InputContext,
OutputContext,
)
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
@@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC):
with path.open("r") as fp: with path.open("r") as fp:
return json.load(fp) return json.load(fp)
def get_path_for_partition(
self, context: InputContext | OutputContext, path: "UPath", partition: str
) -> UPath:
"""Use path from metadata when provided."""
ic()
context_metadata = context.output_metadata or {}
ic(context_metadata)
if "path" in context_metadata:
return UPath(*context_metadata["path"].value)
return super().get_path_for_partition(context)
def get_asset_relative_path( def get_asset_relative_path(
self, context: dg.InputContext | dg.OutputContext self, context: dg.InputContext | dg.OutputContext
) -> UPath: ) -> UPath:
"""Get the relative path for the asset based on context metadata.""" """Get the relative path for the asset based on context metadata."""
ic()
context_metadata = context.output_metadata or {} context_metadata = context.output_metadata or {}
ic(context_metadata) ic(context_metadata)
path_prefix = ( path_prefix = (
context_metadata["path_prefix"].value context_metadata["path_prefix"].value
if "path_prefix" in context_metadata if "path_prefix" in context_metadata