collect unique works from deals dumps

2024-10-14 15:08:07 +02:00
parent 6f2c4d1249
commit 1e0528bdfb
9 changed files with 164 additions and 33 deletions

View File

@@ -27,8 +27,6 @@ appdirs==1.4.4
     # via pint
 asttokens==2.4.1
     # via icecream
-async-timeout==4.0.3
-    # via aiohttp
 attrs==24.2.0
     # via aiohttp
 backoff==2.2.1
@@ -76,12 +74,17 @@ dagster==1.8.9
     # via
     #   dagster-aws
     #   dagster-docker
+    #   dagster-duckdb
+    #   dagster-duckdb-pandas
     #   dagster-graphql
     #   dagster-polars
     #   dagster-postgres
     #   dagster-webserver
 dagster-aws==0.24.9
 dagster-docker==0.24.9
+dagster-duckdb==0.24.9
+    # via dagster-duckdb-pandas
+dagster-duckdb-pandas==0.24.9
 dagster-graphql==1.8.9
     # via dagster-webserver
 dagster-pipes==1.8.9
@@ -99,14 +102,13 @@ docker-image-py==0.1.13
 docstring-parser==0.16
     # via dagster
 duckdb==1.1.1
+    # via dagster-duckdb
 durationpy==0.8
     # via kubernetes
 email-validator==2.2.0
     # via pydantic
 et-xmlfile==1.1.0
     # via openpyxl
-exceptiongroup==1.2.2
-    # via anyio
 executing==2.1.0
     # via icecream
 fastapi==0.115.0
@@ -218,6 +220,7 @@ packaging==24.1
     #   pyogrio
 pandas==2.2.3
     # via
+    #   dagster-duckdb-pandas
     #   fastparquet
     #   geopandas
     #   pint-pandas
@@ -351,19 +354,16 @@ tqdm==4.66.5
 typing-extensions==4.12.2
     # via
     #   alembic
-    #   anyio
     #   dagster
     #   dagster-polars
     #   fastapi
     #   flexcache
     #   flexparser
-    #   multidict
     #   pint
     #   pydantic
     #   pydantic-core
     #   reactivex
     #   sqlalchemy
-    #   uvicorn
 tzdata==2024.2
     # via pandas
 universal-pathlib==0.2.5

View File

@@ -22,6 +22,7 @@ run_launcher:
       # - /opt/dagster/storage/:/opt/dagster/home/storage/
       - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/
       - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/
+      - /opt/dagster/storage/:/opt/dagster/home/storage/
 run_storage:
   module: dagster_postgres.run_storage

View File

@@ -57,6 +57,8 @@ dagster = [
     "dagster-docker",
     "dagster-aws",
     "dagster-polars",
+    "dagster-duckdb",
+    "dagster-duckdb-pandas",
     "dagit"
 ]

View File

@@ -20,8 +20,6 @@ appdirs==1.4.4
     # via pint
 asttokens==2.4.1
     # via icecream
-async-timeout==4.0.3
-    # via aiohttp
 attrs==24.2.0
     # via aiohttp
 beautifulsoup4==4.12.3
@@ -62,8 +60,6 @@ email-validator==2.2.0
     # via pydantic
 et-xmlfile==1.1.0
     # via openpyxl
-exceptiongroup==1.2.2
-    # via anyio
 executing==2.1.0
     # via icecream
 fastapi==0.115.0
@@ -217,16 +213,13 @@ starlette==0.38.6
 structlog==24.4.0
 typing-extensions==4.12.2
     # via
-    #   anyio
     #   fastapi
     #   flexcache
     #   flexparser
-    #   multidict
     #   pint
     #   pydantic
     #   pydantic-core
     #   reactivex
-    #   uvicorn
 tzdata==2024.2
     # via pandas
 urllib3==2.2.3

View File

@@ -1,13 +1,16 @@
 from datetime import datetime
 from glob import glob
+import duckdb
+import pandas as pd
 import polars as pl
 import structlog
+from duckdb.typing import DATE, VARCHAR
 from app.vinyl.plato.fetch import scrape_plato
 from app.vinyl.sounds.fetch import fetch_deals
+from app.vinyl.utils import parse_date
 from dagster import (
-    AssetIn,
     DailyPartitionsDefinition,
     DimensionPartitionMapping,
     Failure,
@@ -15,6 +18,7 @@ from dagster import (
     IdentityPartitionMapping,
     MultiPartitionMapping,
     MultiPartitionsDefinition,
+    OpExecutionContext,
     StaticPartitionsDefinition,
     TimeWindowPartitionMapping,
     asset,
@@ -106,16 +110,73 @@ def deals(context):
 )


-@asset(
-    partitions_def=partitions_def,
-    ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
-)
-def new_deals(context, asset_multi_1):
+@asset(deps=[deals], io_manager_key="duckdb_io_manager")
+def new_deals(
+    context: OpExecutionContext,
+    # duckdb: DuckDBResource
+) -> pd.DataFrame:
     ic()
-    ic(context.partition_key)
-    ic(context.partition_key.keys_by_dimension)
-    ic(asset_multi_1)
-    partition_key = context.asset_partition_key_for_output()
-    ic(partition_key)
-    return f"Processed data for {partition_key}"
+    storage_dir = context.instance.storage_directory()
+    asset_key = "deals"
+
+    with duckdb.connect() as con:
+        # with duckdb.get_connection() as con:
+        con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
+        return con.execute(
+            f"""
+            WITH tmp_plato AS (
+                SELECT
+                    source,
+                    CAST(date AS DATE) AS date,
+                    ean AS id,
+                    _artist AS artist,
+                    LOWER(title) AS title,
+                    CAST(_date AS DATE) AS release,
+                    CAST(_price AS FLOAT) AS price,
+                    CONCAT('https://www.platomania.nl', url) AS url,
+                FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
+            ), tmp_sounds AS (
+                SELECT
+                    source,
+                    date,
+                    id,
+                    LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
+                    LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
+                    PARSE_DATE(release) AS release,
+                    CAST(price AS FLOAT) AS price,
+                    CONCAT('https://www.sounds.nl/detail/', id) AS url
+                FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
+            ), tmp_both AS (
+                SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
+            ), tmp_rn AS (
+                SELECT
+                    *,
+                    ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
+                FROM tmp_both
+            )
+            SELECT
+                source,
+                date,
+                id,
+                artist,
+                title,
+                release,
+                price,
+                url
+            FROM tmp_rn
+            WHERE rn = 1
+            ORDER BY date ASC
+            """
+        ).df()
+
+
+@asset(
+    # deps=[new_deals],
+    io_manager_key="duckdb_io_manager"
+)
+def works(new_deals: pd.DataFrame) -> pd.DataFrame:
+    return new_deals[["artist", "title"]].drop_duplicates()
+    # with duckdb.get_connection() as con:
+    #     return con.execute(
+    #         "SELECT DISTINCT artist, title, release FROM vinyl.public.new_deals"
+    #     ).df()
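
The PARSE_DATE call above leans on DuckDB's Python UDF support: con.create_function registers a plain Python callable as a scalar SQL function. A minimal standalone sketch of that mechanism (the sample literal is illustrative, not from the dumps):

import duckdb
from duckdb.typing import DATE, VARCHAR

from app.vinyl.utils import parse_date

with duckdb.connect() as con:
    # One VARCHAR argument in, a DATE out; DuckDB converts the returned
    # datetime.date to a SQL DATE value.
    con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
    # "14 oktober 2024" is Dutch for 14 October 2024 (illustrative value).
    print(con.execute("SELECT PARSE_DATE('14 oktober 2024')").fetchone())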

View File

@@ -7,7 +7,7 @@ from dagster import (
     op,
 )

-from .assets import deals
+from .assets import deals, new_deals, works

 deals_job = define_asset_job(
     "deals_job", selection=[deals], partitions_def=deals.partitions_def
@@ -51,3 +51,8 @@ def check_partititions(context: OpExecutionContext):
 @job
 def check_partititions_job():
     check_partititions()
+
+
+musicbrainz_lookup_job = define_asset_job(
+    "musicbrainz_lookup_job", selection=[works, new_deals]
+)

View File

@@ -1,13 +1,32 @@
from dagster import Definitions from collections.abc import Sequence
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb.io_manager import DbTypeHandler
from dagster_duckdb_pandas import DuckDBPandasTypeHandler
from dagster_polars import PolarsParquetIOManager from dagster_polars import PolarsParquetIOManager
from .assets import deals from dagster import Definitions
from .jobs import check_partititions_job, deals_job
from .assets import deals, new_deals, works
from .jobs import check_partititions_job, deals_job, musicbrainz_lookup_job
from .schedules import deals_schedule from .schedules import deals_schedule
from .sensors import musicbrainz_lookup_sensor
class PandasDuckDBIOManager(DuckDBIOManager):
@staticmethod
def type_handlers() -> Sequence[DbTypeHandler]:
return [DuckDBPandasTypeHandler()]
vinyl = Definitions( vinyl = Definitions(
assets=[deals], assets=[deals, new_deals, works],
resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, resources={
jobs=[deals_job, check_partititions_job], "polars_parquet_io_manager": PolarsParquetIOManager(),
"duckdb_io_manager": PandasDuckDBIOManager(database="vinyl.duckdb"),
# "duckdb": DuckDBResource(database="vinyl.duckdb")
},
jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job],
schedules=[deals_schedule], schedules=[deals_schedule],
sensors=[musicbrainz_lookup_sensor],
) )
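
With these resources wired in, dagster-duckdb persists each asset's returned DataFrame as a table in vinyl.duckdb, named after the asset key (under the public schema by default, which the commented-out vinyl.public.new_deals query above also assumes). A hedged way to inspect the results outside Dagster, assuming both assets have been materialized:

import duckdb

con = duckdb.connect("vinyl.duckdb")
# Table names follow the asset keys; "public" is the dagster-duckdb default schema.
print(con.execute("SELECT COUNT(*) FROM public.new_deals").fetchone())
print(con.execute("SELECT * FROM public.works LIMIT 5").df())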

src/app/vinyl/sensors.py (new file, 21 lines)
View File

@@ -0,0 +1,21 @@
+from app.vinyl.assets import deals
+from app.vinyl.jobs import musicbrainz_lookup_job
+from dagster import (
+    DefaultSensorStatus,
+    EventLogEntry,
+    RunRequest,
+    SensorEvaluationContext,
+    asset_sensor,
+)
+
+
+@asset_sensor(
+    asset_key=deals.key,
+    job=musicbrainz_lookup_job,
+    default_status=DefaultSensorStatus.RUNNING,
+)
+def musicbrainz_lookup_sensor(
+    context: SensorEvaluationContext, asset_event: EventLogEntry
+):
+    assert asset_event.dagster_event and asset_event.dagster_event.asset_key
+    yield RunRequest(run_key=context.cursor)

src/app/vinyl/utils.py (new file, 29 lines)
View File

@@ -0,0 +1,29 @@
+import datetime
+
+
+def parse_date(dutch_date: str):
+    # Create a dictionary to map Dutch month names to English
+    dutch_to_english_months = {
+        "januari": "January",
+        "februari": "February",
+        "maart": "March",
+        "april": "April",
+        "mei": "May",
+        "juni": "June",
+        "juli": "July",
+        "augustus": "August",
+        "september": "September",
+        "oktober": "October",
+        "november": "November",
+        "december": "December",
+    }
+
+    # Split the date and replace the Dutch month with its English equivalent
+    day, dutch_month, year = dutch_date.split()
+    english_month = dutch_to_english_months[dutch_month]
+
+    # Rebuild the date string in English format
+    english_date = f"{day} {english_month} {year}"
+
+    # Parse the date using strptime
+    return datetime.datetime.strptime(english_date, "%d %B %Y").date()
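
A quick sanity check of the helper (the sample string is illustrative; %B matches English month names under the default C locale):

import datetime

from app.vinyl.utils import parse_date

# "1 mei 2025" is Dutch for 1 May 2025.
assert parse_date("1 mei 2025") == datetime.date(2025, 5, 1)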