"""Dagster jobs for the deals assets, plus a partition reconciliation job."""

from functools import partial

import dagster as dg
import polars as pl

import assets
from config import APP

# Shared keyword arguments: jobs built through the two wrappers below carry
# the ``app`` tag.
kwargs = {"tags": {"app": APP}}
job = partial(dg.job, **kwargs)
define_asset_job = partial(dg.define_asset_job, **kwargs)

# NOTE(review): this calls dg.define_asset_job directly, so the ``app`` tag
# from ``kwargs`` is NOT applied to deals_job -- confirm whether that is intended.
deals_job = dg.define_asset_job(
    "deals_job",
    selection=[assets.deals.key],
    partitions_def=assets.deals.partitions_def,
)


def _scan_partition_keys(parquet_glob: str, dimension_names) -> set[str]:
    """Return the distinct partition keys found in the parquet files.

    Each key is the ``|``-joined string form of the ``dimension_names``
    columns of one distinct row.
    """
    rows = (
        pl.scan_parquet(parquet_glob, extra_columns="ignore")
        .select(dimension_names)
        .unique()
        # Non-string dimension columns are cast so the values can be joined.
        .with_columns(pl.exclude(pl.String).cast(str))
        .collect()
        .iter_rows()
    )
    return {"|".join(row) for row in rows}


@dg.op(required_resource_keys={"polars_parquet_io_manager"})
def check_partitions(context: dg.OpExecutionContext) -> None:
    """Reconcile the instance's materialized partitions with parquet files on disk.

    For ``assets.deals`` and ``assets.cleaned_deals``:

    * every partition key found in the stored parquet files that the instance
      does not list as materialized is registered by logging an
      ``AssetMaterialization`` event;
    * every partition the instance lists as materialized but that has no data
      on disk is only reported in the log (nothing is removed).
    """
    # The storage root comes from the IO manager resource and is the same for
    # every asset, so it is looked up once.
    storage_dir = context.resources.polars_parquet_io_manager.base_dir

    for asset in [assets.deals, assets.cleaned_deals]:
        asset_key = asset.key

        # Fetch the materialized partitions for the asset key
        materialized_partitions = context.instance.get_materialized_partitions(
            asset_key
        )
        context.log.debug(
            f"[{asset_key}] materialized_partitions={materialized_partitions!r}"
        )

        asset_path = "/".join(asset_key.path)
        # assumes the asset's partitions_def exposes partition_dimension_names
        # (presumably a multi-dimensional definition) -- TODO confirm
        dimension_names = asset.partitions_def.partition_dimension_names
        context.log.debug(
            f"[{asset_key}] storage_dir={storage_dir!r} "
            f"asset_path={asset_path!r} dimension_names={dimension_names!r}"
        )

        on_disk = _scan_partition_keys(
            f"{storage_dir}/{asset_path}/*/*.parquet", dimension_names
        )

        for partition in on_disk:
            if partition not in materialized_partitions:
                context.log.info(f"[{asset_key}] Adding partition: {partition}")
                context.log_event(
                    dg.AssetMaterialization(asset_key=asset_key, partition=partition)
                )

        # Known to the instance but absent from disk: report only.
        missing = set(materialized_partitions) - on_disk
        context.log.debug(f"[{asset_key}] missing={missing!r}")
        for partition in sorted(missing):
            context.log.info(f"[{asset_key}] Should remove partition: {partition}")


@job
def check_partitions_job():
    """Job that runs the ``check_partitions`` op."""
    check_partitions()