diff --git a/Dockerfile.code b/Dockerfile.code
index 147fcd0..b61f987 100644
--- a/Dockerfile.code
+++ b/Dockerfile.code
@@ -10,7 +10,7 @@ RUN uv pip install -r requirements.txt --system
 RUN uv pip install polars-lts-cpu --system
 
 ARG APP
-ENV PYTHONPATH=/apps/$APP/src/
+ENV PYTHONPATH=/apps/$APP/src/:/shared/src/
 
 WORKDIR /opt/dagster/home
 # Run dagster gRPC server on port 4000
diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index da3071f..1482ba9 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -1,11 +1,10 @@
 from datetime import datetime
 from glob import glob
 
-import duckdb
 import polars as pl
 import structlog
-from duckdb.typing import DATE, VARCHAR
 from plato.fetch import scrape_plato
+from shared.utils import get_partition_keys, parse_partition_keys
 from sounds.fetch import fetch_deals
 from utils import parse_date
 
@@ -15,17 +14,23 @@
 SOURCES = ["plato", "sounds"]
 
 logger = structlog.get_logger()
 
-partitions_def = dg.MultiPartitionsDefinition(
+daily_partitions_def = dg.DailyPartitionsDefinition(
+    start_date="2024-09-01", end_offset=1
+)
+multi_partitions_def = dg.MultiPartitionsDefinition(
     {
-        "date": dg.DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
+        "date": daily_partitions_def,
         "source": dg.StaticPartitionsDefinition(SOURCES),
     }
 )
+partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
+    partition_dimension_name="date"
+)
 
 @dg.asset(
     io_manager_key="polars_parquet_io_manager",
-    partitions_def=partitions_def,
+    partitions_def=multi_partitions_def,
     metadata={
         "partition_by": ["date", "source"],
     },
@@ -83,82 +88,95 @@ def deals(context):
 
 
 @dg.asset(
-    deps=[deals.key],
-    ins={"df": dg.AssetIn(key=deals.key)},
-    automation_condition=dg.AutomationCondition.eager(),
     io_manager_key="polars_parquet_io_manager",
+    partitions_def=daily_partitions_def,
+    ins={"partitions": dg.AssetIn(key=deals.key, partition_mapping=partitions_mapping)},
+    automation_condition=dg.AutomationCondition.eager(),
 )
-def new_deals(context: dg.OpExecutionContext) -> pl.DataFrame:
+def new_deals(
+    context: dg.OpExecutionContext, partitions: dict[str, pl.DataFrame]
+) -> None:  # pl.DataFrame:
     """Combine deals from Plato and Sounds into a single DataFrame."""
     ic()
-    storage_dir = context.resources.polars_parquet_io_manager.base_dir
-    asset_key = "deals"
+    partition_keys = parse_partition_keys(context)
+    ic(partition_keys)
+    return
 
-    # TODO: can we directly query from the deals input?
-    with duckdb.connect() as con:
-        con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
-        return con.execute(
-            f"""
-            WITH tmp_plato AS (
-                SELECT
-                    source,
-                    CAST(date AS DATE) AS date,
-                    ean AS id,
-                    _artist AS artist,
-                    LOWER(title) AS title,
-                    CAST(_date AS DATE) AS release,
-                    CAST(_price AS FLOAT) AS price,
-                    CONCAT('https://www.platomania.nl', url) AS url
-                FROM read_parquet(
-                    '{storage_dir}/{asset_key}/*/plato.parquet',
-                    union_by_name = true
-                )
-            ),
-            tmp_sounds AS (
-                SELECT
-                    source,
-                    date,
-                    id,
-                    LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
-                    LOWER(TRIM(COALESCE(
-                        title,
-                        ARRAY_TO_STRING(SPLIT(name, '-')[2:], '-')
-                    ))) AS title,
-                    PARSE_DATE(release) AS release,
-                    CAST(price AS FLOAT) AS price,
-                    CONCAT('https://www.sounds.nl/detail/', id) AS url
-                FROM read_parquet(
-                    '{storage_dir}/{asset_key}/*/sounds.parquet',
-                    union_by_name = true
-                )
-            ),
-            tmp_both AS (
-                SELECT * FROM tmp_plato
-                UNION ALL
-                SELECT * FROM tmp_sounds
-            )
-            SELECT
-                source,
-                date,
-                id,
-                artist,
-                title,
-                release,
-                price,
-                url
-            FROM tmp_both
-            QUALIFY ROW_NUMBER() OVER (
-                PARTITION BY source, id, artist, title, price
-                ORDER BY date DESC
-            ) = 1
-            ORDER BY date ASC
-            """
-        ).pl()
+
+
+def parse_plato(df: pl.LazyFrame) -> pl.LazyFrame:
+    """Parse the Plato DataFrame."""
+    ic()
+    return pl.sql(
+        """
+        SELECT source,
+            CAST(date AS DATE) AS date,
+            ean AS id,
+            _artist AS artist,
+            LOWER(title) AS title,
+            CAST(_date AS DATE) AS release,
+            CAST(_price AS FLOAT) AS price,
+            CONCAT('https://www.platomania.nl', url) AS url
+        FROM df
+        QUALIFY ROW_NUMBER() OVER (PARTITION BY source, id, artist, title, price ORDER BY date DESC) = 1
+        ORDER BY date ASC
+        """
+    )
+
+
+def parse_sounds(df: pl.LazyFrame) -> pl.LazyFrame:
+    """Parse the Sounds DataFrame."""
+    # NOTE: DuckDB's SPLIT() is 1-indexed while Polars lists are 0-indexed,
+    # so SPLIT(name, '-')[1] maps to list.get(0) and [2:] maps to list.slice(1).
+    return df.with_columns(
+        artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(0))
+        .str.strip_chars()
+        .str.to_lowercase(),
+        title=pl.coalesce(
+            pl.col("title"), pl.col("name").str.split("-").list.slice(1).list.join("-")
+        )
+        .str.strip_chars()
+        .str.to_lowercase(),
+        release=pl.col("release").map_elements(parse_date, return_dtype=pl.Date),
+        price=pl.col("price").cast(pl.Float64),
+        url=pl.format("https://www.sounds.nl/detail/{}", pl.col("id")),
+    )
+
+
+@dg.asset(
+    io_manager_key="polars_parquet_io_manager",
+    partitions_def=deals.partitions_def,
+    ins={"df": dg.AssetIn(key=deals.key)},
+    automation_condition=dg.AutomationCondition.on_missing().without(
+        dg.AutomationCondition.in_latest_time_window()
+    ),
+)
+def cleaned_deals(
+    context: dg.OpExecutionContext, df: pl.LazyFrame
+) -> pl.DataFrame | None:
+    """Clean and deduplicate a single (date, source) partition of deals."""
+    ic()
+    partition_keys = get_partition_keys(context)
+    ic(partition_keys)
+
+    # Specific parsing for each source
+    match source := partition_keys["source"]:
+        case "plato":
+            parsed_df = parse_plato(df)
+        case "sounds":
+            parsed_df = parse_sounds(df)
+        case _:
+            context.log.warning(f"Unknown source: {source}!")
+            return None
+
+    # Deduplicate and sort the DataFrame
+    columns = ["source", "id", "artist", "title", "price"]
+    return (
+        parsed_df.collect()
+        .sort("date", descending=True)
+        .unique(subset=columns, keep="first")
+        .sort("date", descending=False)
+        .select(*columns, "date", "release", "url")
+    )
 
 
 @dg.asset(
-    deps=[new_deals.key],
     ins={"df": dg.AssetIn(key=new_deals.key)},
     io_manager_key="polars_parquet_io_manager",
     automation_condition=dg.AutomationCondition.eager(),
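With the multi-to-single partition mapping above, each daily new_deals partition receives every deals partition for its date, keyed by the coalesced date|source key. A minimal sketch of that input's shape, with hypothetical frames standing in for what the Polars parquet IO manager would load:

import polars as pl

from shared.utils import parse_coalesced_partition_key

# Hypothetical stand-ins for the upstream deals partitions of 2024-09-14,
# one per source, keyed by the coalesced multi-partition key:
partitions: dict[str, pl.DataFrame] = {
    "2024-09-14|plato": pl.DataFrame({"id": ["123"], "title": ["abbey road"]}),
    "2024-09-14|sounds": pl.DataFrame({"id": ["456"], "title": ["kind of blue"]}),
}

# parse_partition_keys(context) applies this split to every input key:
for key in partitions:
    print(parse_coalesced_partition_key(key, ["date", "source"]))
# {'date': '2024-09-14', 'source': 'plato'}
# {'date': '2024-09-14', 'source': 'sounds'}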
diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py
index 762646f..02e26be 100644
--- a/apps/vinyl/src/definitions.py
+++ b/apps/vinyl/src/definitions.py
@@ -1,9 +1,4 @@
-from collections.abc import Sequence
-
 import assets
-from dagster_duckdb import DuckDBIOManager
-from dagster_duckdb.io_manager import DbTypeHandler
-from dagster_duckdb_pandas import DuckDBPandasTypeHandler
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
 from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
@@ -12,13 +7,6 @@ from sensors import musicbrainz_lookup_sensor
 
 import dagster as dg
-
-class PandasDuckDBIOManager(DuckDBIOManager):
-    @staticmethod
-    def type_handlers() -> Sequence[DbTypeHandler]:
-        return [DuckDBPandasTypeHandler()]
-
-
 install()
 
 definitions = dg.Definitions(
     assets=[
@@ -30,16 +18,15 @@ definitions = dg.Definitions(
     ],
     resources={
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
-        "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
     },
     jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
     schedules=[deals_schedule],
     sensors=[
-        dg.AutomationConditionSensorDefinition(
-            "run_tags_automation_condition_sensor",
-            target=dg.AssetSelection.all(),
-            default_status=dg.DefaultSensorStatus.RUNNING,
-        ),
+        # dg.AutomationConditionSensorDefinition(
+        #     "run_tags_automation_condition_sensor",
+        #     target=dg.AssetSelection.all(),
+        #     default_status=dg.DefaultSensorStatus.RUNNING,
+        # ),
         musicbrainz_lookup_sensor,
     ],
 )
diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py
index c560099..9ac912c 100644
--- a/apps/vinyl/src/jobs.py
+++ b/apps/vinyl/src/jobs.py
@@ -22,7 +22,7 @@ def check_partitions(context: dg.OpExecutionContext):
     ic(storage_dir)
     for row in (
         pl.scan_parquet(
-            f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
+            f"{storage_dir}/{asset_key}/*/*.parquet",  # extra_columns="ignore"
         )
         .select(["date", "source"])
         .unique()
diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py
index fca8389..4365a94 100644
--- a/apps/vinyl/src/test.py
+++ b/apps/vinyl/src/test.py
@@ -1,30 +1,30 @@
 import logging
 import warnings
 from datetime import datetime
+from typing import Any
 
-from assets import deals
+from assets import cleaned_deals, deals, new_deals
 from dagster_polars import PolarsParquetIOManager
-from jobs import check_partititions_job
+from definitions import definitions
+from jobs import check_partitions_job
 
-from dagster import materialize
+import dagster as dg
 
-warnings.filterwarnings("ignore", category=UserWarning)
+warnings.filterwarnings("ignore", category=dg.ExperimentalWarning)
 logging.getLogger().setLevel(logging.INFO)
 
-resources = {
-    "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage")
-}
+
+def today_str() -> str:
+    """Returns today's date as a string in the format YYYY-MM-DD."""
+    return datetime.today().strftime("%Y-%m-%d")
 
-def test_deals(source="sounds", date: str = None):
-    if not date:
-        today = datetime.today().strftime("%Y-%m-%d")
-        date = today
-
-    result = materialize(
-        [deals],
-        partition_key=f"{date}|{source}",
+def test_deals(resources: dict[str, Any], source="sounds", date: str | None = None):
+    result = dg.materialize(
+        assets=definitions.assets,
+        selection=[deals.key],
+        partition_key=f"{date or today_str()}|{source}",
        resources=resources,
         run_config={
             "loggers": {"console": {"config": {"log_level": "ERROR"}}},
@@ -32,9 +32,35 @@
         },
     )
     assert result.success
-    ic(result.asset_value)
 
 
 if __name__ == "__main__":
-    # test_deals(source="plato")
-    check_partititions_job.execute_in_process()
+    run = 4
+    resources = {
+        "polars_parquet_io_manager": PolarsParquetIOManager(
+            base_dir="/opt/dagster/storage"
+        )
+    }
+    source = "sounds"  # or "plato"
+
+    match run:
+        case 1:
+            check_partitions_job.execute_in_process(resources=resources)
+        case 2:
+            test_deals(resources, source=source)
+        case 3:
+            dg.materialize(
+                assets=definitions.assets,
+                selection=[new_deals.key],
+                partition_key=today_str(),
+                resources=resources,
+            )
+        case 4:
+            dg.materialize(
+                assets=definitions.assets,
+                selection=[cleaned_deals.key],
+                partition_key=f"{today_str()}|{source}",
+                resources=resources,
+            )
+        case _:
+            raise ValueError("Invalid run number")
diff --git a/apps/vinyl/src/utils.py b/apps/vinyl/src/utils.py
index 651148e..fce7498 100644
--- a/apps/vinyl/src/utils.py
+++ b/apps/vinyl/src/utils.py
@@ -1,7 +1,16 @@
-import datetime
+from datetime import date, datetime
 
 
-def parse_date(dutch_date: str):
+def parse_date(dutch_date: str) -> date:
+    """
+    Parse a date string in Dutch format (e.g., "1 januari 2023") and return a date object.
+
+    Args:
+    - dutch_date (str): The date string in Dutch format.
+
+    Returns:
+    - date: A date object representing the parsed date.
+    """
     # Create a dictionary to map Dutch month names to English
     dutch_to_english_months = {
         "januari": "January",
@@ -25,5 +34,5 @@
     # Rebuild the date string in English format
     english_date = f"{day} {english_month} {year}"
 
-    # Parse the date using strptime
-    return datetime.datetime.strptime(english_date, "%d %B %Y").date()
+    # Parse the date
+    return datetime.strptime(english_date, "%d %B %Y").date()
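A quick sanity check of parse_date; the first call is the docstring's own example, the second assumes "maart" is in the month table (the hunk only shows "januari"):

from datetime import date

from utils import parse_date

assert parse_date("1 januari 2023") == date(2023, 1, 1)
# Assumes "maart" is present in dutch_to_english_months:
assert parse_date("14 maart 2024") == date(2024, 3, 14)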
+ """ + partition_definition = context.asset_partitions_def_for_input(input_name) + if isinstance(partition_definition, dg.MultiPartitionsDefinition): + return [x.name for x in partition_definition.partitions_defs] + raise NotImplementedError("Only MultiPartitionsDefinition is supported.") + + +def parse_coalesced_partition_key( + coalesced_key: str, dimension_names: list[str] +) -> dict[str, str]: + """ + Parse a coalesced partition key into a dictionary of dimension values. + + Args: + coalesced_key: The coalesced partition key string. + dimension_names: A list of dimension names corresponding to the parts of the key. + + Returns: + A dictionary mapping dimension names to their corresponding values. + """ + parts = coalesced_key.split("|") + if len(parts) != len(dimension_names): + raise ValueError("Mismatch between dimension names and partition key parts") + return dict(zip(dimension_names, parts)) + + +def get_partition_keys(context: dg.OpExecutionContext) -> Mapping[str, str]: + """ + Get the partition key from the execution context. + + Args: + context: The Dagster execution context. + + Returns: + A mapping of dimension names to their corresponding values in the partition key. + + Raises: + ValueError: If the partition key is not a MultiPartitionKey. + """ + multi_partition_key = context.partition_key + if not isinstance(multi_partition_key, dg.MultiPartitionKey): + raise ValueError( + f"Expected MultiPartitionKey, got {type(context.partition_key)}: {context.partition_key}" + ) + return multi_partition_key.keys_by_dimension + + +def parse_partition_keys( + context: dg.OpExecutionContext, input_name: str = "partitions" +) -> dict[str, dict[str, str]]: + """ + Parse partition keys for a given input. + + Args: + context: The Dagster execution context. + input_name: The name of the input to parse partition keys for (default is "partitions"). + + Returns: + a dictionary mapping partition keys to their parsed dimension values. + """ + dimension_names = get_dimension_names(context, input_name) + return { + k: parse_coalesced_partition_key(k, dimension_names) + for k in context.asset_partition_keys_for_input(input_name) + }