Compare commits

...

26 Commits

SHA1 Message Date
87924620fd move storage dir 2026-01-08 15:52:11 +01:00
b15aaaa0dc update stocks scraper 2025-11-09 18:30:10 +01:00
3f99f354de align raw weather output files 2025-10-29 13:59:21 +01:00
204565118b run request for 3 new locations 2025-10-29 11:17:12 +01:00
aa4a2fa5b1 add all new locations 2025-10-29 11:11:26 +01:00
af913e258a test ssh access 2025-10-29 10:15:57 +01:00
7a8f15b1d6 remove sample code 2025-10-29 10:15:35 +01:00
e9ad1677ef warn for missing partitions 2025-10-29 10:15:10 +01:00
2a4da9abb9 adjust port mapping 2025-10-29 10:12:37 +01:00
a9b9197150 adjust automation condition 2025-10-29 10:12:21 +01:00
883ecf86be adjust deals job 2025-09-23 19:22:49 +02:00
7a600f6264 cast release column 2025-09-23 19:17:14 +02:00
127a773c82 allow empty release 2025-09-23 19:11:55 +02:00
67a7e2dacf add logger for borg 2025-09-15 16:56:38 +02:00
fc6f120c53 pin some packages 2025-08-24 09:45:23 +02:00
55e8b31223 parse platenzaak deals 2025-08-22 10:35:34 +02:00
1d9bd68612 scrape platenzaak 2025-08-22 10:22:16 +02:00
e0cda85d20 unused snippet 2025-08-22 09:43:22 +02:00
da55030498 move stocks specific Dockerfile 2025-08-22 09:43:10 +02:00
316fe03be9 add backup repo to dagster and docker files 2025-08-22 09:42:19 +02:00
bf537c86a4 inventory of borg backups 2025-08-22 09:41:08 +02:00
d2e34bca1c inventory of borg backups 2025-08-22 09:40:30 +02:00
65593e5421 lint 2025-08-17 17:49:01 +02:00
4242638818 refactor 2025-08-16 13:56:18 +02:00
4593b97bc2 linting 2025-08-16 13:49:41 +02:00
a0a0bbd110 refactor 2025-08-16 13:48:34 +02:00
36 changed files with 588 additions and 382 deletions

11
apps/backup/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM dagster-code-backup-base
RUN apt-get update \
&& apt-get install --no-install-recommends --yes \
borgbackup openssh-client \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /root/.ssh && chmod 0700 /root/.ssh/
COPY --chmod=0600 id_rsa /root/.ssh/
ADD --chmod=0600 ssh_config /root/.ssh/config

146
apps/backup/src/assets.py Normal file
View File

@@ -0,0 +1,146 @@
import json
import os
import subprocess
import sys
from datetime import datetime
from functools import partial
from zoneinfo import ZoneInfo
import structlog
from config import APP, BORG_HOST, BORG_ROOT
from partitions import borg_repo_partitions_def, daily_partitions_def
from shared.utils import get_partition_keys
import dagster as dg
asset = partial(dg.asset, key_prefix=APP)
logger = structlog.get_logger()
@asset(
partitions_def=dg.MultiPartitionsDefinition(
{
"date": daily_partitions_def,
"repo": borg_repo_partitions_def,
}
)
)
def borg_archive(context: dg.AssetExecutionContext) -> None:
pass
@asset(
deps=[borg_archive],
partitions_def=dg.MultiPartitionsDefinition(
{
"date": daily_partitions_def,
"repo": borg_repo_partitions_def,
}
),
automation_condition=dg.AutomationCondition.eager(),
)
def borg_archive_info(context: dg.AssetExecutionContext) -> dg.Output[None]:
partition_keys = get_partition_keys(context)
ic(partition_keys)
location = f"ssh://{BORG_HOST}{BORG_ROOT}{partition_keys['repo']}::{partition_keys['date']}"
ic(location)
try:
result = subprocess.run(
["borg", "info", "--json", location],
capture_output=True,
text=True,
check=True,
env={"BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK": "yes"},
)
except subprocess.CalledProcessError as e:
logger.error("borg list failed", exc_info=e, code=e.returncode)
sys.stderr.write("borg list failed\n" + e.stderr)
data = json.loads(result.stdout)
ic(data)
tmp = data["archives"][0]
def parse_date(date_str, tz: str | None = None) -> dg.MetadataValue.timestamp:
return dg.MetadataValue.timestamp(
datetime.fromisoformat(date_str).replace(
tzinfo=ZoneInfo(tz or os.environ.get("TZ", "CET"))
)
)
return dg.Output(
None,
metadata={
"start": parse_date(tmp["start"]),
"end": parse_date(tmp["end"]),
"duration": dg.MetadataValue.float(tmp["duration"]),
"compressed_size": dg.MetadataValue.int(tmp["stats"]["compressed_size"]),
"deduplicated_size": dg.MetadataValue.int(
tmp["stats"]["deduplicated_size"]
),
"nfiles": dg.MetadataValue.int(tmp["stats"]["nfiles"]),
"original_size": dg.MetadataValue.int(tmp["stats"]["original_size"]),
},
)
# now run borg info ssh://shuttle/mnt/yotta/xenon/borg/opt/::2025-07-27 --json and register info
@asset(
partitions_def=borg_repo_partitions_def,
)
def borg_repo(context: dg.AssetExecutionContext) -> None:
location = f"ssh://{BORG_HOST}{BORG_ROOT}{context.partition_key}"
ic(location)
repo = context.partition_key
# Get Borg backup list
try:
result = subprocess.run(
["borg", "list", "--json", location],
capture_output=True,
text=True,
check=True,
env={"BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK": "yes"},
)
except subprocess.CalledProcessError as e:
logger.error("borg list failed", exc_info=e, code=e.returncode)
sys.stderr.write("borg list failed\n" + e.stderr)
data = json.loads(result.stdout)
ic(data)
for entry in data.get("archives", []):
partition = f"{entry['archive']}|{repo}"
context.log_event(
dg.AssetMaterialization(
asset_key=borg_archive.key,
partition=partition,
metadata={
"id": dg.MetadataValue.text(entry["id"]),
},
)
)
# context.
# snapshots = data.get("archives", [])
#
# # Find latest backup for this day
# match = next(
# (s for s in reversed(snapshots)
# if datetime.fromisoformat(s["end"]).date() == expected_date),
# None
# )
#
# if match:
# context.log_event(
# dg.AssetMaterialization(
# asset_key=one.key, partition="2025-07-27"
# ) # this works!
# )
#
# return {
# "name": match["name"],
# "end": match["end"],
# "size": match.get("size", 0)
# }
# else:
# raise Exception(f"No backup found for {expected_date}")
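The helper get_partition_keys imported from shared.utils is not part of this compare. A minimal sketch of what it presumably does for the multi-partitioned assets above, assuming it simply unpacks the Dagster MultiPartitionKey into its "date" and "repo" dimensions:

import dagster as dg

def get_partition_keys(context: dg.AssetExecutionContext) -> dict[str, str]:
    # Hypothetical stand-in for shared.utils.get_partition_keys (not shown in this diff).
    # A MultiPartitionKey carries one key per dimension, e.g.
    # {"date": "2025-07-27", "repo": "opt"} for the borg assets above.
    key = context.partition_key
    if isinstance(key, dg.MultiPartitionKey):
        return dict(key.keys_by_dimension)
    return {"default": key}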

View File

@@ -0,0 +1,7 @@
import os
from pathlib import Path
APP = os.environ.get("APP", Path(__file__).parent.parent.name)
BORG_HOST = "backup"
BORG_ROOT: str = "/mnt/yotta/xenon/borg/"

View File

@@ -0,0 +1,22 @@
import assets
import sensors
from config import APP
from icecream import install
import dagster as dg
install()
definitions = dg.Definitions(
assets=[
asset.with_attributes(
group_names_by_key={asset.key: APP},
tags_by_key={asset.key: {"app": APP}},
)
for asset in dg.load_assets_from_modules([assets])
],
resources={},
jobs=[],
schedules=[],
sensors=[sensors.borg_repos],
)

0
apps/backup/src/jobs.py Normal file
View File

View File

@@ -0,0 +1,8 @@
import os
import dagster as dg
borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repo")
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2025-01-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)

View File

@@ -0,0 +1,26 @@
import structlog
from partitions import borg_repo_partitions_def
from utils.borg import get_ssh_client, list_repos
import dagster as dg
logger = structlog.get_logger()
@dg.sensor()
def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
existing_repos = set(
context.instance.get_dynamic_partitions(borg_repo_partitions_def.name)
)
with get_ssh_client() as client:
parent = "/mnt/yotta/xenon/borg/"
repos = set(list_repos(client, parent))
new_repos = list(set(repos) - existing_repos)
return dg.SensorResult(
# run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
dynamic_partitions_requests=[
borg_repo_partitions_def.build_add_request(new_repos),
],
)

11
apps/backup/src/test.py Normal file
View File

@@ -0,0 +1,11 @@
import structlog
from utils.borg import get_ssh_client, list_repos
logger = structlog.get_logger()
if __name__ == "__main__":
with get_ssh_client() as client:
parent = "/mnt/yotta/xenon/borg/"
repos = set(list_repos(client, parent))
print(repos)

View File

View File

@@ -0,0 +1,59 @@
from collections.abc import Iterator
from configparser import ConfigParser
from contextlib import contextmanager
from io import StringIO
from pathlib import Path
import paramiko
import structlog
logger = structlog.get_logger(__name__)
@contextmanager
def get_ssh_client():
ssh_config_file = Path.home() / ".ssh/config"
with open(ssh_config_file) as f:
ssh_config = paramiko.SSHConfig()
ssh_config.parse(f)
host_config = ssh_config.lookup("backup") # the host alias in ~/.ssh/config
hostname = host_config.get("hostname", "localhost")
port = int(host_config.get("port", 22))
username = host_config.get("user")
key_filename = host_config.get("identityfile", [None])[0]
# Connect using Paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=hostname, port=port, username=username, key_filename=key_filename
)
yield client
client.close()
def list_repos(client, parent) -> Iterator[str]:
command = f"ls {parent}*/config"
stdin, stdout, stderr = client.exec_command(command)
paths = [line.strip() for line in stdout.readlines()]
sftp = client.open_sftp()
for path in paths:
name = Path(path).parent.name
logger.info("Opening path", name=name)
with sftp.open(path, "r") as f:
try:
content = f.read().decode()
config = ConfigParser()
config.read_file(StringIO(content))
config.get("repository", "version")
yield name
except Exception as e:
logger.warning("Not a borg repository!", e=e)
sftp.close()

4
apps/backup/ssh_config Normal file
View File

@@ -0,0 +1,4 @@
Host backup
HostName rik.veenboer.xyz
User backup
StrictHostKeyChecking no

View File

@@ -8,31 +8,33 @@ async def scrape(url: str) -> str:
await page.goto(url, timeout=60000)
- # Wait until at least one toggle button is present
+ # Wait until buttons are available
- await page.wait_for_selector(".toggle-btn", timeout=20000)
+ await page.wait_for_selector('div[role="button"][aria-expanded]', timeout=20000)
- # Set zoom
+ # Zoom out for full view
await page.evaluate("document.body.style.zoom='50%'")
- # Find all toggle buttons
+ # Find collapsible buttons
- toggle_buttons = await page.query_selector_all(".toggle-btn")
- print(f"Found {len(toggle_buttons)} toggle buttons")
+ toggle_buttons = await page.query_selector_all(
+ 'div[role="button"][aria-expanded]'
+ )
+ print(f"Found {len(toggle_buttons)} expandable buttons")
for i, btn in enumerate(toggle_buttons):
try:
- # Ensure it's visible and enabled
- if await btn.is_visible() and await btn.is_enabled():
- await btn.click()
- await page.wait_for_timeout(1000)
+ aria_expanded = await btn.get_attribute("aria-expanded")
+ if aria_expanded == "false":
+ if await btn.is_visible() and await btn.is_enabled():
+ await btn.click()
+ await page.wait_for_timeout(1000)
if i == len(toggle_buttons) - 1:
break
- # Scroll down gradually
+ # Scroll gradually
scroll_step = 500
total_height = await page.evaluate("() => document.body.scrollHeight")
current_position = 0
while current_position < total_height:
await page.evaluate(f"window.scrollTo(0, {current_position});")
await page.wait_for_timeout(100)
@@ -44,17 +46,14 @@ async def scrape(url: str) -> str:
except Exception as e:
print(f"Skipped button due to error: {e}")
- # Get the page content
+ # Capture expanded HTML
page_source = await page.content()
- # Close the browser
await browser.close()
- # Continue scraping logic here...
- print("Scraping done")
- # Save the page content to a file
+ # Save to file
with open("/cache/scraped_page.html", "w") as fp:
fp.write(page_source)
+ print("Scraping done")
return page_source

View File

@@ -12,17 +12,20 @@ from dagster_polars.patito import patito_model_to_dagster_type
from jinja2 import Environment, FileSystemLoader
from models import Deal
from partitions import daily_partitions_def, multi_partitions_def
- from plato.fetch import scrape_plato
+ from platenzaak.parse import parse as parse_platenzaak
+ from platenzaak.scrape import scrape as scrape_platenzaak
from plato.parse import parse as parse_plato
+ from plato.scrape import scrape as scrape_plato
from shared.utils import get_partition_keys, parse_partition_keys
- from sounds.fetch import fetch_deals
from sounds.parse import parse as parse_sounds
+ from sounds.scrape import scrape as scrape_sounds
+ from structlog.stdlib import BoundLogger
from utils.email import EmailService
import dagster as dg
asset = partial(dg.asset, key_prefix=APP)
- logger = structlog.get_logger()
+ logger: BoundLogger = structlog.get_logger()
@asset(
@@ -63,22 +66,24 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
logger.error("Failed to load CSV file!", error=e)
raise dg.Failure(f"Cannot materialize for the past: {date.date()}")
- if source == "plato":
- logger.info("Scraping Plato")
- df = scrape_plato()
- logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
- ic(df.columns)
- return pl.from_pandas(df.assign(**partition_key))
- if source == "sounds":
- logger.info("Scraping Sounds")
- df = fetch_deals()
- ic(df.columns)
- logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
- return pl.from_pandas(df.assign(**partition_key))
- return pl.DataFrame(
- [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
- )
+ match source:
+ case "plato":
+ logger.info("Scraping Plato")
+ df = scrape_plato()
+ logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
+ case "sounds":
+ logger.info("Scraping Sounds")
+ df = scrape_sounds()
+ logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
+ case "platenzaak":
+ logger.info("Scraping Platenzaak")
+ df = scrape_platenzaak(logger=logger)
+ logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
+ case _:
+ raise ValueError(f"Unknown source: {source}!")
+ ic(df.columns)
+ return pl.from_pandas(df.assign(**partition_key))
@asset(
@@ -105,9 +110,10 @@ def cleaned_deals(
parsed_df = parse_plato(df)
case "sounds":
parsed_df = parse_sounds(df)
+ case "platenzaak":
+ parsed_df = parse_platenzaak(df)
case _:
- context.log.warning(f"Unknown source: {source}!")
- return
+ raise ValueError(f"Unknown source: {source}!")
ic(parsed_df.collect_schema())
@@ -170,9 +176,7 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
},
output_required=False,
dagster_type=patito_model_to_dagster_type(Deal),
- automation_condition=dg.AutomationCondition.on_missing().ignore(
- dg.AssetSelection.assets(cleaned_deals.key)
- ),
+ automation_condition=dg.AutomationCondition.eager(),
)
def new_deals(
context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
@@ -192,7 +196,6 @@ def new_deals(
if len(partition_keys := sorted(partitions.keys())) < 2:
context.log.warning("Not enough partitions to fetch new deals!")
return
before, after = partition_keys[-2:]
@@ -214,7 +217,9 @@ def new_deals(
new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect()
if new_df.height:
context.log.info(f"New deals found ({new_df.height}x)!")
- yield dg.Output(Deal.DataFrame(new_df))
+ yield dg.Output(
+ Deal.DataFrame(new_df.with_columns(pl.col("release").cast(pl.Date)))
+ )
else:
context.log.info("No new deals found!")
@@ -227,7 +232,9 @@ def new_deals(
},
ins={"partitions": dg.AssetIn(key=new_deals.key)},
output_required=False,
- automation_condition=dg.AutomationCondition.eager(),
+ automation_condition=dg.AutomationCondition.eager().without(
+ ~dg.AutomationCondition.any_deps_missing()
+ ),
)
def good_deals(
context: dg.AssetExecutionContext,
@@ -237,6 +244,9 @@ def good_deals(
parsed_partition_keys = parse_partition_keys(context, "partitions")
ic(parsed_partition_keys)
+ if not partitions:
+ logger.warning("Partitions are empty!")
+ return
df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
counts = dict(df.group_by("source").len().iter_rows())

View File

@@ -12,8 +12,8 @@ define_asset_job = partial(dg.define_asset_job, **kwargs)
deals_job = dg.define_asset_job(
"deals_job",
- selection=[assets.deals.key],
- partitions_def=assets.deals.partitions_def,
+ selection=dg.AssetSelection.assets(assets.new_deals.key).upstream(),
+ partitions_def=assets.new_deals.partitions_def,
)

View File

@@ -10,5 +10,7 @@ class Deal(pt.Model):
title: str = pt.Field(description="Title of the deal.")
url: str = pt.Field(description="URL to the deal.")
date: datetime.date = pt.Field(description="Day the deal was listed.")
- release: datetime.date = pt.Field(description="Release date.")
+ release: datetime.date | None = pt.Field(
+ description="Release date.", allow_missing=True
+ )
price: float = pt.Field(description="Price of the deal in EUR.")

View File

@@ -2,7 +2,7 @@ import os
import dagster as dg
- SOURCES = ["plato", "sounds"]
+ SOURCES = ["plato", "sounds", "platenzaak"]
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)

View File

View File

@@ -0,0 +1,13 @@
import polars as pl
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
"""Parse the Platenzaak DataFrame."""
return df.with_columns(
date=pl.col("date").cast(pl.Date),
artist=pl.col("artist").str.strip_chars().str.to_lowercase(),
title=pl.col("album").str.strip_chars().str.to_lowercase(),
release=pl.lit(None),
price=pl.col("current_price").cast(pl.Float64),
url=pl.format("https://platenzaak.nl{}", pl.col("id")),
)

View File

@@ -0,0 +1,90 @@
from collections.abc import Iterator
import pandas as pd
import requests
from bs4 import BeautifulSoup
from structlog.stdlib import BoundLogger
def parse_price(price_block):
"""
Convert a price block like:
<span class="amount theme-money">€ 30<sup>99</sup></span>
into a float: 30.99
"""
if not price_block:
return None
# Extract the main number (before <sup>)
main = price_block.find(string=True, recursive=False)
main = main.strip().replace("€", "").replace(",", ".").strip()
# Extract the <sup> part (cents)
sup = price_block.find("sup")
cents = sup.get_text(strip=True) if sup else "00"
try:
return float(f"{main}.{cents}")
except ValueError:
return None
def parse_page(html) -> Iterator[dict]:
soup = BeautifulSoup(html, "html.parser")
for block in soup.select("div.product-block__inner"):
# Wishlist button holds most metadata
wishlist = block.select_one("[data-wlh-id]")
if not wishlist:
continue
product = {
"id": wishlist.get("data-wlh-id"),
"variant_id": wishlist.get("data-wlh-variantid"),
"name": wishlist.get("data-wlh-name"),
"price": wishlist.get("data-wlh-price"),
"url": wishlist.get("data-wlh-link"),
"image": wishlist.get("data-wlh-image"),
}
# Artist + Title (in the title link)
title_block = block.select_one(".product-block__title-price .title")
if title_block:
artist = title_block.find("span")
if artist:
product["artist"] = artist.get_text(strip=True)
# The text after <br> is the album title
product["album"] = (
title_block.get_text(separator="|").split("|")[-1].strip()
)
# Current price (might include discounts)
price_block = block.select_one(".price .amount")
product["current_price"] = parse_price(price_block)
# Original price if on sale
old_price_block = block.select_one(".price del .theme-money")
product["original_price"] = parse_price(old_price_block)
# Sale label
sale_label = block.select_one(".product-label--sale")
product["on_sale"] = bool(sale_label)
yield product
def scrape(logger: BoundLogger) -> pd.DataFrame:
page = 1
products = []
while True:
response = requests.get(
f"https://www.platenzaak.nl/collections/sale?filter.p.m.custom.config_group=Vinyl&page={page}"
)
response.raise_for_status()
page_products = list(parse_page(response.text))
logger.info("Scraped page", page=page, products=len(page_products))
if not page_products:
break
products.extend(page_products)
page += 1
return pd.DataFrame(products)
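For reference, a quick sanity check of parse_price above against the HTML snippet quoted in its own docstring; the import path is an assumption based on the file layout in this compare:

from bs4 import BeautifulSoup

from platenzaak.scrape import parse_price  # assumed import path

html = '<span class="amount theme-money">€ 30<sup>99</sup></span>'
block = BeautifulSoup(html, "html.parser").select_one("span.amount")
print(parse_price(block))  # expected: 30.99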

View File

@@ -1,154 +0,0 @@
import os
import boto3
import pandas as pd
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv
from fetch import scrape_plato
from utils import get
def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
if os.path.exists(database_file):
database_df = pd.read_parquet(database_file)
else:
database_df = None
if articles_df is None:
new_df = None if database_df is None else database_df.head(0)
else:
if database_df is None:
articles_df.to_parquet(database_file)
return articles_df, articles_df
compare = ["ean", "_price"]
check_df = pd.merge(
database_df[compare], articles_df[compare], how="right", indicator=True
)
new_df = (
check_df[check_df["_merge"] == "right_only"]
.drop(columns="_merge")
.merge(articles_df)
)
database_df = (
pd.concat([database_df, new_df])
.sort_values("_date")
.groupby("ean")
.last()
.reset_index()
)
database_df.to_parquet(database_file)
return database_df, new_df
def send_email(lines):
# Define the email parameters
SENDER = "mail@veenboer.xyz"
RECIPIENT = "rik.veenboer@gmail.com"
SUBJECT = "Aanbieding op plato!"
# The email body for recipients with non-HTML email clients
BODY_TEXT = ""
# The HTML body of the email
tmp = "\n".join(lines)
BODY_HTML = f"""<html>
<head></head>
<body>
{tmp}
</html>
"""
# The character encoding for the email
CHARSET = "UTF-8"
# Try to send the email
try:
client = boto3.client(
"ses", region_name="eu-west-1"
) # Change the region as needed
# Provide the contents of the email
response = client.send_email(
Destination={
"ToAddresses": [
RECIPIENT,
],
},
Message={
"Body": {
"Html": {
"Charset": CHARSET,
"Data": BODY_HTML,
},
"Text": {
"Charset": CHARSET,
"Data": BODY_TEXT,
},
},
"Subject": {
"Charset": CHARSET,
"Data": SUBJECT,
},
},
Source=SENDER,
)
# Display an error if something goes wrong.
except NoCredentialsError:
print("Credentials not available")
except PartialCredentialsError:
print("Incomplete credentials provided")
except Exception as e:
print(f"Error: {e}")
else:
print("Email sent! Message ID:"),
print(response["MessageId"])
def main(dry=False):
load_dotenv("/opt/.env")
local_ip = get("http://ifconfig.me", False).text
get_ip = get("http://ifconfig.me").text
print(f"Local IP = {local_ip}")
print(f"Request IP = {get_ip}")
assert local_ip != get_ip
artists = open("/home/user/artists.txt").read().strip().splitlines()
print(f"Number of known artists = {len(artists)}")
if dry:
articles_df = None
else:
articles_df = scrape_plato(get=get)
database_df, new_df = update_database(articles_df)
if dry:
new_df = database_df.sample(20)
print(f"Database size = {len(database_df)}")
print(f"New = {len(new_df)}")
# new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
new_df = new_df.query('_price <= 25 and ean != ""')
print(f"Interesting = {len(new_df)}")
if new_df is not None and len(new_df):
message = []
for _, row in new_df.head(10).iterrows():
message.append(
f'<a href="https://www.platomania.nl{row.url}"><h1>NEW</h1></a>'
)
message.append("<ul>")
message.append(f"<li>[artist] {row.artist}</li>")
message.append(f"<li>[title] {row.title}</li>")
message.append(f"<li>[price] {row.price}</li>")
message.append(f"<li>[release] {row.release_date}</li>")
message.append("</ul>")
send_email(message)
if __name__ == "__main__":
cwd = os.path.dirname(__file__)
main(dry=False)

View File

@@ -1,52 +0,0 @@
#!/root/.pyenv/versions/dev/bin/python
import re
from datetime import datetime
import pandas as pd
from .scrape import get_soup, scrape_page, scrape_page_links
def scrape_plato(get=None):
ic()
url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
ic(url)
soup = get_soup(url=url, get=get)
articles_info = scrape_page(soup)
ic(len(articles_info))
links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
for link in links:
ic(link)
soup = get_soup(url=link, get=get)
tmp = scrape_page(soup)
ic(len(tmp))
articles_info.extend(tmp)
def clean(name):
tmp = " ".join(reversed(name.split(", ")))
tmp = tmp.lower()
tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
return tmp
articles_df = pd.DataFrame(articles_info).reindex(
columns=[
"artist",
"title",
"url",
"label",
"release_date",
"origin",
"item_number",
"ean",
"delivery_info",
"price",
]
)
articles_df["_artist"] = articles_df["artist"].map(clean)
articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
articles_df["_date"] = datetime.now()
return articles_df

60
apps/vinyl/src/plato/scrape.py Normal file → Executable file
View File

@@ -1,21 +1,61 @@
+ import re
+ from datetime import datetime
+ import pandas as pd
import requests
from bs4 import BeautifulSoup
+ def scrape(get=None):
+ ic()
+ url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
+ ic(url)
+ soup = get_soup(url=url, get=get)
+ articles_info = scrape_page(soup)
+ ic(len(articles_info))
+ links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
+ for link in links:
+ ic(link)
+ soup = get_soup(url=link, get=get)
+ tmp = scrape_page(soup)
+ ic(len(tmp))
+ articles_info.extend(tmp)
+ def clean(name):
+ tmp = " ".join(reversed(name.split(", ")))
+ tmp = tmp.lower()
+ tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
+ return tmp
+ articles_df = pd.DataFrame(articles_info).reindex(
+ columns=[
+ "artist",
+ "title",
+ "url",
+ "label",
+ "release_date",
+ "origin",
+ "item_number",
+ "ean",
+ "delivery_info",
+ "price",
+ ]
+ )
+ articles_df["_artist"] = articles_df["artist"].map(clean)
+ articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
+ articles_df["_date"] = datetime.now()
+ return articles_df
def get_soup(url, get=None):
- # Send a GET request to the specified URL
if get is None:
get = requests.get
response = get(url)
- # Check if the request was successful
- if response.status_code == 200:
- # Parse the HTML content of the page
- return BeautifulSoup(response.content, "html.parser")
- else:
- raise ValueError(
- f"Failed to retrieve the page. Status code: {response.status_code}"
- )
+ response.raise_for_status()
+ return BeautifulSoup(response.content, "html.parser")
def scrape_page_links(soup):

View File

@@ -1,80 +0,0 @@
#!/usr/bin/python3
import glob
import os
from datetime import datetime
import pandas as pd
def get_csvs(directory, n):
# List all files matching the pattern *_sounds.csv
suffix = "_sounds.csv"
files = glob.glob(os.path.join(directory, f"*{suffix}"))
# Function to extract date from filename
def extract_date_from_filename(filename):
# Extract the date string
basename = os.path.basename(filename)
date_str = basename.split(suffix)[0]
try:
return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S")
except ValueError:
# The date string cannot be parsed
return None
# Create a list of tuples (date, filename), ignoring files with unparsable dates
result = [(extract_date_from_filename(file), file) for file in files]
result = [item for item in result if item[0] is not None]
# Sort the list by date in descending order (most recent first)
result.sort(key=lambda x: x[0], reverse=True)
# Return the two most recent files
return [x[1] for x in result[:n]]
def analyze(df1, df2):
df1 = df1.drop_duplicates(subset="id")
df2 = df2.drop_duplicates(subset="id")
combined_df = pd.merge(
df1[["id", "price"]], df2, on="id", how="right", indicator=True
)
combined_df["discount"] = combined_df.price_y - combined_df.price_x
combined_df.drop(columns=["price_x"], inplace=True)
combined_df.rename(columns={"price_y": "price"}, inplace=True)
deals = combined_df.query("discount < 0").sort_values(by="discount")[
["id", "name", "price", "discount"]
]
new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[
["id", "name", "price"]
]
return deals, new
if __name__ == "__main__":
csvs = get_csvs(".", 100)
for i in range(1, len(csvs)):
print(f"Comparing {csvs[i]} with {csvs[0]}")
df_previous = pd.read_csv(csvs[i], index_col=0)
df_latest = pd.read_csv(csvs[0], index_col=0)
deals, new = analyze(df_previous, df_latest)
done = False
if len(deals) > 0:
print()
print("New items:")
print(new)
print()
done = True
if len(deals) > 0:
print("Discounted items:")
print(deals)
done = True
if done:
break

View File

@@ -3,7 +3,7 @@ from utils.parse import parse_date
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
- """Parse the Plato DataFrame."""
+ """Parse the Sounds DataFrame."""
return df.with_columns(
date=pl.col("date").cast(pl.Date),
artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1))

View File

@@ -1,7 +1,4 @@
- #!/usr/bin/python3
import time
- from datetime import datetime
import pandas as pd
import requests
@@ -74,11 +71,11 @@ def parse_page(html_content):
)
- def fetch_deals():
+ def scrape():
# Get page count
- page_count = get_page_count(
- requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text
- )
+ response = requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art")
+ response.raise_for_status()
+ page_count = get_page_count(response.text)
time.sleep(1)
print(f"Number of pages: {page_count}")
@@ -86,25 +83,11 @@ def fetch_deals():
base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all"
dfs = []
for i in tqdm(range(page_count)):
- df = parse_page(requests.get(base_url.format(page_number=i)).text)
+ response = requests.get(base_url.format(page_number=i))
+ response.raise_for_status()
+ df = parse_page(response.text)
dfs.append(df)
time.sleep(2)
# Combine dfs
return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"])
- if __name__ == "__main__":
- df = fetch_deals()
- print(f"Found {len(df)} deals")
- # Show current deals
- print(df.sort_values(by="price").head(10))
- # Write to file
- now = datetime.now()
- prefix = now.strftime("%Y-%m-%d_%H:%M:%S")
- directory = "/home/bram/src/python"
- filepath = f"{directory}/{prefix}_sounds.csv"
- print(f"Writing data to {filepath}")
- df.to_csv(filepath)

View File

@@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
now = datetime.now(tz=timezone.utc)
date_str = now.strftime("%Y-%m-%d")
time_str = now.strftime("%H:%M:%S")
+ latitude_str, longitude_str = partition_key[:5], partition_key[5:]
yield dg.Output(
data,
metadata={
"date": dg.MetadataValue.timestamp(now),
"latitude": dg.MetadataValue.float(latitude),
"longitude": dg.MetadataValue.float(longitude),
- "path_suffix": [date_str, time_str],
+ "path": [APP, "raw", date_str, latitude_str, longitude_str, time_str],
},
)
@@ -144,6 +146,7 @@ def raw_weather_batch_latitude(context: dg.AssetExecutionContext) -> None:
fetcher = WeatherFetcher()
latitude, longitude = parse_coordinate_str(location)
+ ic(latitude, longitude)
data = fetcher.fetch(latitude=latitude, longitude=longitude)
now = datetime.now(tz=timezone.utc)
@@ -176,6 +179,7 @@ def raw_weather_batch_latitude(context: dg.AssetExecutionContext) -> None:
io_manager_key="polars_parquet_io_manager",
partitions_def=daily_partitions_def,
output_required=False,
+ automation_condition=dg.AutomationCondition.eager(),
)
def parsed_weather(
context: dg.AssetExecutionContext,

View File

@@ -39,12 +39,12 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
if new_locations:
context.log.info(f"Discovered {len(new_locations)} new locations.")
- # Limit to 3 new locations
- selected = new_locations[:3]
return dg.SensorResult(
- run_requests=[], # dg.RunRequest(partition_key=location) for location in locations],
+ run_requests=[
+ dg.RunRequest(partition_key=location) for location in new_locations[:3]
+ ],
dynamic_partitions_requests=[
- location_partitions_def.build_add_request(selected),
+ location_partitions_def.build_add_request(new_locations),
latitude_partitions_def.build_add_request(new_latitudes),
longitude_partitions_def.build_add_request(new_longitudes),
],

View File

@@ -35,8 +35,8 @@ services:
- /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/logs/:/logs:rw
- - /opt/dagster/storage/import/:/storage/import/:ro
- - /opt/dagster/storage/deals/:/storage/deals/:rw
+ # - /mnt/mezzo/scratch/dagster/import/:/storage/import/:ro
+ - /mnt/mezzo/scratch/dagster/deals/:/storage/deals/:rw
networks:
- dagster
@@ -64,12 +64,21 @@ services:
dagster-code-stocks-playwright:
build:
context: apps/stocks
- dockerfile: ../../Dockerfile.code.playwright
+ dockerfile: Dockerfile.code.playwright
args:
- APP=stocks
image: dagster-code-stocks-playwright
profiles: [ "never" ]
+ dagster-code-backup-base:
+ build:
+ context: apps/backup
+ dockerfile: ../../Dockerfile.code
+ args:
+ - APP=backup
+ image: dagster-code-backup-base
+ profiles: [ "never" ]
dagster-code-tesla:
build:
context: apps/tesla
@@ -108,6 +117,22 @@ services:
networks:
- dagster
+ dagster-code-backup:
+ build:
+ context: apps/backup
+ container_name: dagster-code-backup
+ image: dagster-code-backup
+ restart: always
+ environment:
+ <<: [ *dagster_env ]
+ DAGSTER_CURRENT_IMAGE: dagster-code-backup
+ volumes:
+ - /opt/dagster/apps/:/code/apps/:ro
+ - /opt/dagster/shared/:/code/shared/:ro
+ - /opt/dagster/logs/:/logs:rw
+ networks:
+ - dagster
dagster-code-other:
build:
context: apps/other

View File

@@ -6,6 +6,7 @@ x-postgres-env: &postgres_env
POSTGRES_DB: ${POSTGRES_DB}
x-system-env: &system_env
TZ: Europe/Amsterdam
+ DATA_DIR: ${DATA_DIR}
CACHE_DIR: /tmp/cache
x-dagster-env: &dagster_env
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
@@ -26,7 +27,7 @@ x-volumes: &volumes
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro
- /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro
- /opt/dagster/system/:/code/system/:ro
- - /opt/dagster/storage/:/storage/:rw
+ - /mnt/mezzo/scratch/dagster/:/storage/:rw
- /opt/dagster/logs/:/logs:rw
- /var/run/docker.sock:/var/run/docker.sock:rw
@@ -40,6 +41,8 @@ services:
<<: *postgres_env
networks:
- dagster
+ ports:
+ - '25432:5432'
volumes:
- /opt/dagster/db/:/var/lib/postgresql/data/

View File

@@ -15,11 +15,13 @@ run_launcher:
class: CustomDockerRunLauncher
config:
env_vars:
+ - TZ
- DAGSTER_POSTGRES_HOST
- DAGSTER_POSTGRES_PORT
- DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB
+ - DATA_DIR
- SMTP_SERVER
- SMTP_PORT
- SMTP_USERNAME
@@ -32,8 +34,8 @@ run_launcher:
volumes:
- /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro
- - /opt/dagster/storage/:/storage/:rw
- /opt/dagster/logs/:/logs:rw
+ - ${DATA_DIR}:/storage/:rw
- ${CACHE_DIR}:/cache:rw
run_storage:

View File

@@ -16,6 +16,7 @@ dependencies = [
"openpyxl", "openpyxl",
"pandas", "pandas",
"patito", "patito",
"polars==1.32.0",
"pyarrow", "pyarrow",
"pydantic[email]", "pydantic[email]",
"pydantic-settings", "pydantic-settings",
@@ -43,12 +44,12 @@ local = [
"ipywidgets" "ipywidgets"
] ]
dagster = [ dagster = [
"dagster", "dagster==1.11.4",
"dagster-graphql", "dagster-graphql",
"dagster-postgres", "dagster-postgres",
"dagster-docker", "dagster-docker",
"dagster-aws", "dagster-aws",
"dagster-polars[patito]", "dagster-polars[patito]==0.27.4",
"dagster-duckdb", "dagster-duckdb",
"dagster-duckdb-pandas", "dagster-duckdb-pandas",
"dagit" "dagit"
@@ -65,6 +66,9 @@ weather = [
"requests_cache", "requests_cache",
"retry_requests" "retry_requests"
] ]
backup = [
"paramiko"
]
other = [ other = [
# "deltalake>=1.0.0", # "deltalake>=1.0.0",
# "dagster-deltalake-pandas", # "dagster-deltalake-pandas",

View File

@@ -5,4 +5,5 @@ uv pip compile pyproject.toml --extra=dagster --extra=vinyl > apps/vinyl/require
uv pip compile pyproject.toml --extra=dagster --extra=stocks > apps/stocks/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=tesla > apps/tesla/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=weather > apps/weather/requirements.txt
+ uv pip compile pyproject.toml --extra=dagster --extra=backup > apps/backup/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=other > apps/other/requirements.txt

View File

@@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr
from upath import UPath
import dagster as dg
+ from dagster import (
+ InputContext,
+ OutputContext,
+ )
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
@@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC):
with path.open("r") as fp:
return json.load(fp)
+ def get_path_for_partition(
+ self, context: InputContext | OutputContext, path: "UPath", partition: str
+ ) -> UPath:
+ """Use path from metadata when provided."""
+ ic()
+ context_metadata = context.output_metadata or {}
+ ic(context_metadata)
+ if "path" in context_metadata:
+ return UPath(*context_metadata["path"].value)
+ return super().get_path_for_partition(context)
def get_asset_relative_path(
self, context: dg.InputContext | dg.OutputContext
) -> UPath:
"""Get the relative path for the asset based on context metadata."""
+ ic()
context_metadata = context.output_metadata or {}
ic(context_metadata)
path_prefix = (
context_metadata["path_prefix"].value
if "path_prefix" in context_metadata

View File

@@ -15,6 +15,10 @@ load_from:
location_name: weather
host: dagster-code-weather
port: 4000
+ - grpc_server:
+ location_name: backup
+ host: dagster-code-backup
+ port: 4000
- grpc_server:
location_name: other
host: dagster-code-other