implement schema check and use automation instead of sensor

2025-07-26 17:53:12 +02:00
parent ac8759258d
commit 8d06b236b7
5 changed files with 59 additions and 64 deletions

View File

@@ -3,9 +3,10 @@ from glob import glob
 import polars as pl
 import structlog
+from models import Deal
 from plato.fetch import scrape_plato
 from plato.parse import parse as parse_plato
-from shared.utils import get_partition_keys, parse_partition_keys
+from shared.utils import get_partition_keys, load_partitions
 from sounds.fetch import fetch_deals
 from sounds.parse import parse as parse_sounds
@@ -37,12 +38,12 @@ partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
     },
     config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
 )
-def deals(context):
+def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
     ic()
     ic(context.partition_key)
     ic(context.op_config)
     import_dir = context.op_config["import_dir"]
-    partition_key = context.partition_key.keys_by_dimension
+    partition_key = get_partition_keys(context)
     date_str = partition_key["date"]
     source = partition_key["source"]
     logger.info("Materializing deals", date=date_str, source=source)
@@ -88,22 +89,6 @@ def deals(context):
     )
 
 
-@dg.asset(
-    io_manager_key="polars_parquet_io_manager",
-    partitions_def=daily_partitions_def,
-    ins={"partitions": dg.AssetIn(key=deals.key, partition_mapping=partitions_mapping)},
-    automation_condition=dg.AutomationCondition.eager(),
-)
-def new_deals(
-    context: dg.OpExecutionContext, partitions: dict[str, pl.DataFrame]
-) -> None:  # pl.DataFrame:
-    """Combine deals from Plato and Sounds into a single DataFrame."""
-    ic()
-    partition_keys = parse_partition_keys(context)
-    ic(partition_keys)
-    return
-
-
 @dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=deals.partitions_def,
@@ -113,8 +98,9 @@ def new_deals(
     ),
 )
 def cleaned_deals(
-    context: dg.OpExecutionContext, df: pl.LazyFrame
-) -> pl.DataFrame | None:
+    context: dg.AssetExecutionContext, df: pl.LazyFrame
+) -> Deal.DataFrame | None:
+    """Clean and parse deals from the raw source tables."""
     ic()
     partition_keys = get_partition_keys(context)
     ic(partition_keys)
@@ -129,22 +115,32 @@ def cleaned_deals(
         context.log.warning(f"Unknown source: {source}!")
         return None
 
+    ic(parsed_df.collect_schema())
+
     # Deduplicate and sort the DataFrame
     columns = ["source", "id", "artist", "title", "price"]
-    return (
-        parsed_df.collect()
-        .sort("date", descending=True)
+    return Deal.DataFrame(
+        parsed_df.sort("date", descending=True)
         .unique(subset=columns, keep="first")
         .sort("date", descending=False)
         .select(*columns, "date", "release", "url")
+        .collect()
     )
 
 
 @dg.asset(
-    ins={"df": dg.AssetIn(key=new_deals.key)},
+    deps=[cleaned_deals],
     io_manager_key="polars_parquet_io_manager",
-    automation_condition=dg.AutomationCondition.eager(),
+    automation_condition=dg.AutomationCondition.on_missing().without(
+        dg.AutomationCondition.in_latest_time_window()
+    ),
 )
-def works(df: pl.DataFrame) -> pl.DataFrame:
-    columns = ["artist", "title", "release"]
-    return df[columns].unique()
+def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
+    """Aggregate works from cleaned deals."""
+    partitions = context.instance.get_materialized_partitions(cleaned_deals.key)
+    ic(partitions)
+    dfs = list(load_partitions(context, cleaned_deals.key, partitions))
+    if dfs:
+        columns = ["artist", "title", "release"]
+        return pl.concat(dfs, how="vertical_relaxed").select(columns).unique()
+    return None
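
The rewritten works asset leans on two shared.utils helpers that this diff only imports. A minimal sketch of what they plausibly look like, inferred purely from how they are called here (the real implementations live in shared/utils.py and are not part of this commit; the parquet path layout is an assumption):

import polars as pl

import dagster as dg


def get_partition_keys(context: dg.AssetExecutionContext) -> dict[str, str]:
    # Mirrors the context.partition_key.keys_by_dimension access that deals()
    # used before this commit; for the date|source multi-partition this yields
    # e.g. {"date": "2025-07-26", "source": "sounds"}.
    return context.partition_key.keys_by_dimension


def load_partitions(context, asset_key: dg.AssetKey, partitions):
    # Assumed layout: PolarsParquetIOManager writes one parquet file per
    # partition under <base_dir>/<asset>/<partition>.parquet.
    base_dir = context.resources.polars_parquet_io_manager.base_dir
    for partition in partitions:
        yield pl.read_parquet(
            f"{base_dir}/{asset_key.to_user_string()}/{partition}.parquet"
        )

The automation condition on works also changes: on_missing() composed with .without(in_latest_time_window()) strips the latest-time-window guard that on_missing() carries by default, so the asset can be requested whenever it is missing, not only for the most recent window, replacing the previous eager() behavior.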

View File

@@ -1,9 +1,8 @@
 import assets
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
-from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
+from jobs import check_partitions_job, deals_job
 from schedules import deals_schedule
-from sensors import musicbrainz_lookup_sensor
 
 import dagster as dg
@@ -17,16 +16,8 @@ definitions = dg.Definitions(
         for asset in dg.load_assets_from_modules([assets])
     ],
     resources={
-        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
+        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage/vinyl"),
     },
-    jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
+    jobs=[deals_job, check_partitions_job],
     schedules=[deals_schedule],
-    sensors=[
-        # dg.AutomationConditionSensorDefinition(
-        #     "run_tags_automation_condition_sensor",
-        #     target=dg.AssetSelection.all(),
-        #     default_status=dg.DefaultSensorStatus.RUNNING,
-        # ),
-        musicbrainz_lookup_sensor,
-    ],
 )
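
With the sensor removed, run requests now come from the automation conditions declared on the assets themselves; recent Dagster versions evaluate these with a built-in automation condition sensor. Should a deployment need that sensor declared explicitly, the commented-out block deleted above already shows the shape, roughly:

import dagster as dg

# Explicit variant of the automation condition sensor (normally unnecessary;
# Dagster registers a default one for assets with an automation_condition).
automation_sensor = dg.AutomationConditionSensorDefinition(
    "run_tags_automation_condition_sensor",
    target=dg.AssetSelection.all(),
    default_status=dg.DefaultSensorStatus.RUNNING,
)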

View File

@@ -1,5 +1,5 @@
 import polars as pl
-from assets import deals, new_deals, works
+from assets import deals
 
 import dagster as dg
@@ -12,11 +12,11 @@ deals_job = dg.define_asset_job(
 def check_partitions(context: dg.OpExecutionContext):
     asset_key = "deals"
-    # Fetch the materializations for the asset key
-    materializations = context.instance.get_materialized_partitions(
+    # Fetch the materialized partitions for the asset key
+    materialized_partitions = context.instance.get_materialized_partitions(
         asset_key=dg.AssetKey(asset_key)
     )
-    ic(materializations)
+    ic(materialized_partitions)
     storage_dir = context.resources.polars_parquet_io_manager.base_dir
     ic(storage_dir)
@@ -30,7 +30,7 @@ def check_partitions(context: dg.OpExecutionContext):
         .iter_rows()
     ):
         partition = "|".join(row)
-        if partition not in materializations:
+        if partition not in materialized_partitions:
             context.log.info(f"Missing partition: {partition}")
             context.log_event(
                 dg.AssetMaterialization(asset_key=asset_key, partition=partition)
@@ -40,8 +40,3 @@ def check_partitions(context: dg.OpExecutionContext):
 @dg.job
 def check_partitions_job():
     check_partitions()
-
-
-musicbrainz_lookup_job = dg.define_asset_job(
-    "musicbrainz_lookup_job", selection=[works.key, new_deals.key]
-)
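
For orientation, check_partitions reconciles the instance's materialization records with what is actually on disk. A compressed, hypothetical reading of the full op, filling the lines this diff elides with the assumption that partitions are stored as <date>|<source>.parquet files:

from glob import glob
from pathlib import Path

import dagster as dg


@dg.op(required_resource_keys={"polars_parquet_io_manager"})
def check_partitions(context: dg.OpExecutionContext):
    asset_key = "deals"
    materialized_partitions = context.instance.get_materialized_partitions(
        asset_key=dg.AssetKey(asset_key)
    )
    storage_dir = context.resources.polars_parquet_io_manager.base_dir
    # Assumed file layout; the real partition discovery is not shown in the diff.
    for path in glob(f"{storage_dir}/{asset_key}/*.parquet"):
        partition = Path(path).stem  # "<date>|<source>"
        if partition not in materialized_partitions:
            context.log.info(f"Missing partition: {partition}")
            # Backfill the missing record without re-running the asset.
            context.log_event(
                dg.AssetMaterialization(asset_key=asset_key, partition=partition)
            )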

apps/vinyl/src/models.py (new file)
View File

@@ -0,0 +1,14 @@
+import datetime
+
+import patito as pt
+
+
+class Deal(pt.Model):
+    source: str = pt.Field(description="Source of the deal, e.g., 'plato' or 'sounds'.")
+    id: str = pt.Field(description="Unique identifier that is used at the source.")
+    artist: str = pt.Field(description="Artist.")
+    title: str = pt.Field(description="Title of the deal.")
+    url: str = pt.Field(description="URL to the deal.")
+    date: datetime.date = pt.Field(description="Day the deal was listed.")
+    release: datetime.date = pt.Field(description="Release date.")
+    price: float = pt.Field(description="Price of the deal in EUR.")
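
This patito model is what cleaned_deals now returns through Deal.DataFrame(...), which is where the schema check from the commit title happens. A hypothetical round trip, assuming the standard patito validation API (the example values are made up):

import datetime

import polars as pl

from models import Deal

df = pl.DataFrame(
    {
        "source": ["sounds"],
        "id": ["42"],
        "artist": ["Example Artist"],
        "title": ["Example Title"],
        "url": ["https://example.com/deals/42"],
        "date": [datetime.date(2025, 7, 26)],
        "release": [datetime.date(2024, 11, 1)],
        "price": [19.99],
    }
)

# Deal.DataFrame wraps a polars frame in a model-bound subclass, as the asset
# does; validate() checks column names, dtypes, and nullability against the
# model and raises a patito validation error on mismatch.
Deal.DataFrame(df).validate()
Deal.validate(df)  # equivalent classmethod form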

View File

@@ -1,16 +1,15 @@
 import logging
-import warnings
 from datetime import datetime
 from typing import Any
 
-from assets import cleaned_deals, deals, new_deals
+from assets import cleaned_deals, deals, works
 from dagster_polars import PolarsParquetIOManager
 from definitions import definitions
 from jobs import check_partitions_job
 
 import dagster as dg
 
-warnings.filterwarnings("ignore", category=dg.ExperimentalWarning)
+# warnings.filterwarnings("ignore", category=dg.Ex)
 
 logging.getLogger().setLevel(logging.INFO)
@@ -35,10 +34,10 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
 if __name__ == "__main__":
-    run = 4
+    run = 3
     resources = {
         "polars_parquet_io_manager": PolarsParquetIOManager(
-            base_dir="/opt/dagster/storage"
+            base_dir="/opt/dagster/storage/vinyl"
         )
     }
     source = "sounds"  # or "plato"
@@ -48,19 +47,19 @@ if __name__ == "__main__":
             check_partitions_job.execute_in_process(resources=resources)
         case 2:
             test_deals(resources, source=source)
         case 3:
-            dg.materialize(
-                assets=definitions.assets,
-                selection=[new_deals.key],
-                partition_key=today_str(),
-                resources=resources,
-            )
-        case 4:
             dg.materialize(
                 assets=definitions.assets,
                 selection=[cleaned_deals.key],
                 partition_key=f"{today_str()}|{source}",
                 resources=resources,
             )
+        case 4:
+            dg.materialize(
+                assets=definitions.assets,
+                selection=[works.key],
+                resources=resources,
+            )
         case _:
-            raise ValueError("Invalid run number")
+            raise ValueError(f"Invalid run number: {run}!")
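
The script also calls a today_str() helper that never appears in this diff; presumably something along these lines, since Dagster's daily partition keys and the date dimension here are ISO date strings:

from datetime import datetime


def today_str() -> str:
    # Hypothetical helper (not shown in the diff): daily partition keys default
    # to ISO dates, e.g. "2025-07-26"; cleaned_deals combines this with the
    # source as "<date>|<source>".
    return datetime.now().strftime("%Y-%m-%d")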