organize weather data by retrieval time
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import requests_cache
|
||||
@@ -9,6 +10,16 @@ from utils import parse_coord
|
||||
import dagster as dg
|
||||
|
||||
|
||||
@dg.asset(
|
||||
io_manager_key="json_io_manager",
|
||||
)
|
||||
def organised():
|
||||
yield dg.Output(
|
||||
{"interesting": "data"},
|
||||
metadata={"where": "something", "path_prefix": "a/b", "path_suffix": "c/d"},
|
||||
)
|
||||
|
||||
|
||||
@dg.asset(
|
||||
io_manager_key="json_io_manager",
|
||||
partitions_def=location_partitions_def,
|
||||
@@ -79,4 +90,14 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
|
||||
raise ValueError(
|
||||
f"Error (data['error']) fetching weather data: {data.get('reason', '')}"
|
||||
)
|
||||
return response.json()
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
date_str = now.strftime("%Y-%m-%d")
|
||||
time_str = now.strftime("%H:%M:%S")
|
||||
yield dg.Output(
|
||||
data,
|
||||
metadata={
|
||||
"date": dg.MetadataValue.timestamp(now),
|
||||
"path_suffix": [date_str, time_str],
|
||||
},
|
||||
)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import assets
|
||||
import sensors
|
||||
@@ -10,7 +11,7 @@ import dagster as dg
|
||||
|
||||
install()
|
||||
|
||||
APP = os.environ["APP"]
|
||||
APP = os.environ.get("APP", Path(__file__).parent.parent.name)
|
||||
|
||||
storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
|
||||
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
import json
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import Any
|
||||
|
||||
from pydantic import Field, PrivateAttr
|
||||
from upath import UPath
|
||||
|
||||
import dagster as dg
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from upath import UPath
|
||||
|
||||
|
||||
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
|
||||
out = {}
|
||||
@@ -41,8 +39,6 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
|
||||
_base_path = PrivateAttr()
|
||||
|
||||
def setup_for_execution(self, context: dg.InitResourceContext) -> None:
|
||||
from upath import UPath
|
||||
|
||||
sp = (
|
||||
_process_env_vars(self.cloud_storage_options)
|
||||
if self.cloud_storage_options is not None
|
||||
@@ -54,7 +50,9 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
|
||||
else UPath(dg._check.not_none(context.instance).storage_directory())
|
||||
)
|
||||
|
||||
def dump_to_path(self, context: dg.OutputContext, obj: object, path: "UPath"):
|
||||
def dump_to_path(
|
||||
self, context: dg.OutputContext, obj: object, path: "UPath"
|
||||
) -> None:
|
||||
"""This saves the output of an asset to a file path."""
|
||||
with path.open("w") as fp:
|
||||
json.dump(obj, fp, indent=4)
|
||||
@@ -63,3 +61,25 @@ class JsonIOManager(dg.ConfigurableIOManager, dg.UPathIOManager):
|
||||
"""This loads an input for a downstream asset from a file path."""
|
||||
with path.open("r") as fp:
|
||||
return json.load(fp)
|
||||
|
||||
def get_asset_relative_path(
|
||||
self, context: dg.InputContext | dg.OutputContext
|
||||
) -> UPath:
|
||||
"""Get the relative path for the asset based on context metadata."""
|
||||
context_metadata = context.output_metadata or {}
|
||||
ic(context_metadata)
|
||||
path_prefix = (
|
||||
context_metadata["path_prefix"].value
|
||||
if "path_prefix" in context_metadata
|
||||
else []
|
||||
)
|
||||
path_suffix = (
|
||||
context_metadata["path_suffix"].value
|
||||
if "path_suffix" in context_metadata
|
||||
else []
|
||||
)
|
||||
|
||||
ic(path_prefix, context.asset_key.path, path_suffix)
|
||||
|
||||
# we are not using context.get_asset_identifier() because it already includes the partition_key
|
||||
return UPath(*path_prefix, *context.asset_key.path, *path_suffix)
|
||||
|
||||
32
apps/weather/src/test.py
Normal file
32
apps/weather/src/test.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
import dagster as dg
|
||||
from apps.weather.src.assets import organised
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
|
||||
def today_str():
|
||||
"""Returns today's date as a string in the format YYYY-MM-DD."""
|
||||
return datetime.today().strftime("%Y-%m-%d")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
load_dotenv(find_dotenv())
|
||||
from definitions import definitions
|
||||
|
||||
run = 1
|
||||
resources = definitions.resources
|
||||
|
||||
match run:
|
||||
case 1:
|
||||
dg.materialize(
|
||||
assets=definitions.assets,
|
||||
selection=[organised.key],
|
||||
resources=resources,
|
||||
)
|
||||
case _:
|
||||
raise ValueError(f"Invalid run number: {run}!")
|
||||
Reference in New Issue
Block a user