towards detecting new deals

2025-07-26 19:09:06 +02:00
parent 8d06b236b7
commit 66bcb3e2d3
4 changed files with 61 additions and 25 deletions

View File

@@ -1,8 +1,10 @@
+from collections.abc import Iterator
 from datetime import datetime
 from glob import glob
 
 import polars as pl
 import structlog
+from dagster_polars.patito import patito_model_to_dagster_type
 from models import Deal
 from plato.fetch import scrape_plato
 from plato.parse import parse as parse_plato
@@ -33,9 +35,6 @@ partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
 @dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=multi_partitions_def,
-    metadata={
-        "partition_by": ["date", "source"],
-    },
     config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
 )
 def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
@@ -99,7 +98,7 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
 )
 def cleaned_deals(
     context: dg.AssetExecutionContext, df: pl.LazyFrame
-) -> Deal.DataFrame | None:
+) -> Deal.DataFrame:
     """Clean and parse deals from the raw source tables."""
     ic()
     partition_keys = get_partition_keys(context)
@@ -113,7 +112,7 @@ def cleaned_deals(
             parsed_df = parse_sounds(df)
         case _:
             context.log.warning(f"Unknown source: {source}!")
-            return None
+            return Deal.DataFrame()
     ic(parsed_df.collect_schema())
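
The warning branch now returns an empty, typed frame instead of None, which is what lets the signature in the previous hunk drop the `| None`. For context, patito binds a polars DataFrame subclass to each model; a minimal sketch, using a hypothetical stand-in for the project's Deal model:

import patito as pt

class MiniDeal(pt.Model):
    # Hypothetical stand-in for the project's Deal model.
    artist: str
    title: str

typed = MiniDeal.DataFrame({"artist": ["Orbital"], "title": ["Belfast"]})
typed.validate()              # patito checks the rows against the model
empty = MiniDeal.DataFrame()  # the uniform "nothing parsed" return value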
@@ -144,3 +143,49 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
         columns = ["artist", "title", "release"]
         return pl.concat(dfs, how="vertical_relaxed").select(columns).unique()
     return None
+
+
+@dg.asset(
+    io_manager_key="polars_parquet_io_manager",
+    partitions_def=multi_partitions_def,
+    ins={
+        "partitions": dg.AssetIn(
+            key=cleaned_deals.key,
+            partition_mapping=dg.MultiPartitionMapping(
+                {
+                    "date": dg.DimensionPartitionMapping(
+                        dimension_name="date",
+                        partition_mapping=dg.TimeWindowPartitionMapping(
+                            start_offset=-10,
+                            end_offset=0,
+                            allow_nonexistent_upstream_partitions=True,
+                        ),
+                    ),
+                    "source": dg.DimensionPartitionMapping(
+                        dimension_name="source",
+                        partition_mapping=dg.IdentityPartitionMapping(),
+                    ),
+                }
+            ),
+        )
+    },
+    output_required=False,
+    dagster_type=patito_model_to_dagster_type(Deal),
+)
+def new_deals(
+    context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
+) -> Iterator[dg.Output[Deal.DataFrame]]:
+    """Fetch new deals from all sources."""
+    ic()
+    partition_keys = get_partition_keys(context)
+    ic(partition_keys)
+    if len(partition_keys := sorted(partitions.keys())) < 2:
+        context.log.warning("Not enough partitions to fetch new deals!")
+        return
+    yield dg.Output(Deal.DataFrame(partitions[partition_keys[-1]].limit(5).collect()))
+
+
+# def good_deals(): ...
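
The body of new_deals is still a stub: it yields the first five rows of the newest partition. A minimal sketch of the comparison the commit title points towards, assuming deals are identified by the same artist/title/release columns that works selects (detect_new is a hypothetical helper, not part of this commit):

import polars as pl

KEYS = ["artist", "title", "release"]  # assumed identifying columns

def detect_new(latest: pl.LazyFrame, earlier: list[pl.LazyFrame]) -> pl.DataFrame:
    """Keep rows of the newest partition that appear in no earlier partition."""
    seen = pl.concat(earlier, how="vertical_relaxed").unique(subset=KEYS)
    # An anti-join drops every row whose key combination was already seen.
    return latest.join(seen, on=KEYS, how="anti").collect()

Inside the asset this would replace the .limit(5) placeholder, with latest = partitions[partition_keys[-1]] and earlier built from the remaining keys, skipping None values.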

View File

@@ -1,16 +0,0 @@
-from assets import deals
-from jobs import musicbrainz_lookup_job
-
-import dagster as dg
-
-
-@dg.asset_sensor(
-    asset_key=deals.key,
-    job=musicbrainz_lookup_job,
-    default_status=dg.DefaultSensorStatus.STOPPED,
-)
-def musicbrainz_lookup_sensor(
-    context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry
-):
-    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
-    yield dg.RunRequest(run_key=context.cursor)

View File

@@ -2,7 +2,7 @@ import logging
 from datetime import datetime
 from typing import Any
 
-from assets import cleaned_deals, deals, works
+from assets import cleaned_deals, deals, new_deals, works
 from dagster_polars import PolarsParquetIOManager
 from definitions import definitions
 from jobs import check_partitions_job
@@ -34,7 +34,7 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
 if __name__ == "__main__":
-    run = 3
+    run = 5
     resources = {
         "polars_parquet_io_manager": PolarsParquetIOManager(
             base_dir="/opt/dagster/storage/vinyl"
@@ -61,5 +61,12 @@ if __name__ == "__main__":
                 selection=[works.key],
                 resources=resources,
             )
+        case 5:
+            dg.materialize(
+                assets=definitions.assets,
+                selection=[new_deals.key],
+                partition_key=f"{today_str()}|{source}",
+                resources=resources,
+            )
         case _:
             raise ValueError(f"Invalid run number: {run}!")
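
Run 5 materializes the new asset for one multi-partition. Dagster represents a multi-dimensional partition key as the per-dimension keys joined with |, sorted by dimension name, so the f-string above matches what an explicit dg.MultiPartitionKey would produce; a small sketch (the date is just an example value):

import dagster as dg

key = dg.MultiPartitionKey({"date": "2025-07-26", "source": "sounds"})
print(key)  # -> 2025-07-26|sounds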

View File

@@ -1,4 +1,4 @@
-from collections.abc import Generator
+from collections.abc import Iterator
 from typing import Mapping
 
 import polars as pl
@@ -87,7 +87,7 @@ def parse_partition_keys(
 def load_partitions(
     context: dg.AssetExecutionContext, asset_key: dg.AssetKey, partitions: set[str]
-) -> Generator[pl.DataFrame, None, None]:
+) -> Iterator[pl.DataFrame]:
     """
     Load data from an asset for the specified partitions.
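
The Generator-to-Iterator swap in load_partitions is purely about noise: Generator[Y, S, R] carries send and return type parameters that a plain yielding function never uses, while Iterator states only the yield type. A minimal illustration:

from collections.abc import Iterator

import polars as pl

def frames() -> Iterator[pl.DataFrame]:
    # A generator that never receives sent values and returns nothing
    # only needs its yield type in the annotation.
    yield pl.DataFrame({"artist": ["Orbital"], "title": ["Belfast"]})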