diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py index 732b1e2..b90c44a 100644 --- a/apps/vinyl/src/jobs.py +++ b/apps/vinyl/src/jobs.py @@ -1,28 +1,34 @@ +from functools import partial + import polars as pl from assets import deals +from config import APP import dagster as dg -deals_job = dg.define_asset_job( +kwargs = dict(tags={"app": APP}) +job = partial(dg.job, **kwargs) +define_asset_job = partial(dg.define_asset_job, **kwargs) + +deals_job = define_asset_job( "deals_job", selection=[deals.key], partitions_def=deals.partitions_def ) @dg.op(required_resource_keys={"polars_parquet_io_manager"}) -def check_partitions(context: dg.OpExecutionContext): - asset_key = "deals" +def check_partitions(context: dg.OpExecutionContext) -> None: + asset_key = deals.key # Fetch the materialized partitions for the asset key - materialized_partitions = context.instance.get_materialized_partitions( - asset_key=dg.AssetKey(asset_key) - ) + materialized_partitions = context.instance.get_materialized_partitions(asset_key) ic(materialized_partitions) storage_dir = context.resources.polars_parquet_io_manager.base_dir - ic(storage_dir) + asset_path = "/".join(asset_key.path) + ic(storage_dir, asset_key, asset_path) for row in ( pl.scan_parquet( - f"{storage_dir}/{asset_key}/*/*.parquet", # extra_columns="ignore" + f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore" ) .select(["date", "source"]) .unique() @@ -37,6 +43,6 @@ def check_partitions(context: dg.OpExecutionContext): ) -@dg.job +@job def check_partitions_job(): check_partitions()