From 1e9078fed5de69c779e976051463b44c42a75d70 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Sun, 27 Jul 2025 18:06:47 +0200 Subject: [PATCH] add asset key prefix --- apps/weather/src/assets.py | 15 +++++---------- apps/weather/src/config.py | 4 ++++ apps/weather/src/definitions.py | 6 ++---- apps/weather/src/test.py | 5 +++-- 4 files changed, 14 insertions(+), 16 deletions(-) create mode 100644 apps/weather/src/config.py diff --git a/apps/weather/src/assets.py b/apps/weather/src/assets.py index 26077e7..b18ec3e 100644 --- a/apps/weather/src/assets.py +++ b/apps/weather/src/assets.py @@ -1,7 +1,9 @@ from datetime import datetime, timezone +from functools import partial from typing import Any import requests_cache +from config import APP from partitions import location_partitions_def from requests import Request from retry_requests import retry @@ -9,20 +11,13 @@ from utils import parse_coord import dagster as dg - -@dg.asset( - io_manager_key="json_io_manager", -) -def organised(): - yield dg.Output( - {"interesting": "data"}, - metadata={"where": "something", "path_prefix": "a/b", "path_suffix": "c/d"}, - ) +asset = partial(dg.asset, key_prefix=APP) -@dg.asset( +@asset( io_manager_key="json_io_manager", partitions_def=location_partitions_def, + name="raw", ) def raw_weather(context: dg.AssetExecutionContext) -> Any: """Asset to fetch raw weather data for each location.""" diff --git a/apps/weather/src/config.py b/apps/weather/src/config.py new file mode 100644 index 0000000..fefeadf --- /dev/null +++ b/apps/weather/src/config.py @@ -0,0 +1,4 @@ +import os +from pathlib import Path + +APP = os.environ.get("APP", Path(__file__).parent.parent.name) diff --git a/apps/weather/src/definitions.py b/apps/weather/src/definitions.py index ca80dcc..c69b971 100644 --- a/apps/weather/src/definitions.py +++ b/apps/weather/src/definitions.py @@ -1,8 +1,8 @@ import os -from pathlib import Path import assets import sensors +from config import APP from dagster_polars import PolarsParquetIOManager from icecream import install from resoures import JsonIOManager @@ -11,9 +11,7 @@ import dagster as dg install() -APP = os.environ.get("APP", Path(__file__).parent.parent.name) - -storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" +storage_dir = os.environ.get("STORAGE_DIR", "/storage") definitions = dg.Definitions( assets=[ diff --git a/apps/weather/src/test.py b/apps/weather/src/test.py index 6cf058c..bf56cef 100644 --- a/apps/weather/src/test.py +++ b/apps/weather/src/test.py @@ -4,7 +4,7 @@ from datetime import datetime from dotenv import find_dotenv, load_dotenv import dagster as dg -from apps.weather.src.assets import organised +from apps.weather.src.assets import raw_weather logging.getLogger().setLevel(logging.INFO) @@ -25,7 +25,8 @@ if __name__ == "__main__": case 1: dg.materialize( assets=definitions.assets, - selection=[organised.key], + selection=[raw_weather.key], + partition_key="N5100E0300", resources=resources, ) case _: