Shared base for the file I/O manager

This commit is contained in:
2025-07-29 11:28:44 +02:00
parent 80c0452cde
commit d74823960f
2 changed files with 42 additions and 7 deletions

View File

@@ -0,0 +1,37 @@
import json
from typing import Any
from shared.resources import BaseIOManager
from upath import UPath
import dagster as dg
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
out = {}
for key, value in config.items():
if (
isinstance(value, dict)
and len(value) == 1
and next(iter(value.keys())) == "env"
):
out[key] = dg.EnvVar(next(iter(value.values()))).get_value()
else:
out[key] = value
return out
class JsonIOManager(BaseIOManager):
    """I/O manager that persists asset values as pretty-printed JSON files."""

    # Suffix appended to every generated path (".json").
    extension: str = ".json"

    def dump_to_path(
        self, context: dg.OutputContext, obj: object, path: "UPath"
    ) -> None:
        """Serialize the asset output *obj* as JSON at *path*."""
        with path.open("w") as handle:
            json.dump(obj, handle, indent=4)

    def load_from_path(self, context: dg.InputContext, path: "UPath") -> object:
        """Read the JSON file at *path* and return the deserialized value."""
        with path.open("r") as handle:
            return json.load(handle)

View File

@@ -1,85 +0,0 @@
import json
from typing import Any
from pydantic import Field, PrivateAttr
from upath import UPath
import dagster as dg
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
out = {}
for key, value in config.items():
if (
isinstance(value, dict)
and len(value) == 1
and next(iter(value.keys())) == "env"
):
out[key] = dg.EnvVar(next(iter(value.values()))).get_value()
else:
out[key] = value
return out
class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
    """
    An I/O manager that stores data as JSON files. It inherits from
    UPathIOManager, which handles path creation for local/cloud storage.
    """

    extension: str = ".json"  # Automatically adds a .json extension
    # Root directory for all stored files; when unset, falls back to the
    # Dagster instance's storage directory (see setup_for_execution).
    base_dir: str | None = Field(
        default=None, description="Base directory for storing files."
    )
    # Options forwarded to UPath; {"env": "VAR"} values are resolved from
    # the environment at setup time via _process_env_vars.
    cloud_storage_options: dict[str, Any] | None = Field(
        default=None,
        description="Storage authentication for cloud object store",
        alias="storage_options",
    )
    _base_path = PrivateAttr()

    def setup_for_execution(self, context: dg.InitResourceContext) -> None:
        """Resolve storage options and compute the base path for this run."""
        sp = (
            _process_env_vars(self.cloud_storage_options)
            if self.cloud_storage_options is not None
            else {}
        )
        self._base_path = (
            UPath(self.base_dir, **sp)
            if self.base_dir is not None
            # NOTE(review): dg._check is a private dagster module — confirm a
            # public alternative before relying on it long-term.
            else UPath(dg._check.not_none(context.instance).storage_directory())
        )

    def dump_to_path(
        self, context: dg.OutputContext, obj: object, path: "UPath"
    ) -> None:
        """This saves the output of an asset to a file path."""
        with path.open("w") as fp:
            json.dump(obj, fp, indent=4)

    def load_from_path(self, context: dg.InputContext, path: "UPath") -> object:
        """This loads an input for a downstream asset from a file path."""
        with path.open("r") as fp:
            return json.load(fp)

    def get_asset_relative_path(
        self, context: dg.InputContext | dg.OutputContext
    ) -> UPath:
        """Get the relative path for the asset based on context metadata.

        Optional ``path_prefix`` / ``path_suffix`` lists in the output
        metadata are wrapped around the asset key path components.
        """
        # Fix: removed leftover ic(...) debug calls — `ic` (icecream) was
        # never imported and raised NameError whenever this method ran.
        context_metadata = context.output_metadata or {}
        path_prefix = (
            context_metadata["path_prefix"].value
            if "path_prefix" in context_metadata
            else []
        )
        path_suffix = (
            context_metadata["path_suffix"].value
            if "path_suffix" in context_metadata
            else []
        )
        # we are not using context.get_asset_identifier() because it already includes the partition_key
        return UPath(*path_prefix, *context.asset_key.path, *path_suffix)