import os
from collections.abc import Iterator
from datetime import datetime
from functools import partial
from glob import glob
from types import SimpleNamespace

import polars as pl
import structlog
from config import APP
from dagster_polars.patito import patito_model_to_dagster_type
from icecream import ic
from jinja2 import Environment, FileSystemLoader
from models import Deal
from partitions import daily_partitions_def, multi_partitions_def
from platenzaak.parse import parse as parse_platenzaak
from platenzaak.scrape import scrape as scrape_platenzaak
from plato.parse import parse as parse_plato
from plato.scrape import scrape as scrape_plato
from shared.utils import get_partition_keys, parse_partition_keys
from sounds.parse import parse as parse_sounds
from sounds.scrape import scrape as scrape_sounds
from structlog.stdlib import BoundLogger
from utils.email import EmailService

import dagster as dg

asset = partial(dg.asset, key_prefix=APP)
logger: BoundLogger = structlog.get_logger()


@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=multi_partitions_def,
    config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
)
def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
    """Scrape the raw deals for a single (date, source) partition."""
    ic()
    ic(context.partition_key)
    ic(context.op_config)

    import_dir = context.op_config["import_dir"]
    partition_key = get_partition_keys(context)
    date_str = partition_key["date"]
    source = partition_key["source"]
    logger.info("Materializing deals", date=date_str, source=source)

    date = datetime.strptime(date_str, "%Y-%m-%d")
    days = (date - datetime.today()).days
    ic(days)
    if days > 0:
        raise dg.Failure(f"Cannot materialize for the future: {date.date()}")
    if days < -1:
        # Older partitions cannot be scraped again; fall back to a previously
        # exported CSV (only available for the "sounds" source).
        if source == "sounds":
            pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
            logger.info("Looking for existing CSV files", pattern=pattern)
            files = glob(pattern)
            if len(files):
                file = sorted(files)[-1]
                logger.info("Using existing CSV file", file=file)
                try:
                    df = pl.read_csv(file)
                    logger.info("Loaded CSV file", rows=len(df))
                    return df.with_columns(
                        **{k: pl.lit(v) for k, v in partition_key.items()}
                    )
                except Exception as e:
                    logger.error("Failed to load CSV file!", error=e)
        raise dg.Failure(f"Cannot materialize for the past: {date.date()}")

    match source:
        case "plato":
            logger.info("Scraping Plato")
            df = scrape_plato()
            logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
        case "sounds":
            logger.info("Scraping Sounds")
            df = scrape_sounds()
            logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
        case "platenzaak":
            logger.info("Scraping Platenzaak")
            df = scrape_platenzaak(logger=logger)
            logger.info(
                "Scraped Platenzaak", rows=len(df), head=df.head().to_markdown()
            )
        case _:
            raise ValueError(f"Unknown source: {source}!")
    ic(df.columns)
    return pl.from_pandas(df.assign(**partition_key))


@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=deals.partitions_def,
    ins={"df": dg.AssetIn(key=deals.key)},
    automation_condition=dg.AutomationCondition.eager(),
    output_required=False,
)
def cleaned_deals(
    context: dg.AssetExecutionContext, df: pl.LazyFrame | None
) -> Iterator[dg.Output[Deal.DataFrame]]:
    """Clean and parse deals from the raw source tables."""
    if df is None:
        return
    ic()
    partition_keys = get_partition_keys(context)
    ic(partition_keys)

    # Source-specific parsing
    match source := partition_keys["source"]:
        case "plato":
            parsed_df = parse_plato(df)
        case "sounds":
            parsed_df = parse_sounds(df)
        case "platenzaak":
            parsed_df = parse_platenzaak(df)
        case _:
            raise ValueError(f"Unknown source: {source}!")
    ic(parsed_df.collect_schema())

    # Deduplicate and sort the DataFrame
    columns = ["source", "id", "artist", "title", "price"]
    yield dg.Output(
        Deal.DataFrame(
            parsed_df.sort("date", descending=True)
            .unique(subset=columns, keep="first")
            .sort("date", descending=False)
            .select(*columns, "date", "release", "url")
            .collect()
        )
    )


@asset(
    deps=[cleaned_deals],
    io_manager_key="polars_parquet_io_manager",
    automation_condition=dg.AutomationCondition.eager(),
    output_required=False,
)
def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]]:
    """Aggregate works from cleaned deals."""
    partitions = context.instance.get_materialized_partitions(cleaned_deals.key)
    ic(partitions)
    logger.info("Works", partitions=partitions)
    path = os.path.join(
        context.resources.polars_parquet_io_manager.base_dir, *cleaned_deals.key.path
    )
    ic(path)
    yield dg.Output(
        pl.scan_parquet(path).select(["artist", "title", "release"]).unique().collect()
    )


@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=multi_partitions_def,
    ins={
        "partitions": dg.AssetIn(
            key=cleaned_deals.key,
            partition_mapping=dg.MultiPartitionMapping(
                {
                    # Map the date dimension to the current and three preceding
                    # daily partitions; map the source dimension one-to-one.
                    "date": dg.DimensionPartitionMapping(
                        dimension_name="date",
                        partition_mapping=dg.TimeWindowPartitionMapping(
                            start_offset=-3,
                            end_offset=0,
                            allow_nonexistent_upstream_partitions=True,
                        ),
                    ),
                    "source": dg.DimensionPartitionMapping(
                        dimension_name="source",
                        partition_mapping=dg.IdentityPartitionMapping(),
                    ),
                }
            ),
        )
    },
    output_required=False,
    dagster_type=patito_model_to_dagster_type(Deal),
    automation_condition=dg.AutomationCondition.eager(),
)
def new_deals(
    context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
) -> Iterator[dg.Output[Deal.DataFrame]]:
    """Fetch new deals from all sources."""
    ic()
    ic(partitions.keys())
    # Drop upstream partitions that were never materialized
    if not (partitions := {k: v for k, v in partitions.items() if v is not None}):
        return
    ic(partitions.keys())
    partition_keys = get_partition_keys(context)
    ic(partition_keys.keys())
    parsed_partition_keys = parse_partition_keys(context, "partitions")
    ic(parsed_partition_keys)

    if len(partition_keys := sorted(partitions.keys())) < 2:
        context.log.warning("Not enough partitions to fetch new deals!")
        return
    before, after = partition_keys[-2:]
    before_str, after_str = [
        parsed_partition_keys[partition_key]["date"]
        for partition_key in (before, after)
    ]
    df_before = partitions[before]
    df_after = partitions[after]
    num_rows_before, num_rows_after = [
        df.select(pl.len()).collect().item() for df in (df_before, df_after)
    ]
    context.log.info(
        f"Fetching new deals between {before_str} ({num_rows_before}) "
        f"and {after_str} ({num_rows_after})"
    )

    # Deals present in the latest partition but not in the previous one
    new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect()
    if new_df.height:
        context.log.info(f"New deals found ({new_df.height}x)!")
        yield dg.Output(
            Deal.DataFrame(new_df.with_columns(pl.col("release").cast(pl.Date)))
        )
    else:
        context.log.info("No new deals found!")


@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=daily_partitions_def,
    metadata={
        "partition_by": ["date", "source", "release"],
    },
    ins={"partitions": dg.AssetIn(key=new_deals.key)},
    output_required=False,
    automation_condition=dg.AutomationCondition.eager(),
)
def good_deals(
    context: dg.AssetExecutionContext,
    email_service: EmailService,
    partitions: dict[str, pl.LazyFrame],
) -> Iterator[dg.Output[Deal.DataFrame]]:
    """Filter the new deals on price and email a summary of the good ones."""
    parsed_partition_keys = parse_partition_keys(context, "partitions")
    ic(parsed_partition_keys)

    df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
    counts = dict(df.group_by("source").len().iter_rows())
    logger.info(f"Processing new deals ({df.height}x).", counts=counts)

    filtered_df = df.filter(pl.col("price") <= 25).sort(["artist", "title"])
    if filtered_df.is_empty():
        logger.info("No good deals found!")
        return
    logger.info(f"Good deals found ({filtered_df.height}x)!", counts=counts)
    yield dg.Output(Deal.DataFrame(filtered_df))

    # Prepare data for the email: up to ten deals per source
    deals: dict[str, list[SimpleNamespace]] = {}
    for source in filtered_df.select("source").unique().to_series():
        group_df = filtered_df.filter(pl.col("source") == source)
        deals[source] = [
            SimpleNamespace(**row) for row in group_df.head(10).iter_rows(named=True)
        ]

    # Render HTML from the Jinja template
    env = Environment(loader=FileSystemLoader(f"/code/apps/{APP}"))
    template = env.get_template("email.html")
    html_content = template.render(deals=deals)

    # Send the email
    email_service.send_email(html_content)