From 1e0528bdfbc6774f19d5db4dfb343a5e46e62a9f Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 14 Oct 2024 15:08:07 +0200 Subject: [PATCH] collect unique works from deals dumps --- dagster-requirements.txt | 14 +++---- dagster.yaml | 1 + pyproject.toml | 2 + requirements.txt | 7 ---- src/app/vinyl/assets.py | 85 ++++++++++++++++++++++++++++++++++------ src/app/vinyl/jobs.py | 7 +++- src/app/vinyl/repo.py | 31 ++++++++++++--- src/app/vinyl/sensors.py | 21 ++++++++++ src/app/vinyl/utils.py | 29 ++++++++++++++ 9 files changed, 164 insertions(+), 33 deletions(-) create mode 100644 src/app/vinyl/sensors.py create mode 100644 src/app/vinyl/utils.py diff --git a/dagster-requirements.txt b/dagster-requirements.txt index dafb3b1..41e78bf 100644 --- a/dagster-requirements.txt +++ b/dagster-requirements.txt @@ -27,8 +27,6 @@ appdirs==1.4.4 # via pint asttokens==2.4.1 # via icecream -async-timeout==4.0.3 - # via aiohttp attrs==24.2.0 # via aiohttp backoff==2.2.1 @@ -76,12 +74,17 @@ dagster==1.8.9 # via # dagster-aws # dagster-docker + # dagster-duckdb + # dagster-duckdb-pandas # dagster-graphql # dagster-polars # dagster-postgres # dagster-webserver dagster-aws==0.24.9 dagster-docker==0.24.9 +dagster-duckdb==0.24.9 + # via dagster-duckdb-pandas +dagster-duckdb-pandas==0.24.9 dagster-graphql==1.8.9 # via dagster-webserver dagster-pipes==1.8.9 @@ -99,14 +102,13 @@ docker-image-py==0.1.13 docstring-parser==0.16 # via dagster duckdb==1.1.1 + # via dagster-duckdb durationpy==0.8 # via kubernetes email-validator==2.2.0 # via pydantic et-xmlfile==1.1.0 # via openpyxl -exceptiongroup==1.2.2 - # via anyio executing==2.1.0 # via icecream fastapi==0.115.0 @@ -218,6 +220,7 @@ packaging==24.1 # pyogrio pandas==2.2.3 # via + # dagster-duckdb-pandas # fastparquet # geopandas # pint-pandas @@ -351,19 +354,16 @@ tqdm==4.66.5 typing-extensions==4.12.2 # via # alembic - # anyio # dagster # dagster-polars # fastapi # flexcache # flexparser - # multidict # pint # pydantic # pydantic-core # reactivex # sqlalchemy - # uvicorn tzdata==2024.2 # via pandas universal-pathlib==0.2.5 diff --git a/dagster.yaml b/dagster.yaml index 48a5598..1171e6d 100644 --- a/dagster.yaml +++ b/dagster.yaml @@ -22,6 +22,7 @@ run_launcher: # - /opt/dagster/storage/:/opt/dagster/home/storage/ - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ + - /opt/dagster/storage/:/opt/dagster/home/storage/ run_storage: module: dagster_postgres.run_storage diff --git a/pyproject.toml b/pyproject.toml index c26df4d..3d48deb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,8 @@ dagster = [ "dagster-docker", "dagster-aws", "dagster-polars", + "dagster-duckdb", + "dagster-duckdb-pandas", "dagit" ] diff --git a/requirements.txt b/requirements.txt index 0935faa..85e1c5c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,8 +20,6 @@ appdirs==1.4.4 # via pint asttokens==2.4.1 # via icecream -async-timeout==4.0.3 - # via aiohttp attrs==24.2.0 # via aiohttp beautifulsoup4==4.12.3 @@ -62,8 +60,6 @@ email-validator==2.2.0 # via pydantic et-xmlfile==1.1.0 # via openpyxl -exceptiongroup==1.2.2 - # via anyio executing==2.1.0 # via icecream fastapi==0.115.0 @@ -217,16 +213,13 @@ starlette==0.38.6 structlog==24.4.0 typing-extensions==4.12.2 # via - # anyio # fastapi # flexcache # flexparser - # multidict # pint # pydantic # pydantic-core # reactivex - # uvicorn tzdata==2024.2 # via pandas urllib3==2.2.3 diff --git a/src/app/vinyl/assets.py b/src/app/vinyl/assets.py index c468dda..3775f3b 100644 --- a/src/app/vinyl/assets.py +++ b/src/app/vinyl/assets.py @@ -1,13 +1,16 @@ from datetime import datetime from glob import glob +import duckdb +import pandas as pd import polars as pl import structlog +from duckdb.typing import DATE, VARCHAR from app.vinyl.plato.fetch import scrape_plato from app.vinyl.sounds.fetch import fetch_deals +from app.vinyl.utils import parse_date from dagster import ( - AssetIn, DailyPartitionsDefinition, DimensionPartitionMapping, Failure, @@ -15,6 +18,7 @@ from dagster import ( IdentityPartitionMapping, MultiPartitionMapping, MultiPartitionsDefinition, + OpExecutionContext, StaticPartitionsDefinition, TimeWindowPartitionMapping, asset, @@ -106,16 +110,73 @@ def deals(context): ) -@asset( - partitions_def=partitions_def, - ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)}, -) -def new_deals(context, asset_multi_1): +@asset(deps=[deals], io_manager_key="duckdb_io_manager") +def new_deals( + context: OpExecutionContext, + # duckdb: DuckDBResource +) -> pd.DataFrame: ic() - ic(context.partition_key) - ic(context.partition_key.keys_by_dimension) - ic(asset_multi_1) + storage_dir = context.instance.storage_directory() + asset_key = "deals" - partition_key = context.asset_partition_key_for_output() - ic(partition_key) - return f"Processed data for {partition_key}" + with duckdb.connect() as con: + # with duckdb.get_connection() as con: + con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE) + return con.execute( + f""" + WITH tmp_plato AS ( + SELECT + source, + CAST(date AS DATE) AS date, + ean AS id, + _artist AS artist, + LOWER(title) AS title, + CAST(_date AS DATE) AS release, + CAST(_price AS FLOAT) AS price, + CONCAT('https://www.platomania.nl', url) AS url, + FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true) + ), tmp_sounds AS ( + SELECT + source, + date, + id, + LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist, + LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title, + PARSE_DATE(release) AS release, + CAST(price AS FLOAT) AS price, + CONCAT('https://www.sounds.nl/detail/', id) AS url + FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true) + ), tmp_both AS ( + SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds + ), tmp_rn AS ( + SELECT + *, + ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn + FROM tmp_both + ) + SELECT + source, + date, + id, + artist, + title, + release, + price, + url + FROM tmp_rn + WHERE rn = 1 + ORDER BY date ASC + """ + ).df() + + +@asset( + # deps=[new_deals], + io_manager_key="duckdb_io_manager" +) +def works(new_deals: pd.DataFrame) -> pd.DataFrame: + return new_deals[["artist", "title"]].drop_duplicates() + # with duckdb.get_connection() as con: + # return con.execute( + # "SELECT DISTINCT artist, title, release FROM vinyl.public.new_deals" + # ).df() diff --git a/src/app/vinyl/jobs.py b/src/app/vinyl/jobs.py index a5ed910..b1d6bcd 100644 --- a/src/app/vinyl/jobs.py +++ b/src/app/vinyl/jobs.py @@ -7,7 +7,7 @@ from dagster import ( op, ) -from .assets import deals +from .assets import deals, new_deals, works deals_job = define_asset_job( "deals_job", selection=[deals], partitions_def=deals.partitions_def @@ -51,3 +51,8 @@ def check_partititions(context: OpExecutionContext): @job def check_partititions_job(): check_partititions() + + +musicbrainz_lookup_job = define_asset_job( + "musicbrainz_lookup_job", selection=[works, new_deals] +) diff --git a/src/app/vinyl/repo.py b/src/app/vinyl/repo.py index 346fd4d..e8a2557 100644 --- a/src/app/vinyl/repo.py +++ b/src/app/vinyl/repo.py @@ -1,13 +1,32 @@ -from dagster import Definitions +from collections.abc import Sequence + +from dagster_duckdb import DuckDBIOManager +from dagster_duckdb.io_manager import DbTypeHandler +from dagster_duckdb_pandas import DuckDBPandasTypeHandler from dagster_polars import PolarsParquetIOManager -from .assets import deals -from .jobs import check_partititions_job, deals_job +from dagster import Definitions + +from .assets import deals, new_deals, works +from .jobs import check_partititions_job, deals_job, musicbrainz_lookup_job from .schedules import deals_schedule +from .sensors import musicbrainz_lookup_sensor + + +class PandasDuckDBIOManager(DuckDBIOManager): + @staticmethod + def type_handlers() -> Sequence[DbTypeHandler]: + return [DuckDBPandasTypeHandler()] + vinyl = Definitions( - assets=[deals], - resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, - jobs=[deals_job, check_partititions_job], + assets=[deals, new_deals, works], + resources={ + "polars_parquet_io_manager": PolarsParquetIOManager(), + "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl.duckdb"), + # "duckdb": DuckDBResource(database="vinyl.duckdb") + }, + jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job], schedules=[deals_schedule], + sensors=[musicbrainz_lookup_sensor], ) diff --git a/src/app/vinyl/sensors.py b/src/app/vinyl/sensors.py new file mode 100644 index 0000000..7068c1f --- /dev/null +++ b/src/app/vinyl/sensors.py @@ -0,0 +1,21 @@ +from app.vinyl.assets import deals +from app.vinyl.jobs import musicbrainz_lookup_job +from dagster import ( + DefaultSensorStatus, + EventLogEntry, + RunRequest, + SensorEvaluationContext, + asset_sensor, +) + + +@asset_sensor( + asset_key=deals.key, + job=musicbrainz_lookup_job, + default_status=DefaultSensorStatus.RUNNING, +) +def musicbrainz_lookup_sensor( + context: SensorEvaluationContext, asset_event: EventLogEntry +): + assert asset_event.dagster_event and asset_event.dagster_event.asset_key + yield RunRequest(run_key=context.cursor) diff --git a/src/app/vinyl/utils.py b/src/app/vinyl/utils.py new file mode 100644 index 0000000..651148e --- /dev/null +++ b/src/app/vinyl/utils.py @@ -0,0 +1,29 @@ +import datetime + + +def parse_date(dutch_date: str): + # Create a dictionary to map Dutch month names to English + dutch_to_english_months = { + "januari": "January", + "februari": "February", + "maart": "March", + "april": "April", + "mei": "May", + "juni": "June", + "juli": "July", + "augustus": "August", + "september": "September", + "oktober": "October", + "november": "November", + "december": "December", + } + + # Split the date and replace the Dutch month with its English equivalent + day, dutch_month, year = dutch_date.split() + english_month = dutch_to_english_months[dutch_month] + + # Rebuild the date string in English format + english_date = f"{day} {english_month} {year}" + + # Parse the date using strptime + return datetime.datetime.strptime(english_date, "%d %B %Y").date()