diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index 5bdc893..108e6a8 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -12,17 +12,19 @@ from dagster_polars.patito import patito_model_to_dagster_type
from jinja2 import Environment, FileSystemLoader
from models import Deal
from partitions import daily_partitions_def, multi_partitions_def
+from platenzaak.scrape import scrape as scrape_platenzaak
from plato.parse import parse as parse_plato
from plato.scrape import scrape as scrape_plato
from shared.utils import get_partition_keys, parse_partition_keys
from sounds.parse import parse as parse_sounds
from sounds.scrape import scrape as scrape_sounds
+from structlog.stdlib import BoundLogger
from utils.email import EmailService
import dagster as dg
asset = partial(dg.asset, key_prefix=APP)
-logger = structlog.get_logger()
+logger: BoundLogger = structlog.get_logger()
@asset(
@@ -75,10 +77,14 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
+    if source == "platenzaak":
+        logger.info("Scraping Platenzaak")
+        df = scrape_platenzaak(logger=logger)
+        logger.info("Scraped Platenzaak", rows=len(df), head=df.head().to_markdown())
+        ic(df.columns)
+        return pl.from_pandas(df.assign(**partition_key))
- return pl.DataFrame(
- [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
- )
+ raise NotImplementedError(f"No implementation for source {source}")
@asset(
diff --git a/apps/vinyl/src/partitions.py b/apps/vinyl/src/partitions.py
index 9d0e0c6..b856100 100644
--- a/apps/vinyl/src/partitions.py
+++ b/apps/vinyl/src/partitions.py
@@ -2,7 +2,7 @@ import os
import dagster as dg
-SOURCES = ["plato", "sounds"]
+SOURCES = ["plato", "sounds", "platenzaak"]
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)
diff --git a/apps/vinyl/src/platenzaak/__init__.py b/apps/vinyl/src/platenzaak/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/vinyl/src/platenzaak/scrape.py b/apps/vinyl/src/platenzaak/scrape.py
new file mode 100644
index 0000000..f245960
--- /dev/null
+++ b/apps/vinyl/src/platenzaak/scrape.py
@@ -0,0 +1,90 @@
+from collections.abc import Iterator
+
+import pandas as pd
+import requests
+from bs4 import BeautifulSoup
+from structlog.stdlib import BoundLogger
+
+
+def parse_price(price_block):
+    """Convert a price element like ``€ 30<sup>99</sup>`` into the float 30.99.
+
+    Returns None when the element is missing, has no whole-unit text of its
+    own, or the pieces do not form a number. Cents default to "00" without <sup>.
+    """
+    if not price_block:
+        return None
+    # Extract the main number (the element's own text, before <sup>)
+    main = price_block.find(string=True, recursive=False)
+    main = (main or "").strip().replace("€", "").replace(",", ".").strip()
+    if not main:  # no direct text: avoid None.strip() and a bogus "0.99"
+        return None
+
+    # Extract the <sup> part (cents)
+    sup = price_block.find("sup")
+    cents = sup.get_text(strip=True) if sup else "00"
+    try:
+        return float(f"{main}.{cents}")
+    except ValueError:
+        return None
+
+
+def parse_page(html) -> Iterator[dict]:
+    """Yield a product dict for each product block in the page HTML,
+    skipping blocks without a wishlist element (``data-wlh-id``).
+    """
+    soup = BeautifulSoup(html, "html.parser")
+    for block in soup.select("div.product-block__inner"):
+        # Wishlist button holds most metadata
+        wishlist = block.select_one("[data-wlh-id]")
+        if not wishlist:
+            continue
+
+        product = {
+            "id": wishlist.get("data-wlh-id"),
+            "variant_id": wishlist.get("data-wlh-variantid"),
+            "name": wishlist.get("data-wlh-name"),
+            "price": wishlist.get("data-wlh-price"),
+            "url": wishlist.get("data-wlh-link"),
+            "image": wishlist.get("data-wlh-image"),
+        }
+
+        # Artist + Title (in the title link)
+        title_block = block.select_one(".product-block__title-price .title")
+        if title_block:
+            artist = title_block.find("span")
+            if artist:
+                product["artist"] = artist.get_text(strip=True)
+            # The last text fragment (after the line break) is the album title
+            product["album"] = (
+                title_block.get_text(separator="|").split("|")[-1].strip()
+            )
+
+        # Current price (might include discounts)
+        price_block = block.select_one(".price .amount")
+        product["current_price"] = parse_price(price_block)
+
+        # Original price if on sale
+        old_price_block = block.select_one(".price del .theme-money")
+        product["original_price"] = parse_price(old_price_block)
+
+        # Sale label
+        product["on_sale"] = bool(block.select_one(".product-label--sale"))
+        yield product
+
+
+def scrape(logger: BoundLogger) -> pd.DataFrame:
+    """Scrape every vinyl sale page until one is empty; HTTP errors raise."""
+    url = "https://www.platenzaak.nl/collections/sale?filter.p.m.custom.config_group=Vinyl"
+    page, products = 1, []
+    while True:
+        # Explicit timeout: requests otherwise waits indefinitely on a stalled server
+        response = requests.get(f"{url}&page={page}", timeout=30)
+        response.raise_for_status()
+        page_products = list(parse_page(response.text))
+        logger.info("Scraped page", page=page, products=len(page_products))
+        if not page_products:
+            break
+        products.extend(page_products)
+        page += 1
+    return pd.DataFrame(products)