from datetime import datetime
from glob import glob

import polars as pl
import structlog
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    DimensionPartitionMapping,
    Failure,
    Field,
    IdentityPartitionMapping,
    MultiPartitionMapping,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)
from icecream import ic

from app.vinyl.plato.check_plato import scrape_plato
from app.vinyl.sounds.fetch import fetch_deals

SOURCES = ["plato", "sounds"]

logger = structlog.get_logger()

# Two-dimensional partitioning: one partition per (date, source) pair.
# end_offset=1 makes the current day's partition available immediately.
partitions_def = MultiPartitionsDefinition(
    {
        "date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
        "source": StaticPartitionsDefinition(SOURCES),
    }
)

# Downstream assets depend on the previous and current day's partitions for
# the same source: the date dimension maps to a two-day window, the source
# dimension maps one-to-one.
partition_mapping = MultiPartitionMapping(
    {
        "date": DimensionPartitionMapping(
            dimension_name="date",
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
        ),
        "source": DimensionPartitionMapping(
            dimension_name="source",
            partition_mapping=IdentityPartitionMapping(),
        ),
    }
)


@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=partitions_def,
    metadata={
        "partition_by": ["date", "source"],
    },
    config_schema={
        "import_dir": Field(str, default_value="/opt/dagster/home/storage/import")
    },
)
def deals(context):
    ic()
    ic(context.partition_key)
    ic(context.op_config)
    import_dir = context.op_config["import_dir"]
    partition_key = context.partition_key.keys_by_dimension
    date_str = partition_key["date"]
    source = partition_key["source"]
    logger.info("Materializing deals", date=date_str, source=source)

    # timedelta.days floors toward negative infinity, so measured against the
    # current time: today's partition yields -1, strictly past dates yield
    # less than -1, and dates more than a day ahead yield greater than 0.
    date = datetime.strptime(date_str, "%Y-%m-%d")
    days = (date - datetime.today()).days
    ic(days)
    if days > 0:
        raise Failure(f"Cannot materialize for the future: {date.date()}")
    if days < -1:
        # Past partitions cannot be re-scraped; fall back to an existing CSV
        # export for that date if one is available.
        if source == "sounds":
            pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
            logger.info("Looking for existing CSV files", pattern=pattern)
            files = glob(pattern)
            if files:
                file = sorted(files)[-1]
                logger.info("Using existing CSV file", file=file)
                try:
                    df = pl.read_csv(file)
                    logger.info("Loaded CSV file", rows=len(df))
                    # Stamp the partition dimensions onto the loaded data.
                    return df.with_columns(
                        **{k: pl.lit(v) for k, v in partition_key.items()}
                    )
                except Exception as e:
                    logger.error("Failed to load CSV file!", error=e)
        raise Failure(f"Cannot materialize for the past: {date.date()}")

    if source == "plato":
        logger.info("Scraping Plato")
        df = scrape_plato()
        logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
        ic(df.columns)
        return pl.from_pandas(df.assign(**partition_key))
    if source == "sounds":
        logger.info("Scraping Sounds")
        df = fetch_deals()
        ic(df.columns)
        logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
        return pl.from_pandas(df.assign(**partition_key))

    # Unreachable while SOURCES only contains "plato" and "sounds";
    # placeholder output for sources without a scraper.
    return pl.DataFrame(
        [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
    )


@asset(
    partitions_def=partitions_def,
    # The input name must match the upstream asset key; no asset named
    # "asset_multi_1" exists in this module, so bind the mapping to `deals`.
    ins={"deals": AssetIn(partition_mapping=partition_mapping)},
)
def new_deals(context, deals):
    ic()
    ic(context.partition_key)
    ic(context.partition_key.keys_by_dimension)
    ic(deals)
    partition_key = context.asset_partition_key_for_output()
    ic(partition_key)
    return f"Processed data for {partition_key}"
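
# --- Usage sketch (not part of the original module) ---
# A minimal way to materialize a single (date, source) partition of `deals`
# locally. The IO manager binding and all paths here are assumptions: any IO
# manager registered under "polars_parquet_io_manager" will do, e.g.
# PolarsParquetIOManager from the dagster-polars package.
if __name__ == "__main__":
    from dagster import MultiPartitionKey, materialize
    from dagster_polars import PolarsParquetIOManager  # assumed dependency

    result = materialize(
        [deals],
        # Multi-partition keys address one value per dimension.
        partition_key=MultiPartitionKey({"date": "2024-09-01", "source": "plato"}),
        resources={
            # Hypothetical local storage directory for this sketch.
            "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/tmp/dagster")
        },
        # Override the default import_dir; the path is illustrative only.
        run_config={"ops": {"deals": {"config": {"import_dir": "/tmp/import"}}}},
    )
    assert result.success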