add asset key prefix
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
|
from functools import partial
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import requests_cache
|
import requests_cache
|
||||||
|
from config import APP
|
||||||
from partitions import location_partitions_def
|
from partitions import location_partitions_def
|
||||||
from requests import Request
|
from requests import Request
|
||||||
from retry_requests import retry
|
from retry_requests import retry
|
||||||
@@ -9,20 +11,13 @@ from utils import parse_coord
|
|||||||
|
|
||||||
import dagster as dg
|
import dagster as dg
|
||||||
|
|
||||||
|
asset = partial(dg.asset, key_prefix=APP)
|
||||||
@dg.asset(
|
|
||||||
io_manager_key="json_io_manager",
|
|
||||||
)
|
|
||||||
def organised():
|
|
||||||
yield dg.Output(
|
|
||||||
{"interesting": "data"},
|
|
||||||
metadata={"where": "something", "path_prefix": "a/b", "path_suffix": "c/d"},
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dg.asset(
|
@asset(
|
||||||
io_manager_key="json_io_manager",
|
io_manager_key="json_io_manager",
|
||||||
partitions_def=location_partitions_def,
|
partitions_def=location_partitions_def,
|
||||||
|
name="raw",
|
||||||
)
|
)
|
||||||
def raw_weather(context: dg.AssetExecutionContext) -> Any:
|
def raw_weather(context: dg.AssetExecutionContext) -> Any:
|
||||||
"""Asset to fetch raw weather data for each location."""
|
"""Asset to fetch raw weather data for each location."""
|
||||||
|
|||||||
4
apps/weather/src/config.py
Normal file
4
apps/weather/src/config.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
APP = os.environ.get("APP", Path(__file__).parent.parent.name)
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
import os
|
import os
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import assets
|
import assets
|
||||||
import sensors
|
import sensors
|
||||||
|
from config import APP
|
||||||
from dagster_polars import PolarsParquetIOManager
|
from dagster_polars import PolarsParquetIOManager
|
||||||
from icecream import install
|
from icecream import install
|
||||||
from resoures import JsonIOManager
|
from resoures import JsonIOManager
|
||||||
@@ -11,9 +11,7 @@ import dagster as dg
|
|||||||
|
|
||||||
install()
|
install()
|
||||||
|
|
||||||
APP = os.environ.get("APP", Path(__file__).parent.parent.name)
|
storage_dir = os.environ.get("STORAGE_DIR", "/storage")
|
||||||
|
|
||||||
storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
|
|
||||||
|
|
||||||
definitions = dg.Definitions(
|
definitions = dg.Definitions(
|
||||||
assets=[
|
assets=[
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from datetime import datetime
|
|||||||
from dotenv import find_dotenv, load_dotenv
|
from dotenv import find_dotenv, load_dotenv
|
||||||
|
|
||||||
import dagster as dg
|
import dagster as dg
|
||||||
from apps.weather.src.assets import organised
|
from apps.weather.src.assets import raw_weather
|
||||||
|
|
||||||
logging.getLogger().setLevel(logging.INFO)
|
logging.getLogger().setLevel(logging.INFO)
|
||||||
|
|
||||||
@@ -25,7 +25,8 @@ if __name__ == "__main__":
|
|||||||
case 1:
|
case 1:
|
||||||
dg.materialize(
|
dg.materialize(
|
||||||
assets=definitions.assets,
|
assets=definitions.assets,
|
||||||
selection=[organised.key],
|
selection=[raw_weather.key],
|
||||||
|
partition_key="N5100E0300",
|
||||||
resources=resources,
|
resources=resources,
|
||||||
)
|
)
|
||||||
case _:
|
case _:
|
||||||
|
|||||||
Reference in New Issue
Block a user