from datetime import datetime from glob import glob import polars as pl import structlog from plato.fetch import scrape_plato from shared.utils import get_partition_keys, parse_partition_keys from sounds.fetch import fetch_deals from utils import parse_date import dagster as dg SOURCES = ["plato", "sounds"] logger = structlog.get_logger() daily_partitions_def = dg.DailyPartitionsDefinition( start_date="2024-09-01", end_offset=1 ) multi_partitions_def = dg.MultiPartitionsDefinition( { "date": daily_partitions_def, "source": dg.StaticPartitionsDefinition(SOURCES), } ) partitions_mapping = dg.MultiToSingleDimensionPartitionMapping( partition_dimension_name="date" ) @dg.asset( io_manager_key="polars_parquet_io_manager", partitions_def=multi_partitions_def, metadata={ "partition_by": ["date", "source"], }, config_schema={"import_dir": dg.Field(str, default_value="/storage/import")}, ) def deals(context): ic() ic(context.partition_key) ic(context.op_config) import_dir = context.op_config["import_dir"] partition_key = context.partition_key.keys_by_dimension date_str = partition_key["date"] source = partition_key["source"] logger.info("Materializing deals", date=date_str, source=source) date = datetime.strptime(partition_key["date"], "%Y-%m-%d") days = (date - datetime.today()).days ic(days) if days > 0: raise dg.Failure(f"Cannot materialize for the future: {date.date()}") if days < -1: if source == "sounds": pattern = f"{import_dir}/{date.date()}_*_sounds.csv" logger.info("Looking for existing CSV files", pattern=pattern) files = glob(pattern) if len(files): file = sorted(files)[-1] logger.info("Using existing CSV file", file=file) try: df = pl.read_csv(file) logger.info("Loaded CSV file", rows=len(df)) return df.with_columns( **{k: pl.lit(v) for k, v in partition_key.items()} ) except Exception as e: logger.error("Failed to load CSV file!", error=e) raise dg.Failure(f"Cannot materialize for the past: {date.date()}") if source == "plato": logger.info("Scraping Plato") df = scrape_plato() logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown()) ic(df.columns) return pl.from_pandas(df.assign(**partition_key)) if source == "sounds": logger.info("Scraping Sounds") df = fetch_deals() ic(df.columns) logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown()) return pl.from_pandas(df.assign(**partition_key)) return pl.DataFrame( [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] ) @dg.asset( io_manager_key="polars_parquet_io_manager", partitions_def=daily_partitions_def, ins={"partitions": dg.AssetIn(key=deals.key, partition_mapping=partitions_mapping)}, automation_condition=dg.AutomationCondition.eager(), ) def new_deals( context: dg.OpExecutionContext, partitions: dict[str, pl.DataFrame] ) -> None: # pl.DataFrame: """Combine deals from Plato and Sounds into a single DataFrame.""" ic() partition_keys = parse_partition_keys(context) ic(partition_keys) return def parse_plato(df: pl.LazyFrame) -> pl.LazyFrame: """Parse the Sounds DataFrame.""" ic() return pl.sql( """ SELECT source, CAST(date AS DATE) AS date, ean AS id, _artist AS artist, LOWER(title) AS title, CAST(_date AS DATE) AS release, CAST(_price AS FLOAT) AS price, CONCAT('https://www.platomania.nl', url) AS url FROM df QUALIFY ROW_NUMBER() OVER (PARTITION BY source, id, artist, title, price ORDER BY date DESC) = 1 ORDER BY date ASC """ ) def parse_sounds(df: pl.LazyFrame) -> pl.LazyFrame: """Parse the Plato DataFrame.""" return df.with_columns( artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1)) .str.strip_chars() .str.to_lowercase(), title=pl.coalesce( pl.col("title"), pl.col("name").str.split("-").list.slice(2).list.join("-") ) .str.strip_chars() .str.to_lowercase(), release=pl.col("release").map_elements(parse_date, return_dtype=pl.Date), price=pl.col("price").cast(pl.Float64), url=pl.format("https://www.sounds.nl/detail/{}", pl.col("id")), ) @dg.asset( io_manager_key="polars_parquet_io_manager", partitions_def=deals.partitions_def, ins={"df": dg.AssetIn(key=deals.key)}, automation_condition=dg.AutomationCondition.on_missing().without( dg.AutomationCondition.in_latest_time_window() ), ) def cleaned_deals( context: dg.OpExecutionContext, df: pl.LazyFrame ) -> pl.DataFrame | None: ic() partition_keys = get_partition_keys(context) ic(partition_keys) # Specific parsing for each source match source := partition_keys["source"]: case "plato": parsed_df = parse_plato(df) case "sounds": parsed_df = parse_sounds(df) case _: context.log.warning(f"Unknown source: {source}!") return None # Deduplicate and sort the DataFrame columns = ["source", "id", "artist", "title", "price"] return ( parsed_df.collect() .sort("date", descending=True) .unique(subset=columns, keep="first") .sort("date", descending=False) .select(*columns, "date", "release", "url") ) @dg.asset( ins={"df": dg.AssetIn(key=new_deals.key)}, io_manager_key="polars_parquet_io_manager", automation_condition=dg.AutomationCondition.eager(), ) def works(df: pl.DataFrame) -> pl.DataFrame: columns = ["artist", "title", "release"] return df[columns].unique()