put vinyl assets under key

This commit is contained in:
2025-07-27 18:17:56 +02:00
parent 8e8b6190b9
commit e4dddd0400
3 changed files with 14 additions and 9 deletions

View File

@@ -1,10 +1,12 @@
from collections.abc import Iterator from collections.abc import Iterator
from datetime import datetime from datetime import datetime
from functools import partial
from glob import glob from glob import glob
from types import SimpleNamespace from types import SimpleNamespace
import polars as pl import polars as pl
import structlog import structlog
from config import APP
from dagster_polars.patito import patito_model_to_dagster_type from dagster_polars.patito import patito_model_to_dagster_type
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader
from models import Deal from models import Deal
@@ -18,10 +20,11 @@ from utils.email import EmailService
import dagster as dg import dagster as dg
# Module-local decorator: `dg.asset` with `key_prefix` pre-bound to APP, so
# every `@asset(...)` in this module passes that prefix without repeating it.
asset = partial(dg.asset, key_prefix=APP)
logger = structlog.get_logger() logger = structlog.get_logger()
@dg.asset( @asset(
io_manager_key="polars_parquet_io_manager", io_manager_key="polars_parquet_io_manager",
partitions_def=multi_partitions_def, partitions_def=multi_partitions_def,
config_schema={"import_dir": dg.Field(str, default_value="/storage/import")}, config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
@@ -77,7 +80,7 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
) )
@dg.asset( @asset(
io_manager_key="polars_parquet_io_manager", io_manager_key="polars_parquet_io_manager",
partitions_def=deals.partitions_def, partitions_def=deals.partitions_def,
ins={"df": dg.AssetIn(key=deals.key)}, ins={"df": dg.AssetIn(key=deals.key)},
@@ -116,7 +119,7 @@ def cleaned_deals(
) )
@dg.asset( @asset(
deps=[cleaned_deals], deps=[cleaned_deals],
io_manager_key="polars_parquet_io_manager", io_manager_key="polars_parquet_io_manager",
automation_condition=dg.AutomationCondition.on_missing().without( automation_condition=dg.AutomationCondition.on_missing().without(
@@ -135,7 +138,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
return None return None
@dg.asset( @asset(
io_manager_key="polars_parquet_io_manager", io_manager_key="polars_parquet_io_manager",
partitions_def=multi_partitions_def, partitions_def=multi_partitions_def,
ins={ ins={
@@ -202,7 +205,7 @@ def new_deals(
context.log.info("No new deals found!") context.log.info("No new deals found!")
@dg.asset( @asset(
io_manager_key="polars_parquet_io_manager", io_manager_key="polars_parquet_io_manager",
partitions_def=daily_partitions_def, partitions_def=daily_partitions_def,
metadata={ metadata={

4
apps/vinyl/src/config.py Normal file
View File

@@ -0,0 +1,4 @@
import os
from pathlib import Path
# Application name: taken from the APP environment variable when it is set;
# otherwise the name of the directory that contains this file's own
# directory (for apps/vinyl/src/config.py that is "vinyl").
APP = os.getenv(
    "APP",
    Path(__file__).parent.parent.name,
)

View File

@@ -1,7 +1,7 @@
import os import os
from pathlib import Path
import assets import assets
from config import APP
from dagster_polars import PolarsParquetIOManager from dagster_polars import PolarsParquetIOManager
from icecream import install from icecream import install
from jobs import check_partitions_job, deals_job from jobs import check_partitions_job, deals_job
@@ -10,8 +10,6 @@ from utils.email import EmailService
import dagster as dg import dagster as dg
APP = os.environ.get("APP", Path(__file__).parent.parent.name)
install() install()
@@ -40,7 +38,7 @@ definitions = dg.Definitions(
], ],
resources={ resources={
"polars_parquet_io_manager": PolarsParquetIOManager( "polars_parquet_io_manager": PolarsParquetIOManager(
base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" base_dir=os.environ.get("STORAGE_DIR", "/storage")
), ),
"email_service": EmailService( "email_service": EmailService(
smtp_server=dg.EnvVar("SMTP_SERVER"), smtp_server=dg.EnvVar("SMTP_SERVER"),