diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index 2a3e882..a9e1fef 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -3,9 +3,10 @@
 from glob import glob
 
 import polars as pl
 import structlog
+from models import Deal
 from plato.fetch import scrape_plato
 from plato.parse import parse as parse_plato
-from shared.utils import get_partition_keys, parse_partition_keys
+from shared.utils import get_partition_keys, load_partitions
 from sounds.fetch import fetch_deals
 from sounds.parse import parse as parse_sounds
@@ -37,12 +38,12 @@ partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
     },
     config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
 )
-def deals(context):
+def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
     ic()
     ic(context.partition_key)
     ic(context.op_config)
     import_dir = context.op_config["import_dir"]
-    partition_key = context.partition_key.keys_by_dimension
+    partition_key = get_partition_keys(context)
     date_str = partition_key["date"]
     source = partition_key["source"]
     logger.info("Materializing deals", date=date_str, source=source)
@@ -88,22 +89,6 @@ def deals(context):
     )
 
 
-@dg.asset(
-    io_manager_key="polars_parquet_io_manager",
-    partitions_def=daily_partitions_def,
-    ins={"partitions": dg.AssetIn(key=deals.key, partition_mapping=partitions_mapping)},
-    automation_condition=dg.AutomationCondition.eager(),
-)
-def new_deals(
-    context: dg.OpExecutionContext, partitions: dict[str, pl.DataFrame]
-) -> None:  # pl.DataFrame:
-    """Combine deals from Plato and Sounds into a single DataFrame."""
-    ic()
-    partition_keys = parse_partition_keys(context)
-    ic(partition_keys)
-    return
-
-
 @dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=deals.partitions_def,
@@ -113,8 +98,9 @@ def new_deals(
     ),
 )
 def cleaned_deals(
-    context: dg.OpExecutionContext, df: pl.LazyFrame
-) -> pl.DataFrame | None:
+    context: dg.AssetExecutionContext, df: pl.LazyFrame
+) -> Deal.DataFrame | None:
+    """Clean and parse deals from the raw source tables."""
     ic()
     partition_keys = get_partition_keys(context)
     ic(partition_keys)
@@ -129,22 +115,32 @@ def cleaned_deals(
         context.log.warning(f"Unknown source: {source}!")
         return None
 
+    ic(parsed_df.collect_schema())
+
     # Deduplicate and sort the DataFrame
     columns = ["source", "id", "artist", "title", "price"]
-    return (
-        parsed_df.collect()
-        .sort("date", descending=True)
+    return Deal.DataFrame(
+        parsed_df.sort("date", descending=True)
         .unique(subset=columns, keep="first")
         .sort("date", descending=False)
         .select(*columns, "date", "release", "url")
+        .collect()
     )
 
 
 @dg.asset(
-    ins={"df": dg.AssetIn(key=new_deals.key)},
+    deps=[cleaned_deals],
     io_manager_key="polars_parquet_io_manager",
-    automation_condition=dg.AutomationCondition.eager(),
+    automation_condition=dg.AutomationCondition.on_missing().without(
+        dg.AutomationCondition.in_latest_time_window()
+    ),
 )
-def works(df: pl.DataFrame) -> pl.DataFrame:
-    columns = ["artist", "title", "release"]
-    return df[columns].unique()
+def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
+    """Aggregate works from cleaned deals."""
+    partitions = context.instance.get_materialized_partitions(cleaned_deals.key)
+    ic(partitions)
+    dfs = list(load_partitions(context, cleaned_deals.key, partitions))
+    if dfs:
+        columns = ["artist", "title", "release"]
+        return pl.concat(dfs, how="vertical_relaxed").select(columns).unique()
+    return None
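Note on shared.utils: get_partition_keys and load_partitions are imported above but live outside this diff. A minimal sketch of what the new works asset assumes they provide; the signatures and the parquet path layout are assumptions, not the actual shared.utils code:

import polars as pl

import dagster as dg


def get_partition_keys(context: dg.AssetExecutionContext) -> dict[str, str]:
    # Split a multi-partition key into its dimensions,
    # e.g. "2025-01-01|sounds" -> {"date": "2025-01-01", "source": "sounds"}.
    return context.partition_key.keys_by_dimension


def load_partitions(
    context: dg.AssetExecutionContext,
    asset_key: dg.AssetKey,
    partitions: list[str],
):
    # Lazily yield each materialized partition of an asset. Assumes the
    # one-parquet-file-per-partition layout that the configured
    # PolarsParquetIOManager writes under its base_dir.
    for partition in partitions:
        yield pl.read_parquet(
            f"/storage/vinyl/{asset_key.to_user_string()}/{partition}.parquet"
        )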
diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py
index 02e26be..5e4fe12 100644
--- a/apps/vinyl/src/definitions.py
+++ b/apps/vinyl/src/definitions.py
@@ -1,9 +1,8 @@
 import assets
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
-from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
+from jobs import check_partitions_job, deals_job
 from schedules import deals_schedule
-from sensors import musicbrainz_lookup_sensor
 
 import dagster as dg
 
@@ -17,16 +16,8 @@ definitions = dg.Definitions(
         for asset in dg.load_assets_from_modules([assets])
     ],
     resources={
-        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
+        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage/vinyl"),
     },
-    jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
+    jobs=[deals_job, check_partitions_job],
     schedules=[deals_schedule],
-    sensors=[
-        # dg.AutomationConditionSensorDefinition(
-        #     "run_tags_automation_condition_sensor",
-        #     target=dg.AssetSelection.all(),
-        #     default_status=dg.DefaultSensorStatus.RUNNING,
-        # ),
-        musicbrainz_lookup_sensor,
-    ],
 )
diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py
index 9ac912c..732b1e2 100644
--- a/apps/vinyl/src/jobs.py
+++ b/apps/vinyl/src/jobs.py
@@ -1,5 +1,5 @@
 import polars as pl
-from assets import deals, new_deals, works
+from assets import deals
 
 import dagster as dg
 
@@ -12,11 +12,11 @@ deals_job = dg.define_asset_job(
 def check_partitions(context: dg.OpExecutionContext):
     asset_key = "deals"
 
-    # Fetch the materializations for the asset key
-    materializations = context.instance.get_materialized_partitions(
+    # Fetch the materialized partitions for the asset key
+    materialized_partitions = context.instance.get_materialized_partitions(
         asset_key=dg.AssetKey(asset_key)
     )
-    ic(materializations)
+    ic(materialized_partitions)
 
     storage_dir = context.resources.polars_parquet_io_manager.base_dir
     ic(storage_dir)
@@ -30,7 +30,7 @@ def check_partitions(context: dg.OpExecutionContext):
         .iter_rows()
     ):
         partition = "|".join(row)
-        if partition not in materializations:
+        if partition not in materialized_partitions:
             context.log.info(f"Missing partition: {partition}")
             context.log_event(
                 dg.AssetMaterialization(asset_key=asset_key, partition=partition)
             )
@@ -40,8 +40,3 @@ def check_partitions(context: dg.OpExecutionContext):
 @dg.job
 def check_partitions_job():
     check_partitions()
-
-
-musicbrainz_lookup_job = dg.define_asset_job(
-    "musicbrainz_lookup_job", selection=[works.key, new_deals.key]
-)
diff --git a/apps/vinyl/src/models.py b/apps/vinyl/src/models.py
new file mode 100644
index 0000000..0308fe4
--- /dev/null
+++ b/apps/vinyl/src/models.py
@@ -0,0 +1,14 @@
+import datetime
+
+import patito as pt
+
+
+class Deal(pt.Model):
+    source: str = pt.Field(description="Source of the deal, e.g., 'plato' or 'sounds'.")
+    id: str = pt.Field(description="Unique identifier that is used at the source.")
+    artist: str = pt.Field(description="Artist.")
+    title: str = pt.Field(description="Title of the deal.")
+    url: str = pt.Field(description="URL to the deal.")
+    date: datetime.date = pt.Field(description="Day the deal was listed.")
+    release: datetime.date = pt.Field(description="Release date.")
+    price: float = pt.Field(description="Price of the deal in EUR.")
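The new Deal model is what gives cleaned_deals its Deal.DataFrame return type: patito's Model.DataFrame is a model-aware subclass of pl.DataFrame, so the schema above is enforced at materialization time. A quick sketch of the validation this buys, with illustrative values rather than real listing data:

import datetime

import polars as pl

from models import Deal

row = {
    "source": "sounds",
    "id": "123",
    "artist": "Miles Davis",
    "title": "Kind of Blue",
    "url": "https://example.com/deals/123",
    "date": datetime.date(2025, 1, 1),
    "release": datetime.date(1959, 8, 17),
    "price": 24.99,
}

# Deal.validate raises a descriptive patito validation error if a column
# is missing, unexpectedly nullable, or has the wrong dtype; it passes
# silently for a conforming frame like this one.
Deal.validate(pl.DataFrame([row]))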
diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py
index 4365a94..8267295 100644
--- a/apps/vinyl/src/test.py
+++ b/apps/vinyl/src/test.py
@@ -1,16 +1,15 @@
 import logging
-import warnings
 from datetime import datetime
 from typing import Any
 
-from assets import cleaned_deals, deals, new_deals
+from assets import cleaned_deals, deals, works
 from dagster_polars import PolarsParquetIOManager
 from definitions import definitions
 from jobs import check_partitions_job
 
 import dagster as dg
 
-warnings.filterwarnings("ignore", category=dg.ExperimentalWarning)
+# warnings.filterwarnings("ignore", category=dg.ExperimentalWarning)
 logging.getLogger().setLevel(logging.INFO)
@@ -35,10 +34,10 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
 
 
 if __name__ == "__main__":
-    run = 4
+    run = 3
     resources = {
         "polars_parquet_io_manager": PolarsParquetIOManager(
-            base_dir="/opt/dagster/storage"
+            base_dir="/opt/dagster/storage/vinyl"
         )
     }
     source = "sounds"  # or "plato"
@@ -48,19 +47,19 @@ if __name__ == "__main__":
             check_partitions_job.execute_in_process(resources=resources)
         case 2:
             test_deals(resources, source=source)
         case 3:
-            dg.materialize(
-                assets=definitions.assets,
-                selection=[new_deals.key],
-                partition_key=today_str(),
-                resources=resources,
-            )
-        case 4:
             dg.materialize(
                 assets=definitions.assets,
                 selection=[cleaned_deals.key],
                 partition_key=f"{today_str()}|{source}",
                 resources=resources,
             )
+        case 4:
+            dg.materialize(
+                assets=definitions.assets,
+                selection=[works.key],
+                resources=resources,
+            )
         case _:
-            raise ValueError("Invalid run number")
+            raise ValueError(f"Invalid run number: {run}!")
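One detail worth spelling out for case 3: the f"{today_str()}|{source}" string is Dagster's serialized form of a multi-partition key, with the dimension values joined by "|" in dimension-name order. An equivalent, more explicit sketch of the same run (the date value is illustrative):

from assets import cleaned_deals
from dagster_polars import PolarsParquetIOManager
from definitions import definitions

import dagster as dg

dg.materialize(
    assets=definitions.assets,
    selection=[cleaned_deals.key],
    # MultiPartitionKey is a str subclass, so this is interchangeable
    # with the plain "2025-01-01|sounds" string form used above.
    partition_key=dg.MultiPartitionKey({"date": "2025-01-01", "source": "sounds"}),
    resources={
        "polars_parquet_io_manager": PolarsParquetIOManager(
            base_dir="/opt/dagster/storage/vinyl"
        )
    },
)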