diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 5f5cd9a..f4758b0 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -1,10 +1,12 @@ from collections.abc import Iterator from datetime import datetime +from functools import partial from glob import glob from types import SimpleNamespace import polars as pl import structlog +from config import APP from dagster_polars.patito import patito_model_to_dagster_type from jinja2 import Environment, FileSystemLoader from models import Deal @@ -18,10 +20,11 @@ from utils.email import EmailService import dagster as dg +asset = partial(dg.asset, key_prefix=APP) logger = structlog.get_logger() -@dg.asset( +@asset( io_manager_key="polars_parquet_io_manager", partitions_def=multi_partitions_def, config_schema={"import_dir": dg.Field(str, default_value="/storage/import")}, @@ -77,7 +80,7 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame: ) -@dg.asset( +@asset( io_manager_key="polars_parquet_io_manager", partitions_def=deals.partitions_def, ins={"df": dg.AssetIn(key=deals.key)}, @@ -116,7 +119,7 @@ def cleaned_deals( ) -@dg.asset( +@asset( deps=[cleaned_deals], io_manager_key="polars_parquet_io_manager", automation_condition=dg.AutomationCondition.on_missing().without( @@ -135,7 +138,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: return None -@dg.asset( +@asset( io_manager_key="polars_parquet_io_manager", partitions_def=multi_partitions_def, ins={ @@ -202,7 +205,7 @@ def new_deals( context.log.info("No new deals found!") -@dg.asset( +@asset( io_manager_key="polars_parquet_io_manager", partitions_def=daily_partitions_def, metadata={ diff --git a/apps/vinyl/src/config.py b/apps/vinyl/src/config.py new file mode 100644 index 0000000..fefeadf --- /dev/null +++ b/apps/vinyl/src/config.py @@ -0,0 +1,4 @@ +import os +from pathlib import Path + +APP = os.environ.get("APP", Path(__file__).parent.parent.name) diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index d601ca9..ad184b9 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -1,7 +1,7 @@ import os -from pathlib import Path import assets +from config import APP from dagster_polars import PolarsParquetIOManager from icecream import install from jobs import check_partitions_job, deals_job @@ -10,8 +10,6 @@ from utils.email import EmailService import dagster as dg -APP = os.environ.get("APP", Path(__file__).parent.parent.name) - install() @@ -40,7 +38,7 @@ definitions = dg.Definitions( ], resources={ "polars_parquet_io_manager": PolarsParquetIOManager( - base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" + base_dir=os.environ.get("STORAGE_DIR", "/storage") ), "email_service": EmailService( smtp_server=dg.EnvVar("SMTP_SERVER"),