From 6b5bda5cb2c70ce171c64aafcf01b504c054d852 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Tue, 22 Jul 2025 13:42:31 +0200 Subject: [PATCH] improve sql --- apps/vinyl/src/assets.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index e74ddff..9f5b1fb 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -109,6 +109,7 @@ def deals(context): @asset(deps=[deals], io_manager_key="polars_parquet_io_manager") def new_deals(context: OpExecutionContext) -> pl.DataFrame: + """Combine deals from Plato and Sounds into a single DataFrame.""" ic() storage_dir = context.resources.polars_parquet_io_manager.base_dir asset_key = "deals" @@ -126,26 +127,34 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame: LOWER(title) AS title, CAST(_date AS DATE) AS release, CAST(_price AS FLOAT) AS price, - CONCAT('https://www.platomania.nl', url) AS url, - FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true) - ), tmp_sounds AS ( + CONCAT('https://www.platomania.nl', url) AS url + FROM read_parquet( + '{storage_dir}/{asset_key}/*/plato.parquet', + union_by_name = true + ) + ), + tmp_sounds AS ( SELECT source, date, id, LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist, - LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title, + LOWER(TRIM(COALESCE( + title, + ARRAY_TO_STRING(SPLIT(name, '-')[2:], '-') + ))) AS title, PARSE_DATE(release) AS release, CAST(price AS FLOAT) AS price, CONCAT('https://www.sounds.nl/detail/', id) AS url - FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true) - ), tmp_both AS ( - SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds - ), tmp_rn AS ( - SELECT - *, - ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn - FROM tmp_both + FROM read_parquet( + '{storage_dir}/{asset_key}/*/sounds.parquet', + union_by_name = true + ) + ), + tmp_both AS ( + SELECT * FROM tmp_plato + UNION ALL + SELECT * FROM tmp_sounds ) SELECT source, @@ -156,8 +165,11 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame: release, price, url - FROM tmp_rn - WHERE rn = 1 + FROM tmp_both + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY source, id, artist, title, price + ORDER BY date DESC + ) = 1 ORDER BY date ASC """ ).pl()