diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py
index 386efe1..553013f 100644
--- a/apps/vinyl/src/definitions.py
+++ b/apps/vinyl/src/definitions.py
@@ -6,7 +6,7 @@ from dagster_duckdb.io_manager import DbTypeHandler
 from dagster_duckdb_pandas import DuckDBPandasTypeHandler
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
-from jobs import check_partititions_job, deals_job, musicbrainz_lookup_job
+from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
 from schedules import deals_schedule
 from sensors import musicbrainz_lookup_sensor
 
@@ -32,7 +32,7 @@ definitions = Definitions(
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
         "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
     },
-    jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job],
+    jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
     schedules=[deals_schedule],
     sensors=[musicbrainz_lookup_sensor],
 )
diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py
index 9ae537a..dfffb10 100644
--- a/apps/vinyl/src/jobs.py
+++ b/apps/vinyl/src/jobs.py
@@ -1,3 +1,4 @@
+import polars as pl
 from assets import deals, new_deals, works
 from dagster import (
@@ -14,27 +15,22 @@ deals_job = define_asset_job(
 )
 
 
-@op
-def check_partititions(context: OpExecutionContext):
-    # Replace with your asset/job name
+@op(required_resource_keys={"polars_parquet_io_manager"})
+def check_partitions(context: OpExecutionContext):
     asset_key = "deals"
-    context.log_event(
-        AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds")
-    )
-    # Fetch the materializations for the asset key
     materializations = context.instance.get_materialized_partitions(
         asset_key=AssetKey(asset_key)
     )
-    context.log.info("Existing partitions", extra=dict(partitions=materializations))
+    ic(materializations)
 
-    import polars as pl
-
-    storage_dir = context.instance.storage_directory()
+    storage_dir = context.resources.polars_parquet_io_manager.base_dir
     ic(storage_dir)
     for row in (
-        pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet")
+        pl.scan_parquet(
+            f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
+        )
         .select(["date", "source"])
         .unique()
         .collect()
@@ -49,8 +45,8 @@
 
 
 @job
-def check_partititions_job():
-    check_partititions()
+def check_partitions_job():
+    check_partitions()
 
 
 musicbrainz_lookup_job = define_asset_job(