diff --git a/apps/weather/src/resources.py b/apps/weather/src/resources.py new file mode 100644 index 0000000..4f73a44 --- /dev/null +++ b/apps/weather/src/resources.py @@ -0,0 +1,37 @@ +import json +from typing import Any + +from shared.resources import BaseIOManager +from upath import UPath + +import dagster as dg + + +def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: + out = {} + for key, value in config.items(): + if ( + isinstance(value, dict) + and len(value) == 1 + and next(iter(value.keys())) == "env" + ): + out[key] = dg.EnvVar(next(iter(value.values()))).get_value() + else: + out[key] = value + return out + + +class JsonIOManager(BaseIOManager): + extension: str = ".json" # Automatically adds a .json extension + + def dump_to_path( + self, context: dg.OutputContext, obj: object, path: "UPath" + ) -> None: + """This saves the output of an asset to a file path.""" + with path.open("w") as fp: + json.dump(obj, fp, indent=4) + + def load_from_path(self, context: dg.InputContext, path: "UPath") -> object: + """This loads an input for a downstream asset from a file path.""" + with path.open("r") as fp: + return json.load(fp) diff --git a/apps/weather/src/resoures.py b/shared/src/shared/resources.py similarity index 89% rename from apps/weather/src/resoures.py rename to shared/src/shared/resources.py index 8a6d85c..83f2c90 100644 --- a/apps/weather/src/resoures.py +++ b/shared/src/shared/resources.py @@ -1,4 +1,5 @@ import json +from abc import ABC, abstractmethod from typing import Any from pydantic import Field, PrivateAttr @@ -21,13 +22,8 @@ def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: return out -class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager): - """ - An I/O manager that stores data as JSON files. It inherits from - UPathIOManager, which handles path creation for local/cloud storage. - """ - - extension: str = ".json" # Automatically adds a .json extension +class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC): + extension: str base_dir: str | None = Field( default=None, description="Base directory for storing files." ) @@ -50,6 +46,7 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager): else UPath(dg._check.not_none(context.instance).storage_directory()) ) + @abstractmethod def dump_to_path( self, context: dg.OutputContext, obj: object, path: "UPath" ) -> None: @@ -57,6 +54,7 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager): with path.open("w") as fp: json.dump(obj, fp, indent=4) + @abstractmethod def load_from_path(self, context: dg.InputContext, path: "UPath") -> object: """This loads an input for a downstream asset from a file path.""" with path.open("r") as fp: