parse platenzaak deals

This commit is contained in:
2025-08-22 10:35:34 +02:00
parent 1d9bd68612
commit 55e8b31223
4 changed files with 112 additions and 22 deletions

View File

@@ -24,3 +24,80 @@ def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
borg_repo_partitions_def.build_add_request(new_repos),
],
)
# @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
# def list_archives(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
# ic(context.cursor)
#
# response = requests.get(URL)
# response.raise_for_status()
#
# try:
# date_obj = next(extract_date(response.text))
# date_str = date_obj.strftime("%Y-%m-%d")
# context.log.info(f"Found date: {date_str}")
# if date_str > context.cursor:
# context.update_cursor(date_str)
# yield dg.RunRequest()
# return
# except Exception as e:
# context.log.error(f"Parsing error: {e}")
#
# now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# file = f"{now_str} stocks.html"
# context.log.info(f"Saving file: {file}")
# with open(f"/cache/{file}", "w") as fp:
# fp.write(response.text)
#
#
# @dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
# def parse_raw(context: dg.SensorEvaluationContext):
# # TODO: use cursor from sensor to filter materialization events
#
# # Get the known materialized partitions of daily_tables
# daily_partitions = context.instance.get_materialized_partitions(
# assets.daily_table.key
# )
# dates = [x.split("|")[0] for x in daily_partitions]
# ic(daily_partitions, dates)
#
# # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
# events = list(
# context.instance.get_event_records(
# event_records_filter=dg.EventRecordsFilter(
# event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
# asset_key=assets.raw_html.key,
# ),
# limit=100,
# )
# )
#
# # Track unique dates found in raw that are not materialized in daily_tables
# unknown_dates = set()
# for event in events:
# metadata = event.event_log_entry.asset_materialization.metadata
# date_str = None
# ic(metadata)
# for key, entry in metadata.items():
# # TODO: move this general logic
# if key.lower() in {"date", "partition", "partition_date"}:
# date_str = entry.value
# break
# if not date_str:
# continue
#
# # Normalize and validate the date
# try:
# dt = pendulum.from_timestamp(int(date_str))
# date_str = dt.strftime("%Y-%m-%d")
# except Exception as e:
# logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
# continue
#
# if date_str not in dates:
# unknown_dates.add(date_str)
#
# ic(unknown_dates)
# for date_str in sorted(unknown_dates):
# yield dg.RunRequest(partition_key=date_str)

View File

@@ -12,6 +12,7 @@ from dagster_polars.patito import patito_model_to_dagster_type
from jinja2 import Environment, FileSystemLoader
from models import Deal
from partitions import daily_partitions_def, multi_partitions_def
from platenzaak.parse import parse as parse_platenzaak
from platenzaak.scrape import scrape as scrape_platenzaak
from plato.parse import parse as parse_plato
from plato.scrape import scrape as scrape_plato
@@ -65,26 +66,24 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
logger.error("Failed to load CSV file!", error=e)
raise dg.Failure(f"Cannot materialize for the past: {date.date()}")
if source == "plato":
logger.info("Scraping Plato")
df = scrape_plato()
logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
if source == "sounds":
logger.info("Scraping Sounds")
df = scrape_sounds()
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
if source == "platenzaak":
logger.info("Scraping Platenzaak")
df = scrape_platenzaak(logger=logger)
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
match source:
case "plato":
logger.info("Scraping Plato")
df = scrape_plato()
logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
case "sounds":
logger.info("Scraping Sounds")
df = scrape_sounds()
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
case "platenzaak":
logger.info("Scraping Platenzaak")
df = scrape_platenzaak(logger=logger)
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
case _:
raise ValueError(f"Unknown source: {source}!")
raise NotImplementedError(f"No implementation for source {source}")
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
@asset(
@@ -111,9 +110,10 @@ def cleaned_deals(
parsed_df = parse_plato(df)
case "sounds":
parsed_df = parse_sounds(df)
case "platenzaak":
parsed_df = parse_platenzaak(df)
case _:
context.log.warning(f"Unknown source: {source}!")
return
raise ValueError(f"Unknown source: {source}!")
ic(parsed_df.collect_schema())

View File

@@ -0,0 +1,13 @@
import polars as pl
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
"""Parse the Platenzaak DataFrame."""
return df.with_columns(
date=pl.col("date").cast(pl.Date),
artist=pl.col("artist").str.strip_chars().str.to_lowercase(),
title=pl.col("album").str.strip_chars().str.to_lowercase(),
release=pl.lit(None),
price=pl.col("current_price").cast(pl.Float64),
url=pl.format("https://platenzaak.nl{}", pl.col("id")),
)

View File

@@ -3,7 +3,7 @@ from utils.parse import parse_date
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
"""Parse the Plato DataFrame."""
"""Parse the Sounds DataFrame."""
return df.with_columns(
date=pl.col("date").cast(pl.Date),
artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1))