use polars and duckdb for lazy processing

This commit is contained in:
2024-10-14 15:23:49 +02:00
parent 1e0528bdfb
commit f9c83d29b3
2 changed files with 55 additions and 57 deletions

View File

@@ -2,7 +2,6 @@ from datetime import datetime
from glob import glob from glob import glob
import duckdb import duckdb
import pandas as pd
import polars as pl import polars as pl
import structlog import structlog
from duckdb.typing import DATE, VARCHAR from duckdb.typing import DATE, VARCHAR
@@ -110,73 +109,73 @@ def deals(context):
) )
@asset(deps=[deals], io_manager_key="duckdb_io_manager") @asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
def new_deals( def new_deals(context: OpExecutionContext) -> pl.DataFrame:
context: OpExecutionContext,
# duckdb: DuckDBResource
) -> pd.DataFrame:
ic() ic()
storage_dir = context.instance.storage_directory() storage_dir = context.instance.storage_directory()
asset_key = "deals" asset_key = "deals"
with duckdb.connect() as con: with duckdb.connect() as con:
# with duckdb.get_connection() as con:
con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE) con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
return con.execute( return con.execute(
f""" f"""
WITH tmp_plato AS ( WITH tmp_plato AS (
SELECT SELECT
source, source,
CAST(date AS DATE) AS date, CAST(date AS DATE) AS date,
ean AS id, ean AS id,
_artist AS artist, _artist AS artist,
LOWER(title) AS title, LOWER(title) AS title,
CAST(_date AS DATE) AS release, CAST(_date AS DATE) AS release,
CAST(_price AS FLOAT) AS price, CAST(_price AS FLOAT) AS price,
CONCAT('https://www.platomania.nl', url) AS url, CONCAT('https://www.platomania.nl', url) AS url,
FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true) FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
), tmp_sounds AS ( ), tmp_sounds AS (
SELECT
source,
date,
id,
LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
PARSE_DATE(release) AS release,
CAST(price AS FLOAT) AS price,
CONCAT('https://www.sounds.nl/detail/', id) AS url
FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
), tmp_both AS (
SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
), tmp_rn AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
FROM tmp_both
)
SELECT SELECT
source, source,
date, date,
id, id,
LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist, artist,
LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title, title,
PARSE_DATE(release) AS release, release,
CAST(price AS FLOAT) AS price, price,
CONCAT('https://www.sounds.nl/detail/', id) AS url url
FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true) FROM tmp_rn
), tmp_both AS ( WHERE rn = 1
SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds ORDER BY date ASC
), tmp_rn AS ( """
SELECT ).pl()
*,
ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
FROM tmp_both
)
SELECT
source,
date,
id,
artist,
title,
release,
price,
url
FROM tmp_rn
WHERE rn = 1
ORDER BY date ASC
"""
).df()
@asset( @asset(
# deps=[new_deals], io_manager_key="polars_parquet_io_manager",
io_manager_key="duckdb_io_manager"
) )
def works(new_deals: pd.DataFrame) -> pd.DataFrame: def works(new_deals: pl.DataFrame) -> pl.DataFrame:
return new_deals[["artist", "title"]].drop_duplicates() # Pandas
# with duckdb.get_connection() as con: # columns = ["artist", "title"]
# return con.execute( # return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates())
# "SELECT DISTINCT artist, title, release FROM vinyl.public.new_deals"
# ).df() # Polars
# return new_deals[columns].unique(subset=columns)
# DuckDB
with duckdb.connect() as con:
return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl()

View File

@@ -23,8 +23,7 @@ vinyl = Definitions(
assets=[deals, new_deals, works], assets=[deals, new_deals, works],
resources={ resources={
"polars_parquet_io_manager": PolarsParquetIOManager(), "polars_parquet_io_manager": PolarsParquetIOManager(),
"duckdb_io_manager": PandasDuckDBIOManager(database="vinyl.duckdb"), "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
# "duckdb": DuckDBResource(database="vinyl.duckdb")
}, },
jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job], jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job],
schedules=[deals_schedule], schedules=[deals_schedule],