add existing partitions

This commit is contained in:
2025-07-22 13:37:35 +02:00
parent be8fae969c
commit 91ca2a24a1
2 changed files with 12 additions and 16 deletions

View File

@@ -6,7 +6,7 @@ from dagster_duckdb.io_manager import DbTypeHandler
from dagster_duckdb_pandas import DuckDBPandasTypeHandler from dagster_duckdb_pandas import DuckDBPandasTypeHandler
from dagster_polars import PolarsParquetIOManager from dagster_polars import PolarsParquetIOManager
from icecream import install from icecream import install
from jobs import check_partititions_job, deals_job, musicbrainz_lookup_job from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
from schedules import deals_schedule from schedules import deals_schedule
from sensors import musicbrainz_lookup_sensor from sensors import musicbrainz_lookup_sensor
@@ -32,7 +32,7 @@ definitions = Definitions(
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
"duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"), "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
}, },
jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job], jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
schedules=[deals_schedule], schedules=[deals_schedule],
sensors=[musicbrainz_lookup_sensor], sensors=[musicbrainz_lookup_sensor],
) )

View File

@@ -1,3 +1,4 @@
import polars as pl
from assets import deals, new_deals, works from assets import deals, new_deals, works
from dagster import ( from dagster import (
@@ -14,27 +15,22 @@ deals_job = define_asset_job(
) )
@op @op(required_resource_keys={"polars_parquet_io_manager"})
def check_partititions(context: OpExecutionContext): def check_partitions(context: OpExecutionContext):
# Replace with your asset/job name
asset_key = "deals" asset_key = "deals"
context.log_event(
AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds")
)
# Fetch the materializations for the asset key # Fetch the materializations for the asset key
materializations = context.instance.get_materialized_partitions( materializations = context.instance.get_materialized_partitions(
asset_key=AssetKey(asset_key) asset_key=AssetKey(asset_key)
) )
context.log.info("Existing partitions", extra=dict(partitions=materializations)) ic(materializations)
import polars as pl storage_dir = context.resources.polars_parquet_io_manager.base_dir
storage_dir = context.instance.storage_directory()
ic(storage_dir) ic(storage_dir)
for row in ( for row in (
pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet") pl.scan_parquet(
f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
)
.select(["date", "source"]) .select(["date", "source"])
.unique() .unique()
.collect() .collect()
@@ -49,8 +45,8 @@ def check_partititions(context: OpExecutionContext):
@job @job
def check_partititions_job(): def check_partitions_job():
check_partititions() check_partitions()
musicbrainz_lookup_job = define_asset_job( musicbrainz_lookup_job = define_asset_job(