refactor to allow for multiple code locations

This commit is contained in:
2025-07-20 19:49:30 +02:00
parent 9b8cfabee5
commit fd73e1367c
40 changed files with 161 additions and 628 deletions

181
apps/vinyl/src/assets.py Normal file
View File

@@ -0,0 +1,181 @@
from datetime import datetime
from glob import glob
import duckdb
import polars as pl
import structlog
from duckdb.typing import DATE, VARCHAR
from plato.fetch import scrape_plato
from sounds.fetch import fetch_deals
from utils import parse_date
from dagster import (
DailyPartitionsDefinition,
DimensionPartitionMapping,
Failure,
Field,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
OpExecutionContext,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)
SOURCES = ["plato", "sounds"]
logger = structlog.get_logger()
partitions_def = MultiPartitionsDefinition(
{
"date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
"source": StaticPartitionsDefinition(SOURCES),
}
)
partition_mapping = MultiPartitionMapping(
{
"date": DimensionPartitionMapping(
dimension_name="date",
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
),
"source": DimensionPartitionMapping(
dimension_name="source",
partition_mapping=IdentityPartitionMapping(),
),
}
)
@asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=partitions_def,
metadata={
"partition_by": ["date", "source"],
},
config_schema={
"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")
},
)
def deals(context):
ic()
ic(context.partition_key)
ic(context.op_config)
import_dir = context.op_config["import_dir"]
partition_key = context.partition_key.keys_by_dimension
date_str = partition_key["date"]
source = partition_key["source"]
logger.info("Materializing deals", date=date_str, source=source)
date = datetime.strptime(partition_key["date"], "%Y-%m-%d")
days = (date - datetime.today()).days
ic(days)
if days > 0:
raise Failure(f"Cannot materialize for the future: {date.date()}")
if days < -1:
if source == "sounds":
pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
logger.info("Looking for existing CSV files", pattern=pattern)
files = glob(pattern)
if len(files):
file = sorted(files)[-1]
logger.info("Using existing CSV file", file=file)
try:
df = pl.read_csv(file)
logger.info("Loaded CSV file", rows=len(df))
return df.with_columns(
**{k: pl.lit(v) for k, v in partition_key.items()}
)
except Exception as e:
logger.error("Failed to load CSV file!", error=e)
raise Failure(f"Cannot materialize for the past: {date.date()}")
if source == "plato":
logger.info("Scraping Plato")
df = scrape_plato()
logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
if source == "sounds":
logger.info("Scraping Sounds")
df = fetch_deals()
ic(df.columns)
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
return pl.from_pandas(df.assign(**partition_key))
return pl.DataFrame(
[{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
)
@asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
def new_deals(context: OpExecutionContext) -> pl.DataFrame:
ic()
storage_dir = context.instance.storage_directory()
asset_key = "deals"
with duckdb.connect() as con:
con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
return con.execute(
f"""
WITH tmp_plato AS (
SELECT
source,
CAST(date AS DATE) AS date,
ean AS id,
_artist AS artist,
LOWER(title) AS title,
CAST(_date AS DATE) AS release,
CAST(_price AS FLOAT) AS price,
CONCAT('https://www.platomania.nl', url) AS url,
FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
), tmp_sounds AS (
SELECT
source,
date,
id,
LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
PARSE_DATE(release) AS release,
CAST(price AS FLOAT) AS price,
CONCAT('https://www.sounds.nl/detail/', id) AS url
FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
), tmp_both AS (
SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
), tmp_rn AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
FROM tmp_both
)
SELECT
source,
date,
id,
artist,
title,
release,
price,
url
FROM tmp_rn
WHERE rn = 1
ORDER BY date ASC
"""
).pl()
@asset(
io_manager_key="polars_parquet_io_manager",
)
def works(new_deals: pl.DataFrame) -> pl.DataFrame:
# Pandas
# columns = ["artist", "title"]
# return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates())
# Polars
# return new_deals[columns].unique(subset=columns)
# DuckDB
with duckdb.connect() as con:
return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl()