diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py
index 5059fb7..26077e7 100644
--- a/apps/weather/src/assets.py
+++ b/apps/weather/src/assets.py
@@ -1,3 +1,4 @@
+from datetime import datetime, timezone
 from typing import Any
 
 import requests_cache
@@ -9,6 +10,16 @@ from utils import parse_coord
 import dagster as dg
 
 
+@dg.asset(
+    io_manager_key="json_io_manager",
+)
+def organised():
+    yield dg.Output(
+        {"interesting": "data"},
+        metadata={"where": "something", "path_prefix": "a/b", "path_suffix": "c/d"},
+    )
+
+
 @dg.asset(
     io_manager_key="json_io_manager",
     partitions_def=location_partitions_def,
@@ -79,4 +90,14 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
         raise ValueError(
             f"Error (data['error']) fetching weather data: {data.get('reason', '')}"
         )
-    return response.json()
+
+    now = datetime.now(tz=timezone.utc)
+    date_str = now.strftime("%Y-%m-%d")
+    time_str = now.strftime("%H:%M:%S")
+    yield dg.Output(
+        data,
+        metadata={
+            "date": dg.MetadataValue.timestamp(now),
+            "path_suffix": [date_str, time_str],
+        },
+    )
diff --git a/apps/weather/src/definitions.py b/apps/weather/src/definitions.py
index 61e2f52..ca80dcc 100644
--- a/apps/weather/src/definitions.py
+++ b/apps/weather/src/definitions.py
@@ -1,4 +1,5 @@
 import os
+from pathlib import Path
 
 import assets
 import sensors
@@ -10,7 +11,7 @@ import dagster as dg
 
 install()
 
-APP = os.environ["APP"]
+APP = os.environ.get("APP", Path(__file__).parent.parent.name)
 
 storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
 
diff --git a/apps/weather/src/resoures.py b/apps/weather/src/resoures.py
index d1fd0cb..8a6d85c 100644
--- a/apps/weather/src/resoures.py
+++ b/apps/weather/src/resoures.py
@@ -1,13 +1,11 @@
 import json
-from typing import TYPE_CHECKING, Any
+from typing import Any
 
 from pydantic import Field, PrivateAttr
+from upath import UPath
 
 import dagster as dg
 
-if TYPE_CHECKING:
-    from upath import UPath
-
 
 def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
     out = {}
@@ -41,8 +39,6 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
     _base_path = PrivateAttr()
 
     def setup_for_execution(self, context: dg.InitResourceContext) -> None:
-        from upath import UPath
-
         sp = (
             _process_env_vars(self.cloud_storage_options)
             if self.cloud_storage_options is not None
@@ -54,7 +50,9 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
             else UPath(dg._check.not_none(context.instance).storage_directory())
         )
 
-    def dump_to_path(self, context: dg.OutputContext, obj: object, path: "UPath"):
+    def dump_to_path(
+        self, context: dg.OutputContext, obj: object, path: "UPath"
+    ) -> None:
         """This saves the output of an asset to a file path."""
         with path.open("w") as fp:
             json.dump(obj, fp, indent=4)
@@ -63,3 +61,41 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
         """This loads an input for a downstream asset from a file path."""
         with path.open("r") as fp:
             return json.load(fp)
+
+    def get_asset_relative_path(
+        self, context: dg.InputContext | dg.OutputContext
+    ) -> UPath:
+        """Build the storage path of an asset, relative to the base path.
+
+        The result is ``<path_prefix>/<asset key parts>/<path_suffix>``. Both
+        ``path_prefix`` and ``path_suffix`` are optional output metadata
+        entries; each may be a ``"a/b"`` style string or a list of segments.
+        """
+        # NOTE(review): only OutputContext is expected to expose
+        # `output_metadata`; confirm how loads (InputContext) should resolve
+        # prefixed paths. Until then a missing attribute means "no metadata".
+        output_metadata = getattr(context, "output_metadata", None) or {}
+
+        def segments(key: str) -> list[str]:
+            """Return the path segments stored under ``key``, if any."""
+            if key not in output_metadata:
+                return []
+            entry = output_metadata[key]
+            # Unwrap MetadataValue objects; plain values pass through as-is.
+            raw = getattr(entry, "value", entry)
+            if raw is None:
+                return []
+            if isinstance(raw, str):
+                # Splatting a str would yield one segment per character.
+                return [part for part in raw.split("/") if part]
+            return [str(part) for part in raw]
+
+        # we are not using context.get_asset_identifier() because it already
+        # includes the partition_key
+        relative_path = UPath(
+            *segments("path_prefix"),
+            *context.asset_key.path,
+            *segments("path_suffix"),
+        )
+        context.log.debug("Relative asset path: %s", relative_path)
+        return relative_path
diff --git a/apps/weather/src/test.py b/apps/weather/src/test.py
new file mode 100644
index 0000000..6cf058c
--- /dev/null
+++ b/apps/weather/src/test.py
@@ -0,0 +1,32 @@
+import logging
+from datetime import datetime
+
+from dotenv import find_dotenv, load_dotenv
+
+import dagster as dg
+from apps.weather.src.assets import organised
+
+logging.getLogger().setLevel(logging.INFO)
+
+
+def today_str():
+    """Return the current local date formatted as YYYY-MM-DD."""
+    return f"{datetime.today():%Y-%m-%d}"
+
+
+if __name__ == "__main__":
+    load_dotenv(find_dotenv())
+    from definitions import definitions
+
+    run = 1
+    resources = definitions.resources
+
+    # Only run 1 exists so far; any other selector is rejected up front.
+    if run != 1:
+        raise ValueError(f"Invalid run number: {run}!")
+
+    dg.materialize(
+        assets=definitions.assets,
+        selection=[organised.key],
+        resources=resources,
+    )