From 433b52c0cbf64eb7ebe0df03e286098cc3139c69 Mon Sep 17 00:00:00 2001
From: Rik Veenboer
Date: Sat, 26 Jul 2025 16:33:13 +0200
Subject: [PATCH] move parsing logic

---
Review notes (text between the "---" marker and the diff is not part of the
commit message):
- plato/parse.py: dropped the stray ic() debug call. The new module only
  imports polars, so the call depended on a name defined elsewhere.
- The docstrings of the two new parse() functions were swapped
  (Plato <-> Sounds); they are corrected and expanded in the added lines.
  Removed lines still show the old text because they must match the pre-image.
- This patch was re-flowed from a whitespace-collapsed copy. Indentation of
  context and removed lines is reconstructed -- TODO confirm against the tree,
  or apply with `git apply --ignore-whitespace`.
- The blob ids on the "index" lines of the two new files predate these edits.

 apps/vinyl/src/assets.py       | 40 ++--------------------------------
 apps/vinyl/src/plato/parse.py  | 26 ++++++++++++++++++++++++++
 apps/vinyl/src/sounds/parse.py | 24 ++++++++++++++++++++++++
 3 files changed, 52 insertions(+), 38 deletions(-)
 create mode 100644 apps/vinyl/src/plato/parse.py
 create mode 100644 apps/vinyl/src/sounds/parse.py

diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index 1482ba9..2a3e882 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -4,9 +4,10 @@ from glob import glob
 import polars as pl
 import structlog
 from plato.fetch import scrape_plato
+from plato.parse import parse as parse_plato
 from shared.utils import get_partition_keys, parse_partition_keys
 from sounds.fetch import fetch_deals
-from utils import parse_date
+from sounds.parse import parse as parse_sounds
 
 import dagster as dg
 
@@ -103,43 +104,6 @@ def new_deals(
     return
 
 
-def parse_plato(df: pl.LazyFrame) -> pl.LazyFrame:
-    """Parse the Sounds DataFrame."""
-    ic()
-    return pl.sql(
-        """
-        SELECT source,
-               CAST(date AS DATE) AS date,
-               ean AS id,
-               _artist AS artist,
-               LOWER(title) AS title,
-               CAST(_date AS DATE) AS release,
-               CAST(_price AS FLOAT) AS price,
-               CONCAT('https://www.platomania.nl', url) AS url
-        FROM df
-        QUALIFY ROW_NUMBER() OVER (PARTITION BY source, id, artist, title, price ORDER BY date DESC) = 1
-        ORDER BY date ASC
-        """
-    )
-
-
-def parse_sounds(df: pl.LazyFrame) -> pl.LazyFrame:
-    """Parse the Plato DataFrame."""
-    return df.with_columns(
-        artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1))
-        .str.strip_chars()
-        .str.to_lowercase(),
-        title=pl.coalesce(
-            pl.col("title"), pl.col("name").str.split("-").list.slice(2).list.join("-")
-        )
-        .str.strip_chars()
-        .str.to_lowercase(),
-        release=pl.col("release").map_elements(parse_date, return_dtype=pl.Date),
-        price=pl.col("price").cast(pl.Float64),
-        url=pl.format("https://www.sounds.nl/detail/{}", pl.col("id")),
-    )
-
-
 @dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=deals.partitions_def,
diff --git a/apps/vinyl/src/plato/parse.py b/apps/vinyl/src/plato/parse.py
new file mode 100644
index 0000000..ca29f3e
--- /dev/null
+++ b/apps/vinyl/src/plato/parse.py
@@ -0,0 +1,26 @@
+import polars as pl
+
+
+def parse(df: pl.LazyFrame) -> pl.LazyFrame:
+    """Parse the Plato DataFrame.
+
+    Casts the date and price columns, renames the scraped fields, prefixes
+    the Platomania host to the URL, and keeps only the latest-dated row of
+    each (source, id, artist, title, price) partition, ordered by date.
+    """
+    # NOTE(review): relies on pl.sql finding the frame named "df" by name.
+    return pl.sql(
+        """
+        SELECT source,
+               CAST(date AS DATE) AS date,
+               ean AS id,
+               _artist AS artist,
+               LOWER(title) AS title,
+               CAST(_date AS DATE) AS release,
+               CAST(_price AS FLOAT) AS price,
+               CONCAT('https://www.platomania.nl', url) AS url
+        FROM df
+        QUALIFY ROW_NUMBER() OVER (PARTITION BY source, id, artist, title, price ORDER BY date DESC) = 1
+        ORDER BY date ASC
+        """
+    )
diff --git a/apps/vinyl/src/sounds/parse.py b/apps/vinyl/src/sounds/parse.py
new file mode 100644
index 0000000..a538e6e
--- /dev/null
+++ b/apps/vinyl/src/sounds/parse.py
@@ -0,0 +1,24 @@
+import polars as pl
+from utils import parse_date
+
+
+def parse(df: pl.LazyFrame) -> pl.LazyFrame:
+    """Parse the Sounds DataFrame.
+
+    Fills a missing artist/title from the "-"-separated "name" column,
+    strips and lowercases both, parses the release date with parse_date,
+    casts the price to Float64, and builds the detail URL from "id".
+    """
+    return df.with_columns(
+        artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1))
+        .str.strip_chars()
+        .str.to_lowercase(),
+        title=pl.coalesce(
+            pl.col("title"), pl.col("name").str.split("-").list.slice(2).list.join("-")
+        )
+        .str.strip_chars()
+        .str.to_lowercase(),
+        release=pl.col("release").map_elements(parse_date, return_dtype=pl.Date),
+        price=pl.col("price").cast(pl.Float64),
+        url=pl.format("https://www.sounds.nl/detail/{}", pl.col("id")),
+    )