Improve SQL: reformat the CTEs and replace the tmp_rn ROW_NUMBER CTE with a QUALIFY clause

This commit is contained in:
2025-07-22 13:42:31 +02:00
parent eb43acbb22
commit 6b5bda5cb2

View File

@@ -109,6 +109,7 @@ def deals(context):
@asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
def new_deals(context: OpExecutionContext) -> pl.DataFrame:
"""Combine deals from Plato and Sounds into a single DataFrame."""
ic()
storage_dir = context.resources.polars_parquet_io_manager.base_dir
asset_key = "deals"
@@ -126,26 +127,34 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame:
LOWER(title) AS title,
CAST(_date AS DATE) AS release,
CAST(_price AS FLOAT) AS price,
CONCAT('https://www.platomania.nl', url) AS url,
FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
), tmp_sounds AS (
CONCAT('https://www.platomania.nl', url) AS url
FROM read_parquet(
'{storage_dir}/{asset_key}/*/plato.parquet',
union_by_name = true
)
),
tmp_sounds AS (
SELECT
source,
date,
id,
LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
LOWER(TRIM(COALESCE(
title,
ARRAY_TO_STRING(SPLIT(name, '-')[2:], '-')
))) AS title,
PARSE_DATE(release) AS release,
CAST(price AS FLOAT) AS price,
CONCAT('https://www.sounds.nl/detail/', id) AS url
FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
), tmp_both AS (
SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
), tmp_rn AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
FROM tmp_both
FROM read_parquet(
'{storage_dir}/{asset_key}/*/sounds.parquet',
union_by_name = true
)
),
tmp_both AS (
SELECT * FROM tmp_plato
UNION ALL
SELECT * FROM tmp_sounds
)
SELECT
source,
@@ -156,8 +165,11 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame:
release,
price,
url
FROM tmp_rn
WHERE rn = 1
FROM tmp_both
QUALIFY ROW_NUMBER() OVER (
PARTITION BY source, id, artist, title, price
ORDER BY date DESC
) = 1
ORDER BY date ASC
"""
).pl()