move parsing logic

This commit is contained in:
2025-07-26 16:33:13 +02:00
parent fb2e90d47d
commit 433b52c0cb
3 changed files with 42 additions and 38 deletions

View File

@@ -4,9 +4,10 @@ from glob import glob
import polars as pl
import structlog
from plato.fetch import scrape_plato
from plato.parse import parse as parse_plato
from shared.utils import get_partition_keys, parse_partition_keys
from sounds.fetch import fetch_deals
from utils import parse_date
from sounds.parse import parse as parse_sounds
import dagster as dg
@@ -103,43 +104,6 @@ def new_deals(
return
def parse_plato(df: pl.LazyFrame) -> pl.LazyFrame:
    """Parse the Plato DataFrame.

    Runs a SQL query over ``df`` that renames and casts the raw columns
    (``ean`` -> ``id``, ``_artist`` -> ``artist``, ``_date`` -> ``release``,
    ``_price`` -> ``price``), lower-cases ``title`` and prefixes ``url`` with
    the platomania.nl host.

    For every ``(source, id, artist, title, price)`` combination only the
    row with the most recent ``date`` is kept; the result is ordered by
    ``date`` ascending.

    Args:
        df: Raw Plato rows. The query reads the columns ``source``, ``date``,
            ``ean``, ``_artist``, ``title``, ``_date``, ``_price`` and ``url``.

    Returns:
        The result of the SQL query, as produced by ``pl.sql``.
    """
    # NOTE: the query refers to the `df` parameter by name, so the parameter
    # must keep this name for the SQL below to resolve it.
    return pl.sql(
        """
        SELECT source,
        CAST(date AS DATE) AS date,
        ean AS id,
        _artist AS artist,
        LOWER(title) AS title,
        CAST(_date AS DATE) AS release,
        CAST(_price AS FLOAT) AS price,
        CONCAT('https://www.platomania.nl', url) AS url
        FROM df
        QUALIFY ROW_NUMBER() OVER (PARTITION BY source, id, artist, title, price ORDER BY date DESC) = 1
        ORDER BY date ASC
        """
    )
def parse_sounds(df: pl.LazyFrame) -> pl.LazyFrame:
    """Parse the Sounds DataFrame.

    Adds or replaces the following columns on ``df``:

    * ``artist``: the existing ``artist`` value, falling back to the second
      ``"-"``-separated segment of ``name``; stripped and lower-cased.
    * ``title``: the existing ``title`` value, falling back to every segment
      of ``name`` from the third onwards, re-joined with ``"-"``; stripped
      and lower-cased.
    * ``release``: ``release`` converted element-wise with ``parse_date``
      to ``pl.Date``.
    * ``price``: ``price`` cast to ``pl.Float64``.
    * ``url``: the sounds.nl detail URL built from ``id``.

    Args:
        df: Raw Sounds rows with at least the columns ``artist``, ``title``,
            ``name``, ``release``, ``price`` and ``id``.

    Returns:
        ``df`` with the columns above added or replaced.
    """
    # Both fallbacks derive from the same split of `name`, so build it once.
    # presumably `name` looks like "<prefix> - <artist> - <title>" — TODO confirm
    name_parts = pl.col("name").str.split("-")

    artist = (
        pl.coalesce(pl.col("artist"), name_parts.list.get(1))
        .str.strip_chars()
        .str.to_lowercase()
    )
    # Re-join with "-" so titles that themselves contain dashes stay intact.
    title = (
        pl.coalesce(pl.col("title"), name_parts.list.slice(2).list.join("-"))
        .str.strip_chars()
        .str.to_lowercase()
    )

    return df.with_columns(
        artist=artist,
        title=title,
        release=pl.col("release").map_elements(parse_date, return_dtype=pl.Date),
        price=pl.col("price").cast(pl.Float64),
        url=pl.format("https://www.sounds.nl/detail/{}", pl.col("id")),
    )
@dg.asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=deals.partitions_def,

View File

@@ -0,0 +1,21 @@
import polars as pl
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
    """Parse the Plato DataFrame.

    Runs a SQL query over ``df`` that renames and casts the raw columns
    (``ean`` -> ``id``, ``_artist`` -> ``artist``, ``_date`` -> ``release``,
    ``_price`` -> ``price``), lower-cases ``title`` and prefixes ``url`` with
    the platomania.nl host.

    For every ``(source, id, artist, title, price)`` combination only the
    row with the most recent ``date`` is kept; the result is ordered by
    ``date`` ascending.

    Args:
        df: Raw Plato rows. The query reads the columns ``source``, ``date``,
            ``ean``, ``_artist``, ``title``, ``_date``, ``_price`` and ``url``.

    Returns:
        The result of the SQL query, as produced by ``pl.sql``.
    """
    # NOTE: the query refers to the `df` parameter by name, so the parameter
    # must keep this name for the SQL below to resolve it.
    return pl.sql(
        """
        SELECT source,
        CAST(date AS DATE) AS date,
        ean AS id,
        _artist AS artist,
        LOWER(title) AS title,
        CAST(_date AS DATE) AS release,
        CAST(_price AS FLOAT) AS price,
        CONCAT('https://www.platomania.nl', url) AS url
        FROM df
        QUALIFY ROW_NUMBER() OVER (PARTITION BY source, id, artist, title, price ORDER BY date DESC) = 1
        ORDER BY date ASC
        """
    )

View File

@@ -0,0 +1,19 @@
import polars as pl
from utils import parse_date
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
    """Parse the Sounds DataFrame.

    Adds or replaces the following columns on ``df``:

    * ``artist``: the existing ``artist`` value, falling back to the second
      ``"-"``-separated segment of ``name``; stripped and lower-cased.
    * ``title``: the existing ``title`` value, falling back to every segment
      of ``name`` from the third onwards, re-joined with ``"-"``; stripped
      and lower-cased.
    * ``release``: ``release`` converted element-wise with ``parse_date``
      to ``pl.Date``.
    * ``price``: ``price`` cast to ``pl.Float64``.
    * ``url``: the sounds.nl detail URL built from ``id``.

    Args:
        df: Raw Sounds rows with at least the columns ``artist``, ``title``,
            ``name``, ``release``, ``price`` and ``id``.

    Returns:
        ``df`` with the columns above added or replaced.
    """
    # Both fallbacks derive from the same split of `name`, so build it once.
    # presumably `name` looks like "<prefix> - <artist> - <title>" — TODO confirm
    name_parts = pl.col("name").str.split("-")

    artist = (
        pl.coalesce(pl.col("artist"), name_parts.list.get(1))
        .str.strip_chars()
        .str.to_lowercase()
    )
    # Re-join with "-" so titles that themselves contain dashes stay intact.
    title = (
        pl.coalesce(pl.col("title"), name_parts.list.slice(2).list.join("-"))
        .str.strip_chars()
        .str.to_lowercase()
    )

    return df.with_columns(
        artist=artist,
        title=title,
        release=pl.col("release").map_elements(parse_date, return_dtype=pl.Date),
        price=pl.col("price").cast(pl.Float64),
        url=pl.format("https://www.sounds.nl/detail/{}", pl.col("id")),
    )