diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index 570a840..da3071f 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -9,35 +9,27 @@
 from plato.fetch import scrape_plato
 from sounds.fetch import fetch_deals
 from utils import parse_date
-from dagster import (
-    DailyPartitionsDefinition,
-    Failure,
-    Field,
-    MultiPartitionsDefinition,
-    OpExecutionContext,
-    StaticPartitionsDefinition,
-    asset,
-)
+import dagster as dg

 SOURCES = ["plato", "sounds"]

 logger = structlog.get_logger()

-partitions_def = MultiPartitionsDefinition(
+partitions_def = dg.MultiPartitionsDefinition(
     {
-        "date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
-        "source": StaticPartitionsDefinition(SOURCES),
+        "date": dg.DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
+        "source": dg.StaticPartitionsDefinition(SOURCES),
     }
 )


-@asset(
+@dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=partitions_def,
     metadata={
         "partition_by": ["date", "source"],
     },
-    config_schema={"import_dir": Field(str, default_value="/storage/import")},
+    config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
 )
 def deals(context):
     ic()
@@ -53,7 +45,7 @@ def deals(context):
     days = (date - datetime.today()).days
     ic(days)
     if days > 0:
-        raise Failure(f"Cannot materialize for the future: {date.date()}")
+        raise dg.Failure(f"Cannot materialize for the future: {date.date()}")
     if days < -1:
         if source == "sounds":
             pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
@@ -70,7 +62,7 @@ def deals(context):
                 )
             except Exception as e:
                 logger.error("Failed to load CSV file!", error=e)
-                raise Failure(f"Cannot materialize for the past: {date.date()}")
+                raise dg.Failure(f"Cannot materialize for the past: {date.date()}")

     if source == "plato":
         logger.info("Scraping Plato")
@@ -90,8 +82,12 @@ def deals(context):
     )


-@asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
-def new_deals(context: OpExecutionContext) -> pl.DataFrame:
+@dg.asset(
+    deps=[deals.key],
+    automation_condition=dg.AutomationCondition.eager(),
+    io_manager_key="polars_parquet_io_manager",
+)
+def new_deals(context: dg.OpExecutionContext) -> pl.DataFrame:
     """Combine deals from Plato and Sounds into a single DataFrame."""
     ic()
     storage_dir = context.resources.polars_parquet_io_manager.base_dir
@@ -160,17 +156,11 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame:
     ).pl()


-@asset(
+@dg.asset(
+    ins={"df": dg.AssetIn(key=new_deals.key)},
     io_manager_key="polars_parquet_io_manager",
+    automation_condition=dg.AutomationCondition.eager(),
 )
-def works(new_deals: pl.DataFrame) -> pl.DataFrame:
-    # Pandas
-    # columns = ["artist", "title"]
-    # return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates())
-
-    # Polars
-    # return new_deals[columns].unique(subset=columns)
-
-    # DuckDB
-    with duckdb.connect() as con:
-        return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl()
+def works(df: pl.DataFrame) -> pl.DataFrame:
+    columns = ["artist", "title", "release"]
+    return df[columns].unique()
diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py
index 553013f..762646f 100644
--- a/apps/vinyl/src/definitions.py
+++ b/apps/vinyl/src/definitions.py
@@ -10,7 +10,7 @@
 from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
 from schedules import deals_schedule
 from sensors import musicbrainz_lookup_sensor
-from dagster import Definitions, load_assets_from_modules
+import dagster as dg


 class PandasDuckDBIOManager(DuckDBIOManager):
@@ -20,13 +20,13 @@ class PandasDuckDBIOManager(DuckDBIOManager):
 install()


-definitions = Definitions(
+definitions = dg.Definitions(
     assets=[
         asset.with_attributes(
             group_names_by_key={asset.key: "vinyl"},
             tags_by_key={asset.key: {"app": "vinyl"}},
         )
-        for asset in load_assets_from_modules([assets])
+        for asset in dg.load_assets_from_modules([assets])
     ],
     resources={
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
@@ -34,5 +34,12 @@
     },
     jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
     schedules=[deals_schedule],
-    sensors=[musicbrainz_lookup_sensor],
+    sensors=[
+        dg.AutomationConditionSensorDefinition(
+            "run_tags_automation_condition_sensor",
+            target=dg.AssetSelection.all(),
+            default_status=dg.DefaultSensorStatus.RUNNING,
+        ),
+        musicbrainz_lookup_sensor,
+    ],
 )
diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py
index dfffb10..c560099 100644
--- a/apps/vinyl/src/jobs.py
+++ b/apps/vinyl/src/jobs.py
@@ -1,27 +1,20 @@
 import polars as pl
 from assets import deals, new_deals, works

-from dagster import (
-    AssetKey,
-    AssetMaterialization,
-    OpExecutionContext,
-    define_asset_job,
-    job,
-    op,
-)
+import dagster as dg


-deals_job = define_asset_job(
-    "deals_job", selection=[deals], partitions_def=deals.partitions_def
+deals_job = dg.define_asset_job(
+    "deals_job", selection=[deals.key], partitions_def=deals.partitions_def
 )


-@op(required_resource_keys={"polars_parquet_io_manager"})
-def check_partitions(context: OpExecutionContext):
+@dg.op(required_resource_keys={"polars_parquet_io_manager"})
+def check_partitions(context: dg.OpExecutionContext):
     asset_key = "deals"

     # Fetch the materializations for the asset key
     materializations = context.instance.get_materialized_partitions(
-        asset_key=AssetKey(asset_key)
+        asset_key=dg.AssetKey(asset_key)
     )
     ic(materializations)
@@ -40,15 +33,15 @@ def check_partitions(context: OpExecutionContext):
         if partition not in materializations:
             context.log.info(f"Missing partition: {partition}")
             context.log_event(
-                AssetMaterialization(asset_key=asset_key, partition=partition)
+                dg.AssetMaterialization(asset_key=asset_key, partition=partition)
             )


-@job
+@dg.job
 def check_partitions_job():
     check_partitions()


-musicbrainz_lookup_job = define_asset_job(
-    "musicbrainz_lookup_job", selection=[works, new_deals]
+musicbrainz_lookup_job = dg.define_asset_job(
+    "musicbrainz_lookup_job", selection=[works.key, new_deals.key]
 )
diff --git a/apps/vinyl/src/schedules.py b/apps/vinyl/src/schedules.py
index 3947793..928a2f1 100644
--- a/apps/vinyl/src/schedules.py
+++ b/apps/vinyl/src/schedules.py
@@ -1,10 +1,9 @@
 from jobs import deals_job

-from dagster import DefaultScheduleStatus, build_schedule_from_partitioned_job
+import dagster as dg

-deals_schedule = build_schedule_from_partitioned_job(
+deals_schedule = dg.build_schedule_from_partitioned_job(
     job=deals_job,
     hour_of_day=7,
-    # execution_timezone="Europe/Amsterdam",
-    default_status=DefaultScheduleStatus.RUNNING,
+    default_status=dg.DefaultScheduleStatus.RUNNING,
 )
diff --git a/apps/vinyl/src/sensors.py b/apps/vinyl/src/sensors.py
index dc57086..55cf3b7 100644
--- a/apps/vinyl/src/sensors.py
+++ b/apps/vinyl/src/sensors.py
@@ -1,22 +1,16 @@
 from assets import deals
 from jobs import musicbrainz_lookup_job

-from dagster import (
-    DefaultSensorStatus,
-    EventLogEntry,
-    RunRequest,
-    SensorEvaluationContext,
-    asset_sensor,
-)
+import dagster as dg

-@asset_sensor(
+@dg.asset_sensor(
     asset_key=deals.key,
     job=musicbrainz_lookup_job,
-    default_status=DefaultSensorStatus.RUNNING,
+    default_status=dg.DefaultSensorStatus.STOPPED,
 )
 def musicbrainz_lookup_sensor(
-    context: SensorEvaluationContext, asset_event: EventLogEntry
+    context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry
 ):
     assert asset_event.dagster_event and asset_event.dagster_event.asset_key
-    yield RunRequest(run_key=context.cursor)
+    yield dg.RunRequest(run_key=context.cursor)