From 66bcb3e2d3d06fa6b8b407459fa6c736e0a4c22e Mon Sep 17 00:00:00 2001
From: Rik Veenboer
Date: Sat, 26 Jul 2025 19:09:06 +0200
Subject: [PATCH] towards detecting new deals

Add a new_deals asset that fans in the last ten days of cleaned_deals
partitions per source and, for now, emits a sample of the latest
partition. Also remove the musicbrainz_lookup_sensor, make cleaned_deals
return an empty Deal.DataFrame instead of None for unknown sources, and
return Iterator rather than Generator from shared.utils.load_partitions.

---
 apps/vinyl/src/assets.py   | 55 ++++++++++++++++++++++++++++++++++----
 apps/vinyl/src/sensors.py  | 16 -----------
 apps/vinyl/src/test.py     | 11 ++++++--
 shared/src/shared/utils.py |  4 +--
 4 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index a9e1fef..d4093cc 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -1,8 +1,10 @@
+from collections.abc import Iterator
 from datetime import datetime
 from glob import glob
 
 import polars as pl
 import structlog
+from dagster_polars.patito import patito_model_to_dagster_type
 from models import Deal
 from plato.fetch import scrape_plato
 from plato.parse import parse as parse_plato
@@ -33,9 +35,6 @@ partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
 @dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=multi_partitions_def,
-    metadata={
-        "partition_by": ["date", "source"],
-    },
     config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
 )
 def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
@@ -99,7 +98,7 @@
 )
 def cleaned_deals(
     context: dg.AssetExecutionContext, df: pl.LazyFrame
-) -> Deal.DataFrame | None:
+) -> Deal.DataFrame:
     """Clean and parse deals from the raw source tables."""
     ic()
     partition_keys = get_partition_keys(context)
@@ -113,7 +112,7 @@
             parsed_df = parse_sounds(df)
         case _:
             context.log.warning(f"Unknown source: {source}!")
-            return None
+            return Deal.DataFrame()
 
     ic(parsed_df.collect_schema())
 
@@ -144,3 +143,49 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
         columns = ["artist", "title", "release"]
         return pl.concat(dfs, how="vertical_relaxed").select(columns).unique()
     return None
+
+
+@dg.asset(
+    io_manager_key="polars_parquet_io_manager",
+    partitions_def=multi_partitions_def,
+    ins={
+        "partitions": dg.AssetIn(
+            key=cleaned_deals.key,
+            partition_mapping=dg.MultiPartitionMapping(
+                {
+                    "date": dg.DimensionPartitionMapping(
+                        dimension_name="date",
+                        partition_mapping=dg.TimeWindowPartitionMapping(
+                            start_offset=-10,
+                            end_offset=0,
+                            allow_nonexistent_upstream_partitions=True,
+                        ),
+                    ),
+                    "source": dg.DimensionPartitionMapping(
+                        dimension_name="source",
+                        partition_mapping=dg.IdentityPartitionMapping(),
+                    ),
+                }
+            ),
+        )
+    },
+    output_required=False,
+    dagster_type=patito_model_to_dagster_type(Deal),
+)
+def new_deals(
+    context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
+) -> Iterator[dg.Output[Deal.DataFrame]]:
+    """Fetch new deals from all sources."""
+    ic()
+    partition_keys = get_partition_keys(context)
+    ic(partition_keys)
+
+    if len(upstream_keys := sorted(partitions.keys())) < 2:
+        context.log.warning("Not enough partitions to fetch new deals!")
+        return
+
+    # Placeholder: sample the latest upstream partition until deal detection lands.
+    yield dg.Output(Deal.DataFrame(partitions[upstream_keys[-1]].limit(5).collect()))
+
+
+# def good_deals(): ...
diff --git a/apps/vinyl/src/sensors.py b/apps/vinyl/src/sensors.py
index 55cf3b7..e69de29 100644
--- a/apps/vinyl/src/sensors.py
+++ b/apps/vinyl/src/sensors.py
@@ -1,16 +0,0 @@
-from assets import deals
-from jobs import musicbrainz_lookup_job
-
-import dagster as dg
-
-
-@dg.asset_sensor(
-    asset_key=deals.key,
-    job=musicbrainz_lookup_job,
-    default_status=dg.DefaultSensorStatus.STOPPED,
-)
-def musicbrainz_lookup_sensor(
-    context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry
-):
-    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
-    yield dg.RunRequest(run_key=context.cursor)
diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py
index 8267295..09c2c24 100644
--- a/apps/vinyl/src/test.py
+++ b/apps/vinyl/src/test.py
@@ -2,7 +2,7 @@ import logging
 from datetime import datetime
 from typing import Any
 
-from assets import cleaned_deals, deals, works
+from assets import cleaned_deals, deals, new_deals, works
 from dagster_polars import PolarsParquetIOManager
 from definitions import definitions
 from jobs import check_partitions_job
@@ -34,7 +34,7 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
 
 if __name__ == "__main__":
-    run = 3
+    run = 5
     resources = {
         "polars_parquet_io_manager": PolarsParquetIOManager(
             base_dir="/opt/dagster/storage/vinyl"
         )
@@ -61,5 +61,12 @@
                 selection=[works.key],
                 resources=resources,
             )
+        case 5:
+            dg.materialize(
+                assets=definitions.assets,
+                selection=[new_deals.key],
+                partition_key=f"{today_str()}|{source}",
+                resources=resources,
+            )
         case _:
             raise ValueError(f"Invalid run number: {run}!")
diff --git a/shared/src/shared/utils.py b/shared/src/shared/utils.py
index e191629..158548c 100644
--- a/shared/src/shared/utils.py
+++ b/shared/src/shared/utils.py
@@ -1,4 +1,4 @@
-from collections.abc import Generator
+from collections.abc import Iterator
 from typing import Mapping
 
 import polars as pl
@@ -87,7 +87,7 @@ def parse_partition_keys(
 
 
 def load_partitions(
     context: dg.AssetExecutionContext, asset_key: dg.AssetKey, partitions: set[str]
-) -> Generator[pl.DataFrame, None, None]:
+) -> Iterator[pl.DataFrame]:
     """
     Load data from an asset for the specified partitions.