From f9c83d29b3ef3a1f39c091b86693ea5087900125 Mon Sep 17 00:00:00 2001
From: Rik Veenboer
Date: Mon, 14 Oct 2024 15:23:49 +0200
Subject: [PATCH] use polars and duckdb for lazy processing

---
NOTE(review): this patch was restored from a copy whose line breaks and
leading whitespace had been collapsed. The line structure below matches
every hunk header count and the diffstat. Leading indentation is inferred
(4-space Python layout; SQL depth chosen so that only the context lines
shown are shared between the old and new text) -- confirm against blobs
3775f3b/b8051f7 and e8a2557/5eb239b, or apply with
`git apply --ignore-whitespace`.

 src/app/vinyl/assets.py | 109 ++++++++++++++++++++--------------------
 src/app/vinyl/repo.py   |   3 +-
 2 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/src/app/vinyl/assets.py b/src/app/vinyl/assets.py
index 3775f3b..b8051f7 100644
--- a/src/app/vinyl/assets.py
+++ b/src/app/vinyl/assets.py
@@ -2,7 +2,6 @@ from datetime import datetime
 from glob import glob
 
 import duckdb
-import pandas as pd
 import polars as pl
 import structlog
 from duckdb.typing import DATE, VARCHAR
@@ -110,73 +109,73 @@ def deals(context):
     )
 
 
-@asset(deps=[deals], io_manager_key="duckdb_io_manager")
-def new_deals(
-    context: OpExecutionContext,
-    # duckdb: DuckDBResource
-) -> pd.DataFrame:
+@asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
+def new_deals(context: OpExecutionContext) -> pl.DataFrame:
     ic()
     storage_dir = context.instance.storage_directory()
     asset_key = "deals"
 
     with duckdb.connect() as con:
-        # with duckdb.get_connection() as con:
         con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
         return con.execute(
             f"""
-        WITH tmp_plato AS (
-            SELECT
-                source,
-                CAST(date AS DATE) AS date,
-                ean AS id,
-                _artist AS artist,
-                LOWER(title) AS title,
-                CAST(_date AS DATE) AS release,
-                CAST(_price AS FLOAT) AS price,
-                CONCAT('https://www.platomania.nl', url) AS url,
-            FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
-        ), tmp_sounds AS (
+            WITH tmp_plato AS (
+                SELECT
+                    source,
+                    CAST(date AS DATE) AS date,
+                    ean AS id,
+                    _artist AS artist,
+                    LOWER(title) AS title,
+                    CAST(_date AS DATE) AS release,
+                    CAST(_price AS FLOAT) AS price,
+                    CONCAT('https://www.platomania.nl', url) AS url,
+                FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
+            ), tmp_sounds AS (
+                SELECT
+                    source,
+                    date,
+                    id,
+                    LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
+                    LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
+                    PARSE_DATE(release) AS release,
+                    CAST(price AS FLOAT) AS price,
+                    CONCAT('https://www.sounds.nl/detail/', id) AS url
+                FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
+            ), tmp_both AS (
+                SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
+            ), tmp_rn AS (
+                SELECT
+                    *,
+                    ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
+                FROM tmp_both
+            )
             SELECT
                 source,
                 date,
                 id,
-                LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
-                LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
-                PARSE_DATE(release) AS release,
-                CAST(price AS FLOAT) AS price,
-                CONCAT('https://www.sounds.nl/detail/', id) AS url
-            FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
-        ), tmp_both AS (
-            SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
-        ), tmp_rn AS (
-            SELECT
-                *,
-                ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
-            FROM tmp_both
-        )
-        SELECT
-            source,
-            date,
-            id,
-            artist,
-            title,
-            release,
-            price,
-            url
-        FROM tmp_rn
-        WHERE rn = 1
-        ORDER BY date ASC
-        """
-        ).df()
+                artist,
+                title,
+                release,
+                price,
+                url
+            FROM tmp_rn
+            WHERE rn = 1
+            ORDER BY date ASC
+            """
+        ).pl()
 
 
 @asset(
-    # deps=[new_deals],
-    io_manager_key="duckdb_io_manager"
+    io_manager_key="polars_parquet_io_manager",
 )
-def works(new_deals: pd.DataFrame) -> pd.DataFrame:
-    return new_deals[["artist", "title"]].drop_duplicates()
-    # with duckdb.get_connection() as con:
-    #     return con.execute(
-    #         "SELECT DISTINCT artist, title, release FROM vinyl.public.new_deals"
-    #     ).df()
+def works(new_deals: pl.DataFrame) -> pl.DataFrame:
+    # Pandas
+    # columns = ["artist", "title"]
+    # return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates())
+
+    # Polars
+    # return new_deals[columns].unique(subset=columns)
+
+    # DuckDB
+    with duckdb.connect() as con:
+        return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl()
diff --git a/src/app/vinyl/repo.py b/src/app/vinyl/repo.py
index e8a2557..5eb239b 100644
--- a/src/app/vinyl/repo.py
+++ b/src/app/vinyl/repo.py
@@ -23,8 +23,7 @@ vinyl = Definitions(
     assets=[deals, new_deals, works],
     resources={
         "polars_parquet_io_manager": PolarsParquetIOManager(),
-        "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl.duckdb"),
-        # "duckdb": DuckDBResource(database="vinyl.duckdb")
+        "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
     },
     jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job],
     schedules=[deals_schedule],