Compare commits

...

41 Commits

Author SHA1 Message Date
87924620fd move storage dir 2026-01-08 15:52:11 +01:00
b15aaaa0dc update stocks scraper 2025-11-09 18:30:10 +01:00
3f99f354de align raw weather output files 2025-10-29 13:59:21 +01:00
204565118b run request for 3 new locations 2025-10-29 11:17:12 +01:00
aa4a2fa5b1 add all new locations 2025-10-29 11:11:26 +01:00
af913e258a test ssh access 2025-10-29 10:15:57 +01:00
7a8f15b1d6 remove sample code 2025-10-29 10:15:35 +01:00
e9ad1677ef warn for missing partitions 2025-10-29 10:15:10 +01:00
2a4da9abb9 adjust port mapping 2025-10-29 10:12:37 +01:00
a9b9197150 adjust automation condition 2025-10-29 10:12:21 +01:00
883ecf86be adjust deals job 2025-09-23 19:22:49 +02:00
7a600f6264 cast release column 2025-09-23 19:17:14 +02:00
127a773c82 allow empty release 2025-09-23 19:11:55 +02:00
67a7e2dacf add logger for borg 2025-09-15 16:56:38 +02:00
fc6f120c53 pin some packages 2025-08-24 09:45:23 +02:00
55e8b31223 parse platenzaak deals 2025-08-22 10:35:34 +02:00
1d9bd68612 scrape platenzaak 2025-08-22 10:22:16 +02:00
e0cda85d20 unused snippet 2025-08-22 09:43:22 +02:00
da55030498 move stocks specific Dockerfile 2025-08-22 09:43:10 +02:00
316fe03be9 add backup repo to dagster and docker files 2025-08-22 09:42:19 +02:00
bf537c86a4 inventory of borg backups 2025-08-22 09:41:08 +02:00
d2e34bca1c inventory of borg backups 2025-08-22 09:40:30 +02:00
65593e5421 lint 2025-08-17 17:49:01 +02:00
4242638818 refactor 2025-08-16 13:56:18 +02:00
4593b97bc2 linting 2025-08-16 13:49:41 +02:00
a0a0bbd110 refactor 2025-08-16 13:48:34 +02:00
2b4e34ec2f linting 2025-08-06 21:39:05 +02:00
eaf469d68f add cache for stocks code 2025-08-06 21:38:49 +02:00
3c7f46fb4f untrack requirements files
(cherry picked from commit f7f5c9d7a7)
2025-08-06 21:36:39 +02:00
866e190ed0 ignore logs folder
(cherry picked from commit be608ffaa3)
2025-08-06 21:36:39 +02:00
968d5c34de tweaks on shuttle
(cherry picked from commit 38f8830521)
2025-08-06 21:36:39 +02:00
030424e124 delete old files 2025-08-05 09:01:19 +02:00
81c2035d02 store old files 2025-08-05 09:01:07 +02:00
2d20cdf256 remove polars-lts-cpu 2025-08-05 08:36:49 +02:00
28a195a256 tweaks 2025-08-04 20:05:20 +02:00
629bc6c648 put entrypoint in own file 2025-08-04 19:17:32 +02:00
4bc5770cce no need for lock 2025-08-04 19:10:50 +02:00
b26ba7aa35 tweaks 2025-08-04 19:09:32 +02:00
17ca8669ef parse stocks pages 2025-08-04 18:05:53 +02:00
f7318e85cd move utils 2025-08-04 18:02:00 +02:00
a3d931c1b3 update requirements files 2025-08-04 18:01:47 +02:00
62 changed files with 854 additions and 3398 deletions

2
.gitignore vendored
View File

@@ -9,4 +9,6 @@ src/history/
src/logs/ src/logs/
src/schedules/ src/schedules/
db/ db/
logs/
.DS_Store .DS_Store
*requirements.txt

View File

@@ -14,8 +14,7 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh
WORKDIR /opt/dagster/home WORKDIR /opt/dagster/home
COPY requirements.txt . COPY requirements.txt .
RUN uv pip install -r requirements.txt --system \ RUN uv pip install -r requirements.txt --system
&& uv pip install polars-lts-cpu --system
ARG APP ARG APP
ENV APP=$APP ENV APP=$APP

View File

@@ -15,22 +15,12 @@ ENV DAGSTER_HOME=/opt/dagster/home/
WORKDIR $DAGSTER_HOME WORKDIR $DAGSTER_HOME
COPY dagster-requirements.txt requirements.txt COPY dagster-requirements.txt requirements.txt
RUN uv pip install -r requirements.txt --system \ RUN uv pip install -r requirements.txt --system
&& uv pip install polars-lts-cpu --system
RUN mkdir -p $DAGSTER_HOME RUN mkdir -p $DAGSTER_HOME
# Create entrypoint that renders the dagster.yaml from a template # Create entrypoint that renders the dagster.yaml from a template
RUN cat << 'EOF' > /entrypoint.sh COPY entrypoint.sh /entrypoint.sh
#!/bin/sh
set -e
echo "Rendering dagster.yaml from template..."
envsubst < dagster.yaml.template > dagster.yaml
echo "Starting Dagster: $@"
exec "$@"
EOF
RUN chmod +x /entrypoint.sh RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"] ENTRYPOINT ["/entrypoint.sh"]

11
apps/backup/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM dagster-code-backup-base
RUN apt-get update \
&& apt-get install --no-install-recommends --yes \
borgbackup openssh-client \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /root/.ssh && chmod 0700 /root/.ssh/
COPY --chmod=0600 id_rsa /root/.ssh/
ADD --chmod=0600 ssh_config /root/.ssh/config

146
apps/backup/src/assets.py Normal file
View File

@@ -0,0 +1,146 @@
import json
import os
import subprocess
import sys
from datetime import datetime
from functools import partial
from zoneinfo import ZoneInfo
import structlog
from config import APP, BORG_HOST, BORG_ROOT
from partitions import borg_repo_partitions_def, daily_partitions_def
from shared.utils import get_partition_keys
import dagster as dg
asset = partial(dg.asset, key_prefix=APP)
logger = structlog.get_logger()
@asset(
partitions_def=dg.MultiPartitionsDefinition(
{
"date": daily_partitions_def,
"repo": borg_repo_partitions_def,
}
)
)
def borg_archive(context: dg.AssetExecutionContext) -> None:
pass
@asset(
deps=[borg_archive],
partitions_def=dg.MultiPartitionsDefinition(
{
"date": daily_partitions_def,
"repo": borg_repo_partitions_def,
}
),
automation_condition=dg.AutomationCondition.eager(),
)
def borg_archive_info(context: dg.AssetExecutionContext) -> dg.Output[None]:
partition_keys = get_partition_keys(context)
ic(partition_keys)
location = f"ssh://{BORG_HOST}{BORG_ROOT}{partition_keys['repo']}::{partition_keys['date']}"
ic(location)
try:
result = subprocess.run(
["borg", "info", "--json", location],
capture_output=True,
text=True,
check=True,
env={"BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK": "yes"},
)
except subprocess.CalledProcessError as e:
logger.error("borg list failed", exc_info=e, code=e.returncode)
sys.stderr.write("borg list failed\n" + e.stderr)
data = json.loads(result.stdout)
ic(data)
tmp = data["archives"][0]
def parse_date(date_str, tz: str | None = None) -> dg.MetadataValue.timestamp:
return dg.MetadataValue.timestamp(
datetime.fromisoformat(date_str).replace(
tzinfo=ZoneInfo(tz or os.environ.get("TZ", "CET"))
)
)
return dg.Output(
None,
metadata={
"start": parse_date(tmp["start"]),
"end": parse_date(tmp["end"]),
"duration": dg.MetadataValue.float(tmp["duration"]),
"compressed_size": dg.MetadataValue.int(tmp["stats"]["compressed_size"]),
"deduplicated_size": dg.MetadataValue.int(
tmp["stats"]["deduplicated_size"]
),
"nfiles": dg.MetadataValue.int(tmp["stats"]["nfiles"]),
"original_size": dg.MetadataValue.int(tmp["stats"]["original_size"]),
},
)
# now run borg info ssh://shuttle/mnt/yotta/xenon/borg/opt/::2025-07-27 --json and register info
@asset(
partitions_def=borg_repo_partitions_def,
)
def borg_repo(context: dg.AssetExecutionContext) -> None:
location = f"ssh://{BORG_HOST}{BORG_ROOT}{context.partition_key}"
ic(location)
repo = context.partition_key
# Get Borg backup list
try:
result = subprocess.run(
["borg", "list", "--json", location],
capture_output=True,
text=True,
check=True,
env={"BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK": "yes"},
)
except subprocess.CalledProcessError as e:
logger.error("borg list failed", exc_info=e, code=e.returncode)
sys.stderr.write("borg list failed\n" + e.stderr)
data = json.loads(result.stdout)
ic(data)
for entry in data.get("archives", []):
partition = f"{entry['archive']}|{repo}"
context.log_event(
dg.AssetMaterialization(
asset_key=borg_archive.key,
partition=partition,
metadata={
"id": dg.MetadataValue.text(entry["id"]),
},
)
)
# context.
# snapshots = data.get("archives", [])
#
# # Find latest backup for this day
# match = next(
# (s for s in reversed(snapshots)
# if datetime.fromisoformat(s["end"]).date() == expected_date),
# None
# )
#
# if match:
# context.log_event(
# dg.AssetMaterialization(
# asset_key=one.key, partition="2025-07-27"
# ) # this works!
# )
#
# return {
# "name": match["name"],
# "end": match["end"],
# "size": match.get("size", 0)
# }
# else:
# raise Exception(f"No backup found for {expected_date}")

View File

@@ -0,0 +1,7 @@
import os
from pathlib import Path
APP = os.environ.get("APP", Path(__file__).parent.parent.name)
BORG_HOST = "backup"
BORG_ROOT: str = "/mnt/yotta/xenon/borg/"

View File

@@ -0,0 +1,22 @@
import assets
import sensors
from config import APP
from icecream import install
import dagster as dg
install()
definitions = dg.Definitions(
assets=[
asset.with_attributes(
group_names_by_key={asset.key: APP},
tags_by_key={asset.key: {"app": APP}},
)
for asset in dg.load_assets_from_modules([assets])
],
resources={},
jobs=[],
schedules=[],
sensors=[sensors.borg_repos],
)

View File

@@ -0,0 +1,8 @@
import os
import dagster as dg
borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repo")
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2025-01-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)

View File

@@ -0,0 +1,26 @@
import structlog
from partitions import borg_repo_partitions_def
from utils.borg import get_ssh_client, list_repos
import dagster as dg
logger = structlog.get_logger()
@dg.sensor()
def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
existing_repos = set(
context.instance.get_dynamic_partitions(borg_repo_partitions_def.name)
)
with get_ssh_client() as client:
parent = "/mnt/yotta/xenon/borg/"
repos = set(list_repos(client, parent))
new_repos = list(set(repos) - existing_repos)
return dg.SensorResult(
# run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
dynamic_partitions_requests=[
borg_repo_partitions_def.build_add_request(new_repos),
],
)

11
apps/backup/src/test.py Normal file
View File

@@ -0,0 +1,11 @@
import structlog
from utils.borg import get_ssh_client, list_repos
logger = structlog.get_logger()
if __name__ == "__main__":
with get_ssh_client() as client:
parent = "/mnt/yotta/xenon/borg/"
repos = set(list_repos(client, parent))
print(repos)

View File

View File

@@ -0,0 +1,59 @@
from collections.abc import Iterator
from configparser import ConfigParser
from contextlib import contextmanager
from io import StringIO
from pathlib import Path
import paramiko
import structlog
logger = structlog.get_logger(__name__)
@contextmanager
def get_ssh_client():
ssh_config_file = Path.home() / ".ssh/config"
with open(ssh_config_file) as f:
ssh_config = paramiko.SSHConfig()
ssh_config.parse(f)
host_config = ssh_config.lookup("backup") # the host alias in ~/.ssh/config
hostname = host_config.get("hostname", "localhost")
port = int(host_config.get("port", 22))
username = host_config.get("user")
key_filename = host_config.get("identityfile", [None])[0]
# Connect using Paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
hostname=hostname, port=port, username=username, key_filename=key_filename
)
yield client
client.close()
def list_repos(client, parent) -> Iterator[str]:
command = f"ls {parent}*/config"
stdin, stdout, stderr = client.exec_command(command)
paths = [line.strip() for line in stdout.readlines()]
sftp = client.open_sftp()
for path in paths:
name = Path(path).parent.name
logger.info("Opening path", name=name)
with sftp.open(path, "r") as f:
try:
content = f.read().decode()
config = ConfigParser()
config.read_file(StringIO(content))
config.get("repository", "version")
yield name
except Exception as e:
logger.warning("Not a borg repository!", e=e)
sftp.close()

4
apps/backup/ssh_config Normal file
View File

@@ -0,0 +1,4 @@
Host backup
HostName rik.veenboer.xyz
User backup
StrictHostKeyChecking no

View File

@@ -1,368 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster --extra=other
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
# via
# gql
# starlette
# watchfiles
arro3-core==0.5.1
# via deltalake
asttokens==3.0.0
# via icecream
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
# via requests
charset-normalizer==3.4.2
# via requests
click==8.1.8
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-delta
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-delta @ git+https://github.com/ASML-Labs/dagster-delta.git@d28de7a7c13b7071f42231234eb9231269c7c1bf#subdirectory=libraries/dagster-delta
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
deltalake==1.1.3
# via dagster-delta
deprecated==1.2.18
# via deltalake
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
jinja2==3.1.6
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pendulum==3.1.0
# via dagster-delta
pillow==11.3.0
# via matplotlib
polars==1.32.0
# via
# dagster-polars
# patito
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
# pendulum
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# uvicorn
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-toolbelt
requests-toolbelt==1.0.0
# via gql
rich==14.1.0
# via dagster
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
typing-extensions==4.14.1
# via
# alembic
# anyio
# arro3-core
# beautifulsoup4
# dagster-polars
# dagster-shared
# graphene
# patito
# pydantic
# pydantic-core
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via
# pandas
# pendulum
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
urllib3==2.5.0
# via
# botocore
# docker
# requests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websockets==15.0.1
# via uvicorn
wrapt==1.17.2
# via deprecated
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

View File

@@ -1,8 +1,5 @@
FROM mcr.microsoft.com/playwright:v1.54.0-noble FROM mcr.microsoft.com/playwright:v1.54.0-noble
ARG APP
ENV APP=$APP
ENV PYTHONPATH=/apps/$APP/src/:/shared/src/:$PYTHONPATH
ENV PATH="/venv/bin:/root/.local/bin:$PATH" ENV PATH="/venv/bin:/root/.local/bin:$PATH"
WORKDIR /opt/dagster/home WORKDIR /opt/dagster/home
@@ -14,14 +11,15 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh && \
COPY requirements.txt . COPY requirements.txt .
RUN . /venv/bin/activate && \ RUN . /venv/bin/activate && \
uv pip install -r requirements.txt && \ uv pip install -r requirements.txt
uv pip install polars-lts-cpu
RUN . /venv/bin/activate && \ RUN . /venv/bin/activate && \
uv pip install playwright && \ uv pip install playwright && \
playwright install playwright install
ARG APP
ENV APP=$APP
ENV PYTHONPATH=/code/apps/$APP/src/:/code/shared/src/:$PYTHONPATH
# Run dagster gRPC server on port 4000 # Run dagster gRPC server on port 4000
EXPOSE 4000 EXPOSE 4000

View File

@@ -1,391 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster --extra=stocks
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
attrs==25.3.0
# via
# outcome
# trio
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
# via
# requests
# selenium
charset-normalizer==3.4.2
# via requests
click==8.1.8
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
greenlet==3.2.3
# via playwright
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via
# uvicorn
# wsproto
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# anyio
# email-validator
# requests
# trio
# yarl
jinja2==3.1.6
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
outcome==1.3.0.post0
# via
# trio
# trio-websocket
packaging==25.0
# via
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pillow==11.3.0
# via matplotlib
playwright==1.54.0
# via dev (pyproject.toml)
polars==1.32.0
# via
# dagster-polars
# patito
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pyee==13.0.0
# via playwright
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via
# requests
# urllib3
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# uvicorn
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-toolbelt
requests-toolbelt==1.0.0
# via gql
rich==14.1.0
# via dagster
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
selenium==4.34.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via
# anyio
# trio
sortedcontainers==2.4.0
# via trio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
trio==0.30.0
# via
# selenium
# trio-websocket
trio-websocket==0.12.2
# via selenium
typing-extensions==4.14.1
# via
# alembic
# anyio
# beautifulsoup4
# dagster-polars
# dagster-shared
# graphene
# patito
# pydantic
# pydantic-core
# pyee
# selenium
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
urllib3==2.5.0
# via
# botocore
# docker
# requests
# selenium
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websocket-client==1.8.0
# via selenium
websockets==15.0.1
# via uvicorn
wsproto==1.2.0
# via trio-websocket
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

View File

@@ -2,81 +2,30 @@ import asyncio
from collections.abc import Iterator from collections.abc import Iterator
from datetime import datetime, timezone from datetime import datetime, timezone
from functools import partial from functools import partial
from pathlib import Path
import structlog
from config import APP, URL from config import APP, URL
from partitions import daily_partitions_def from partitions import (
from playwright.async_api import async_playwright daily_partitions_def,
from utils import extract_date daily_table_partitions_def,
table_partitions_def,
)
from utils.extracter import extract_date, extract_tables
from utils.scraper import scrape
from utils.text import slugify
import dagster as dg import dagster as dg
TAGS = {"app": APP} TAGS = {"app": APP}
asset = partial(dg.asset, key_prefix=APP, tags=TAGS) asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
logger = structlog.get_logger()
async def main() -> str:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(viewport={"width": 1000, "height": 2000})
page = await context.new_page()
await page.goto(URL, timeout=60000)
# Wait until at least one toggle button is present
await page.wait_for_selector(".toggle-btn", timeout=20000)
# Set zoom
await page.evaluate("document.body.style.zoom='50%'")
# Find all toggle buttons
toggle_buttons = await page.query_selector_all(".toggle-btn")
print(f"Found {len(toggle_buttons)} toggle buttons")
for i, btn in enumerate(toggle_buttons):
try:
# Ensure it's visible and enabled
if await btn.is_visible() and await btn.is_enabled():
await btn.click()
await page.wait_for_timeout(1000)
if i == len(toggle_buttons) - 1:
break
# Scroll down gradually
scroll_step = 500
total_height = await page.evaluate("() => document.body.scrollHeight")
current_position = 0
while current_position < total_height:
await page.evaluate(f"window.scrollTo(0, {current_position});")
await page.wait_for_timeout(100)
current_position += scroll_step
total_height = await page.evaluate(
"() => document.body.scrollHeight"
)
except Exception as e:
print(f"Skipped button due to error: {e}")
# Get the page content
page_source = await page.content()
# Close the browser
await browser.close()
# Continue scraping logic here...
print("Scraping done")
# Save the page content to a file
with open("/cache/scraped_page.html", "w") as fp:
fp.write(page_source)
return page_source
@asset(io_manager_key="html_io_manager", name="raw") @asset(io_manager_key="html_io_manager", name="raw")
def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]: def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
page_source = asyncio.run(main()) page_source = asyncio.run(scrape(url=URL))
now = datetime.now(tz=timezone.utc) now = datetime.now(tz=timezone.utc)
date_str = now.strftime("%Y-%m-%d") date_str = now.strftime("%Y-%m-%d")
@@ -94,18 +43,67 @@ def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
date_str = date_obj.strftime("%Y-%m-%d") date_str = date_obj.strftime("%Y-%m-%d")
context.log.info(f"Found date: {date_str}") context.log.info(f"Found date: {date_str}")
context.log_event( context.log_event(
dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str) dg.AssetMaterialization(asset_key=raw_html.key, partition=date_str)
) )
except Exception as e: except Exception as e:
context.log.error(f"Parsing error: {e}") context.log.error(f"Parsing error: {e}")
@asset(deps=[raw_html], partitions_def=daily_table_partitions_def)
def daily_table() -> None: ...
@asset( @asset(
io_manager_key="html_io_manager", deps=[raw_html],
io_manager_key="json_io_manager",
partitions_def=daily_partitions_def, partitions_def=daily_partitions_def,
automation_condition=dg.AutomationCondition.eager(),
output_required=False,
) )
def daily_html() -> str: ... def raw_daily(context: dg.AssetExecutionContext) -> None:
base = (
Path(context.resources.json_io_manager.base_dir).joinpath(*raw_html.key.path)
/ context.partition_key
)
if files := list(base.glob("*.html")):
logger.info(f"Found {len(files)} html files")
page_source = open(files[-1]).read()
for title, description, df in extract_tables(page_source):
# TODO: when scraping click the "View Strategy Criteria" texts and record the
# information
if not title:
logger.info(
"No title!",
description=description,
num_rows=0 if df is None else len(df),
)
continue
class MyAssetConfig(dg.Config): if df is None:
image: str = "bla" logger.info("No data!", title=title, description=description)
continue
slug = slugify(title)
output_context = dg.build_output_context(
asset_key=dg.AssetKey(
[APP, "daily", context.partition_key, slug],
),
resources=context.resources.original_resource_dict,
)
context.resources.json_io_manager.handle_output(
output_context, df.to_dict(orient="records")
)
context.log_event(
dg.AssetMaterialization(
asset_key=daily_table.key,
partition=f"{context.partition_key}|{slug}",
metadata={
"title": dg.MetadataValue.text(title),
"slug": dg.MetadataValue.text(slug),
"description": dg.MetadataValue.text(description),
"rows": dg.MetadataValue.int(len(df)),
},
)
)
context.instance.add_dynamic_partitions(table_partitions_def.name, [slug])

View File

@@ -4,6 +4,7 @@ import sensors
from dagster_polars import PolarsParquetIOManager from dagster_polars import PolarsParquetIOManager
from icecream import install from icecream import install
from shared.config import APP, STORAGE_DIR from shared.config import APP, STORAGE_DIR
from shared.io_manager import JsonIOManager
from shared.io_manager.html import HtmlIOManager from shared.io_manager.html import HtmlIOManager
import dagster as dg import dagster as dg
@@ -20,8 +21,9 @@ definitions = dg.Definitions(
], ],
resources={ resources={
"html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR), "html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
"json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
}, },
jobs=[jobs.raw_html_job], jobs=[jobs.raw_html_job],
sensors=[sensors.check_update], sensors=[sensors.check_update, sensors.parse_raw],
) )

View File

@@ -7,3 +7,8 @@ raw_html_job = dg.define_asset_job(
selection=[assets.raw_html.key], selection=[assets.raw_html.key],
tags={"docker/image": "dagster-code-stocks-playwright"}, tags={"docker/image": "dagster-code-stocks-playwright"},
) )
extract_job = dg.define_asset_job(
"extract_job",
selection=[assets.raw_daily.key],
)

View File

@@ -5,3 +5,9 @@ import dagster as dg
daily_partitions_def = dg.DailyPartitionsDefinition( daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
) )
table_partitions_def = dg.DynamicPartitionsDefinition(name="tables")
daily_table_partitions_def = dg.MultiPartitionsDefinition(
{"date": daily_partitions_def, "source": table_partitions_def}
)

View File

@@ -1,13 +1,18 @@
from collections.abc import Iterator from collections.abc import Iterator
from datetime import datetime from datetime import datetime
import assets
import jobs import jobs
import pendulum
import requests import requests
import structlog
from config import URL from config import URL
from utils import extract_date from utils.extracter import extract_date
import dagster as dg import dagster as dg
logger = structlog.get_logger()
@dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60) @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]: def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
@@ -30,5 +35,57 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
file = f"{now_str} stocks.html" file = f"{now_str} stocks.html"
context.log.info(f"Saving file: {file}") context.log.info(f"Saving file: {file}")
with open(f"/cache/{file}") as fp: with open(f"/cache/{file}", "w") as fp:
fp.write(response.text) fp.write(response.text)
@dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
def parse_raw(context: dg.SensorEvaluationContext):
    """Request extract runs for raw_html dates not yet in daily_table.

    Scans recent ``raw_html`` materialization events, pulls a date out of
    their metadata, and yields one ``RunRequest`` per date that has no
    materialized ``daily_table`` partition yet.
    """
    # TODO: use cursor from sensor to filter materialization events
    # Known materialized partitions of daily_table; keys look like
    # "<date>|<source>", so keep only the date dimension.
    daily_partitions = context.instance.get_materialized_partitions(
        assets.daily_table.key
    )
    dates = [partition.split("|")[0] for partition in daily_partitions]
    # BUGFIX: this used icecream's ic(), which is never imported in this
    # module and raised NameError at runtime; use the sensor logger instead.
    context.log.debug(f"{daily_partitions=} {dates=}")
    # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
    events = list(
        context.instance.get_event_records(
            event_records_filter=dg.EventRecordsFilter(
                event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=assets.raw_html.key,
            ),
            limit=100,
        )
    )
    # Track unique dates found in raw that are not materialized in daily_tables
    unknown_dates = set()
    for event in events:
        metadata = event.event_log_entry.asset_materialization.metadata
        date_str = None
        context.log.debug(f"{metadata=}")
        for key, entry in metadata.items():
            # TODO: move this general logic
            if key.lower() in {"date", "partition", "partition_date"}:
                date_str = entry.value
                break
        if not date_str:
            continue
        # Normalize: the metadata value is treated as a unix timestamp and
        # rendered as YYYY-MM-DD. Narrowed from a bare `except Exception`.
        try:
            date_str = pendulum.from_timestamp(int(date_str)).strftime("%Y-%m-%d")
        except (TypeError, ValueError, OverflowError, OSError) as e:
            logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
            continue
        if date_str not in dates:
            unknown_dates.add(date_str)
    context.log.debug(f"{unknown_dates=}")
    # NOTE(review): daily_table partitions look multi-dimensional (date|source)
    # while the RunRequest carries only a date key — confirm the extract_job
    # assets are date-partitioned.
    for date_str in sorted(unknown_dates):
        yield dg.RunRequest(partition_key=date_str)

View File

@@ -1,19 +0,0 @@
import re
from collections.abc import Iterator
from datetime import date, datetime
from bs4 import BeautifulSoup
def extract_date(page_source: str) -> Iterator[date]:
    """Yield the publication date parsed from the first <div> after <header>.

    Yields at most one value. The yielded object is a ``datetime`` (a
    ``date`` subclass, satisfying the annotation) parsed from text such as
    "3rd March 2025" — the month must be a full English month name for
    ``%B`` to match. Yields nothing when the markup or pattern is absent.
    """
    # Parse with BeautifulSoup
    soup = BeautifulSoup(page_source, "html.parser")
    # Find the first <div> after </header>
    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
        # Extract date part using regex; the ordinal suffix (st/nd/rd/th)
        # is captured but discarded.
        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
        if match:
            day, _, month, year = match.groups()
            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
            yield date_obj

View File

View File

@@ -0,0 +1,55 @@
import re
from collections.abc import Iterator
from datetime import date, datetime
import pandas as pd
from bs4 import BeautifulSoup
from pandas import DataFrame
def extract_date(page_source: str) -> Iterator[date]:
    """Yield the page date found in the first <div> following the <header>.

    Yields at most one ``datetime`` built from text like "3rd March 2025"
    (full English month names). Yields nothing when the expected markup or
    date pattern is missing.
    """
    soup = BeautifulSoup(page_source, "html.parser")
    header = soup.find("header")
    if not header:
        return
    div = header.find_next_sibling("div")
    if not div:
        return
    # The day may carry an ordinal suffix (1st, 2nd, ...) which is dropped.
    found = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
    if not found:
        return
    day, _suffix, month, year = found.groups()
    yield datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
def extract_tables(
    page_source: str,
) -> Iterator[tuple[str | None, str | None, DataFrame]]:
    """Yield (title, description, table) triples from accordion items.

    For each ``div.accordion-item`` that contains a non-empty <table>,
    yields the header's <h2> text, the description's <p> text (either may
    be None when the wrapper element is absent), and the table rows as a
    DataFrame whose columns come from the first row.
    """
    soup = BeautifulSoup(page_source, "html.parser")
    for item in soup.find_all("div", class_="accordion-item"):
        # Title lives in an <h2> inside the accordion header, when present.
        header = item.find("div", class_="accordion-header")
        title = header.find("h2").get_text(strip=True) if header else None
        # Description is the first <p> of the description block, when present.
        description_block = item.find("div", class_="accordion-description")
        description = (
            description_block.find("p").get_text(strip=True)
            if description_block
            else None
        )
        table = item.find("table")
        if not table:
            continue
        rows = [
            [cell.get_text(strip=True) for cell in tr.find_all(["th", "td"])]
            for tr in table.find_all("tr")
        ]
        if rows:
            yield title, description, pd.DataFrame(rows[1:], columns=rows[0])

View File

@@ -0,0 +1,59 @@
async def scrape(url: str) -> str:
    """Load *url* in headless Chromium, expand all collapsible sections,
    scroll the page to force lazy content, and return the final HTML.

    Side effect: also writes the HTML to ``/cache/scraped_page.html``.
    """
    # Imported lazily so the module can be loaded where playwright is absent.
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
        page = await context.new_page()
        await page.goto(url, timeout=60000)
        # Wait until buttons are available
        await page.wait_for_selector('div[role="button"][aria-expanded]', timeout=20000)
        # Zoom out for full view
        await page.evaluate("document.body.style.zoom='50%'")
        # Find collapsible buttons
        toggle_buttons = await page.query_selector_all(
            'div[role="button"][aria-expanded]'
        )
        print(f"Found {len(toggle_buttons)} expandable buttons")
        for i, btn in enumerate(toggle_buttons):
            try:
                # Only click buttons that are still collapsed.
                aria_expanded = await btn.get_attribute("aria-expanded")
                if aria_expanded == "false":
                    if await btn.is_visible() and await btn.is_enabled():
                        await btn.click()
                        await page.wait_for_timeout(1000)
                # The scroll pass below is skipped after the last button.
                if i == len(toggle_buttons) - 1:
                    break
                # Scroll gradually; total height is re-read each step because
                # expanding sections keeps growing the document.
                scroll_step = 500
                total_height = await page.evaluate("() => document.body.scrollHeight")
                current_position = 0
                while current_position < total_height:
                    await page.evaluate(f"window.scrollTo(0, {current_position});")
                    await page.wait_for_timeout(100)
                    current_position += scroll_step
                    total_height = await page.evaluate(
                        "() => document.body.scrollHeight"
                    )
            except Exception as e:
                # Best-effort: a failing button should not abort the scrape.
                print(f"Skipped button due to error: {e}")
        # Capture expanded HTML
        page_source = await page.content()
        await browser.close()
    # Save to file
    with open("/cache/scraped_page.html", "w") as fp:
        fp.write(page_source)
    print("Scraping done")
    return page_source

View File

@@ -0,0 +1,11 @@
import re
import unicodedata
def slugify(text: str) -> str:
    """Turn *text* into a lowercase ASCII slug with single-hyphen separators."""
    # Decompose accented characters and drop everything outside ASCII.
    ascii_text = (
        unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
    )
    # Keep only word characters, whitespace and hyphens; trim and lowercase.
    cleaned = re.sub(r"[^\w\s-]", "", ascii_text)
    cleaned = cleaned.strip().lower()
    # Collapse runs of whitespace and/or hyphens into a single hyphen.
    return re.sub(r"[-\s]+", "-", cleaned)

View File

@@ -1,351 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster --extra=tesla
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
# via requests
charset-normalizer==3.4.2
# via requests
click==8.1.8
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
jinja2==3.1.6
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pillow==11.3.0
# via matplotlib
polars==1.32.0
# via
# dagster-polars
# patito
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# uvicorn
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-toolbelt
requests-toolbelt==1.0.0
# via gql
rich==14.1.0
# via dagster
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
typing-extensions==4.14.1
# via
# alembic
# anyio
# beautifulsoup4
# dagster-polars
# dagster-shared
# graphene
# patito
# pydantic
# pydantic-core
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
urllib3==2.5.0
# via
# botocore
# docker
# requests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websockets==15.0.1
# via uvicorn
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

View File

@@ -1,22 +0,0 @@
from typing import Any, Optional
class MyIOManager(PolarsParquetIOManager):
    """PolarsParquetIOManager that tolerates missing partition files.

    A partition whose parquet file was never written loads as ``None``
    instead of raising ``FileNotFoundError``, letting downstream assets
    decide how to handle the gap.
    """

    def _load_partition_from_path(
        self,
        context: InputContext,
        partition_key: str,
        path: "UPath",
        backcompat_path: Optional["UPath"] = None,
    ) -> Any:
        """Load one partition; return ``None`` when its file is absent."""
        try:
            return super()._load_partition_from_path(
                context, partition_key, path, backcompat_path
            )
        except FileNotFoundError:
            # BUGFIX: the warning previously claimed an empty DataFrame was
            # returned, but the method returns None — message now matches.
            context.log.warning(
                f"Partition file not found for key {partition_key} at path {path}. "
                "Returning None."
            )
            return None

View File

@@ -1,51 +0,0 @@
import os
from pathlib import Path
from dotenv import find_dotenv, load_dotenv
from icecream import ic
from dagster import AssetKey, DagsterInstance
def delete_partition(instance, partition_def_name, partition_key):
    """Try to drop *partition_key* from a dynamic partitions definition.

    Best-effort: any failure is printed rather than raised. Deletion is
    known to be flaky here — possibly because the partition is not dynamic;
    the UI can only delete all materializations at once, not one by one.
    """
    try:
        instance.delete_dynamic_partition(partition_def_name, partition_key)
    except Exception as error:
        print(f"Error deleting partition: {error}")
def main(instance):
    # NOTE(review): `instance` is unused and `partition_key` is not a
    # parameter — it resolves to the module-level global set in the
    # __main__ block below. Looks like leftover scaffolding; verify intent.
    print(f"Partition '{partition_key}' deleted successfully.")
def detect_previous_partition(instance, name):
    """Print the latest materialization events for the asset named *name*."""
    ic(name)
    # NOTE(review): the commented kwargs suggest an earlier attempt at a
    # filtered event query; here only a tuple of asset keys is passed —
    # confirm against the Dagster API version in use.
    records = instance.get_latest_materialization_events(
        (AssetKey(name),),
        # event_type="ASSET_MATERIALIZATION",
        # asset_key=(partition_key,),
        # limit=100,
    )
    print(records)
if __name__ == "__main__":
    # Example identifiers for the helpers above.
    partition_def_name = "asset_single_1"
    partition_key = "2025-07-20"  # Example partition key
    load_dotenv(find_dotenv())
    # Point DAGSTER_HOME three directories above this file.
    os.environ["DAGSTER_HOME"] = str(Path(__file__).parent.parent.parent)
    # Mirror POSTGRES_* settings into DAGSTER_-prefixed variables, then
    # override the host so the script talks to a locally exposed database.
    for k, v in os.environ.items():
        if k.startswith("POSTGRES_"):
            os.environ[f"DAGSTER_{k}"] = v
    os.environ["DAGSTER_POSTGRES_HOST"] = "localhost"
    instance = DagsterInstance.get()
    # delete_partition(instance, partition_def_name, partition_key)
    detect_previous_partition(instance, partition_def_name)

View File

@@ -1,8 +0,0 @@
#!/usr/bin/env bash
# Mirror the vinyl app sources onto the mounted volume, keeping Python
# files and requirements lists. Extra rsync flags may be passed through.
# BUGFIX: `-progress` (single dash) was parsed as bundled short options
# (-p -r -o -g -r -e "ss"), not as --progress; `$*` also broke arguments
# containing spaces — use "$@".
rsync -av /opt/dagster/src/app/vinyl/ \
    /Volumes/dagster/src/app/vinyl/ \
    --include='*.py' \
    --include='*requirements.txt' \
    --exclude='__pycache__/' \
    --progress \
    --delete "$@"

View File

@@ -1,34 +0,0 @@
import time
from dagster import AssetMaterialization, Output, config_mapping, job, op
@op(config_schema={"config_param": str})
def hello(context):
    """Sleep one second, print a greeting, and return 123 with the
    configured parameter attached as output metadata."""
    time.sleep(1)
    print("halllo")
    metadata = {"aa": context.op_config["config_param"]}
    return Output(123, metadata=metadata)
@op
def goodbye(context, x: int):
    """Sleep two seconds, print the received value, log a sample asset
    materialization event, and return 2."""
    time.sleep(2)
    print("doooei", x)
    materialization = AssetMaterialization(
        asset_key="my_asset",
        metadata={"my_meta": 444},
        description="A very useful value!",
    )
    context.log_event(materialization)
    return 2
@config_mapping(config_schema={"simplified_param": str})
def simplified_config(val):
    """Expand the simplified parameter into the full per-op config layout."""
    param = val["simplified_param"]
    return {"ops": {"hello": {"config": {"config_param": param}}}}
@job
def my_job():
    """Feed hello's output into goodbye."""
    greeting = hello()
    goodbye(greeting)

View File

@@ -1,353 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster --extra=vinyl
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
# via requests
charset-normalizer==3.4.2
# via requests
click==8.1.8
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
jinja2==3.1.6
# via
# dev (pyproject.toml)
# dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pillow==11.3.0
# via matplotlib
polars==1.32.0
# via
# dagster-polars
# patito
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# uvicorn
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-toolbelt
requests-toolbelt==1.0.0
# via gql
rich==14.1.0
# via dagster
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
typing-extensions==4.14.1
# via
# alembic
# anyio
# beautifulsoup4
# dagster-polars
# dagster-shared
# graphene
# patito
# pydantic
# pydantic-core
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
urllib3==2.5.0
# via
# botocore
# docker
# requests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websockets==15.0.1
# via uvicorn
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

View File

@@ -12,17 +12,20 @@ from dagster_polars.patito import patito_model_to_dagster_type
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader
from models import Deal from models import Deal
from partitions import daily_partitions_def, multi_partitions_def from partitions import daily_partitions_def, multi_partitions_def
from plato.fetch import scrape_plato from platenzaak.parse import parse as parse_platenzaak
from platenzaak.scrape import scrape as scrape_platenzaak
from plato.parse import parse as parse_plato from plato.parse import parse as parse_plato
from plato.scrape import scrape as scrape_plato
from shared.utils import get_partition_keys, parse_partition_keys from shared.utils import get_partition_keys, parse_partition_keys
from sounds.fetch import fetch_deals
from sounds.parse import parse as parse_sounds from sounds.parse import parse as parse_sounds
from sounds.scrape import scrape as scrape_sounds
from structlog.stdlib import BoundLogger
from utils.email import EmailService from utils.email import EmailService
import dagster as dg import dagster as dg
asset = partial(dg.asset, key_prefix=APP) asset = partial(dg.asset, key_prefix=APP)
logger = structlog.get_logger() logger: BoundLogger = structlog.get_logger()
@asset( @asset(
@@ -63,22 +66,24 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
logger.error("Failed to load CSV file!", error=e) logger.error("Failed to load CSV file!", error=e)
raise dg.Failure(f"Cannot materialize for the past: {date.date()}") raise dg.Failure(f"Cannot materialize for the past: {date.date()}")
if source == "plato": match source:
logger.info("Scraping Plato") case "plato":
df = scrape_plato() logger.info("Scraping Plato")
logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown()) df = scrape_plato()
ic(df.columns) logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
return pl.from_pandas(df.assign(**partition_key)) case "sounds":
if source == "sounds": logger.info("Scraping Sounds")
logger.info("Scraping Sounds") df = scrape_sounds()
df = fetch_deals() logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
ic(df.columns) case "platenzaak":
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown()) logger.info("Scraping Platenzaak")
return pl.from_pandas(df.assign(**partition_key)) df = scrape_platenzaak(logger=logger)
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
case _:
raise ValueError(f"Unknown source: {source}!")
return pl.DataFrame( ic(df.columns)
[{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] return pl.from_pandas(df.assign(**partition_key))
)
@asset( @asset(
@@ -105,9 +110,10 @@ def cleaned_deals(
parsed_df = parse_plato(df) parsed_df = parse_plato(df)
case "sounds": case "sounds":
parsed_df = parse_sounds(df) parsed_df = parse_sounds(df)
case "platenzaak":
parsed_df = parse_platenzaak(df)
case _: case _:
context.log.warning(f"Unknown source: {source}!") raise ValueError(f"Unknown source: {source}!")
return
ic(parsed_df.collect_schema()) ic(parsed_df.collect_schema())
@@ -155,7 +161,7 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
"date": dg.DimensionPartitionMapping( "date": dg.DimensionPartitionMapping(
dimension_name="date", dimension_name="date",
partition_mapping=dg.TimeWindowPartitionMapping( partition_mapping=dg.TimeWindowPartitionMapping(
start_offset=-10, start_offset=-3,
end_offset=0, end_offset=0,
allow_nonexistent_upstream_partitions=True, allow_nonexistent_upstream_partitions=True,
), ),
@@ -190,7 +196,6 @@ def new_deals(
if len(partition_keys := sorted(partitions.keys())) < 2: if len(partition_keys := sorted(partitions.keys())) < 2:
context.log.warning("Not enough partitions to fetch new deals!") context.log.warning("Not enough partitions to fetch new deals!")
return return
before, after = partition_keys[-2:] before, after = partition_keys[-2:]
@@ -212,7 +217,9 @@ def new_deals(
new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect() new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect()
if new_df.height: if new_df.height:
context.log.info(f"New deals found ({new_df.height}x)!") context.log.info(f"New deals found ({new_df.height}x)!")
yield dg.Output(Deal.DataFrame(new_df)) yield dg.Output(
Deal.DataFrame(new_df.with_columns(pl.col("release").cast(pl.Date)))
)
else: else:
context.log.info("No new deals found!") context.log.info("No new deals found!")
@@ -225,7 +232,9 @@ def new_deals(
}, },
ins={"partitions": dg.AssetIn(key=new_deals.key)}, ins={"partitions": dg.AssetIn(key=new_deals.key)},
output_required=False, output_required=False,
automation_condition=dg.AutomationCondition.eager(), automation_condition=dg.AutomationCondition.eager().without(
~dg.AutomationCondition.any_deps_missing()
),
) )
def good_deals( def good_deals(
context: dg.AssetExecutionContext, context: dg.AssetExecutionContext,
@@ -235,6 +244,9 @@ def good_deals(
parsed_partition_keys = parse_partition_keys(context, "partitions") parsed_partition_keys = parse_partition_keys(context, "partitions")
ic(parsed_partition_keys) ic(parsed_partition_keys)
if not partitions:
logger.warning("Partitions are empty!")
return
df = pl.concat(partitions.values(), how="vertical_relaxed").collect() df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
counts = dict(df.group_by("source").len().iter_rows()) counts = dict(df.group_by("source").len().iter_rows())
@@ -257,7 +269,7 @@ def good_deals(
] ]
# Render HTML from Jinja template # Render HTML from Jinja template
env = Environment(loader=FileSystemLoader(f"/apps/{APP}")) env = Environment(loader=FileSystemLoader(f"/code/apps/{APP}"))
template = env.get_template("email.html") template = env.get_template("email.html")
html_content = template.render(deals=deals) html_content = template.render(deals=deals)

View File

@@ -12,8 +12,8 @@ define_asset_job = partial(dg.define_asset_job, **kwargs)
deals_job = dg.define_asset_job( deals_job = dg.define_asset_job(
"deals_job", "deals_job",
selection=[assets.deals.key], selection=dg.AssetSelection.assets(assets.new_deals.key).upstream(),
partitions_def=assets.deals.partitions_def, partitions_def=assets.new_deals.partitions_def,
) )

View File

@@ -10,5 +10,7 @@ class Deal(pt.Model):
title: str = pt.Field(description="Title of the deal.") title: str = pt.Field(description="Title of the deal.")
url: str = pt.Field(description="URL to the deal.") url: str = pt.Field(description="URL to the deal.")
date: datetime.date = pt.Field(description="Day the deal was listed.") date: datetime.date = pt.Field(description="Day the deal was listed.")
release: datetime.date = pt.Field(description="Release date.") release: datetime.date | None = pt.Field(
description="Release date.", allow_missing=True
)
price: float = pt.Field(description="Price of the deal in EUR.") price: float = pt.Field(description="Price of the deal in EUR.")

View File

@@ -2,7 +2,7 @@ import os
import dagster as dg import dagster as dg
SOURCES = ["plato", "sounds"] SOURCES = ["plato", "sounds", "platenzaak"]
daily_partitions_def = dg.DailyPartitionsDefinition( daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
) )

View File

View File

@@ -0,0 +1,13 @@
import polars as pl
def parse(df: pl.LazyFrame) -> pl.LazyFrame:
    """Parse the Platenzaak DataFrame."""

    def _clean(column: str) -> pl.Expr:
        # Trim surrounding whitespace and lowercase for stable matching.
        return pl.col(column).str.strip_chars().str.to_lowercase()

    return df.with_columns(
        date=pl.col("date").cast(pl.Date),
        artist=_clean("artist"),
        title=_clean("album"),
        release=pl.lit(None),  # Platenzaak listings carry no release date.
        price=pl.col("current_price").cast(pl.Float64),
        url=pl.format("https://platenzaak.nl{}", pl.col("id")),
    )

View File

@@ -0,0 +1,90 @@
from collections.abc import Iterator
import pandas as pd
import requests
from bs4 import BeautifulSoup
from structlog.stdlib import BoundLogger
def parse_price(price_block):
    """
    Convert a price block like:
        <span class="amount theme-money">€ 30<sup>99</sup></span>
    into a float: 30.99

    Returns None when the block is missing or cannot be parsed.
    """
    if not price_block:
        return None
    # Main number is the direct text node before <sup> (e.g. "€ 30").
    main = price_block.find(string=True, recursive=False)
    # BUGFIX: guard against a block with no direct text node.
    if main is None:
        return None
    # BUGFIX: strip the euro sign — this was a no-op replace("", "")
    # (mangled "€"), so float() below failed for every price.
    main = main.strip().replace("€", "").replace(",", ".").strip()
    # Extract the <sup> part (cents)
    sup = price_block.find("sup")
    cents = sup.get_text(strip=True) if sup else "00"
    try:
        return float(f"{main}.{cents}")
    except ValueError:
        return None
def parse_page(html) -> Iterator[dict]:
    """Yield one metadata dict per product block found in a listing page."""
    soup = BeautifulSoup(html, "html.parser")
    for block in soup.select("div.product-block__inner"):
        # The wishlist button carries most of the product metadata.
        wishlist = block.select_one("[data-wlh-id]")
        if not wishlist:
            continue
        attribute_map = {
            "id": "data-wlh-id",
            "variant_id": "data-wlh-variantid",
            "name": "data-wlh-name",
            "price": "data-wlh-price",
            "url": "data-wlh-link",
            "image": "data-wlh-image",
        }
        product = {field: wishlist.get(attr) for field, attr in attribute_map.items()}
        # Artist and album live in the title link: <span>artist</span><br>album.
        title_block = block.select_one(".product-block__title-price .title")
        if title_block:
            artist = title_block.find("span")
            if artist:
                product["artist"] = artist.get_text(strip=True)
                # The text after the <br> is the album title.
                product["album"] = (
                    title_block.get_text(separator="|").split("|")[-1].strip()
                )
        # Current price (may reflect a discount) and pre-sale price.
        product["current_price"] = parse_price(block.select_one(".price .amount"))
        product["original_price"] = parse_price(
            block.select_one(".price del .theme-money")
        )
        # A sale label marks discounted items.
        product["on_sale"] = bool(block.select_one(".product-label--sale"))
        yield product
def scrape(logger: BoundLogger) -> pd.DataFrame:
    """Collect every paginated vinyl sale listing into one DataFrame.

    Requests successive result pages until a page yields no products.
    """
    collected: list[dict] = []
    page_number = 1
    while True:
        response = requests.get(
            "https://www.platenzaak.nl/collections/sale"
            f"?filter.p.m.custom.config_group=Vinyl&page={page_number}"
        )
        response.raise_for_status()
        found = list(parse_page(response.text))
        logger.info("Scraped page", page=page_number, products=len(found))
        if not found:
            break
        collected.extend(found)
        page_number += 1
    return pd.DataFrame(collected)

View File

@@ -1,154 +0,0 @@
import os
import boto3
import pandas as pd
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv
from fetch import scrape_plato
from utils import get
def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
    """Merge freshly scraped articles into the parquet database.

    Args:
        articles_df: Newly scraped articles, or None for a dry run.
        database_file: Path of the parquet file acting as the database.

    Returns:
        Tuple ``(database_df, new_df)``: the full database (latest row per
        EAN) and the rows that were not present before. ``(None, None)``
        when there is neither an existing database nor new articles.
    """
    if os.path.exists(database_file):
        database_df = pd.read_parquet(database_file)
    else:
        database_df = None
    if articles_df is None:
        if database_df is None:
            # BUGFIX: previously fell through to pd.concat([None, None]),
            # which raises ValueError ("All objects passed were None").
            return None, None
        new_df = database_df.head(0)
    else:
        if database_df is None:
            # First run: the scrape becomes the database as-is.
            articles_df.to_parquet(database_file)
            return articles_df, articles_df
        # A row counts as new when its (ean, price) pair was unseen.
        compare = ["ean", "_price"]
        check_df = pd.merge(
            database_df[compare], articles_df[compare], how="right", indicator=True
        )
        new_df = (
            check_df[check_df["_merge"] == "right_only"]
            .drop(columns="_merge")
            .merge(articles_df)
        )
    # Keep the most recent row per EAN.
    database_df = (
        pd.concat([database_df, new_df])
        .sort_values("_date")
        .groupby("ean")
        .last()
        .reset_index()
    )
    database_df.to_parquet(database_file)
    return database_df, new_df
def send_email(lines):
    """Send an HTML notification email via AWS SES.

    Args:
        lines: list of HTML fragments joined with newlines into the body.
    """
    SENDER = "mail@veenboer.xyz"
    RECIPIENT = "rik.veenboer@gmail.com"
    SUBJECT = "Aanbieding op plato!"
    # Plain-text fallback for clients that cannot render HTML.
    BODY_TEXT = ""
    body = "\n".join(lines)
    # BUG FIX: the original markup never closed the <body> tag.
    BODY_HTML = f"""<html>
    <head></head>
    <body>
    {body}
    </body>
    </html>
    """
    CHARSET = "UTF-8"
    try:
        client = boto3.client(
            "ses", region_name="eu-west-1"
        )  # Change the region as needed
        response = client.send_email(
            Destination={
                "ToAddresses": [
                    RECIPIENT,
                ],
            },
            Message={
                "Body": {
                    "Html": {
                        "Charset": CHARSET,
                        "Data": BODY_HTML,
                    },
                    "Text": {
                        "Charset": CHARSET,
                        "Data": BODY_TEXT,
                    },
                },
                "Subject": {
                    "Charset": CHARSET,
                    "Data": SUBJECT,
                },
            },
            Source=SENDER,
        )
    except NoCredentialsError:
        print("Credentials not available")
    except PartialCredentialsError:
        print("Incomplete credentials provided")
    except Exception as e:
        print(f"Error: {e}")
    else:
        # BUG FIX: removed a stray trailing comma that turned the first
        # print into an element of a discarded tuple.
        print("Email sent! Message ID:")
        print(response["MessageId"])
def main(dry=False):
    """Scrape Plato offers, update the local database, and mail new deals.

    Args:
        dry: when True, skip scraping and sample existing database rows
            instead of fetching fresh data.
    """
    load_dotenv("/opt/.env")
    # Sanity check that outgoing requests use a different IP than the host
    # (i.e. they are routed through the configured proxy).
    local_ip = get("http://ifconfig.me", False).text
    get_ip = get("http://ifconfig.me").text
    print(f"Local IP = {local_ip}")
    print(f"Request IP = {get_ip}")
    if local_ip == get_ip:
        # BUG FIX: was a bare `assert`, which is silently stripped when
        # Python runs with -O; raise explicitly instead.
        raise RuntimeError("Requests are not routed through the proxy")
    artists = open("/home/user/artists.txt").read().strip().splitlines()
    print(f"Number of known artists = {len(artists)}")
    articles_df = None if dry else scrape_plato(get=get)
    database_df, new_df = update_database(articles_df)
    if dry:
        new_df = database_df.sample(20)
    print(f"Database size = {len(database_df)}")
    # BUG FIX: guard against new_df being None BEFORE calling .query on it;
    # the original checked for None only after already filtering.
    if new_df is None:
        return
    print(f"New = {len(new_df)}")
    # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
    new_df = new_df.query('_price <= 25 and ean != ""')
    print(f"Interesting = {len(new_df)}")
    if len(new_df):
        message = []
        for _, row in new_df.head(10).iterrows():
            message.append(
                f'<a href="https://www.platomania.nl{row.url}"><h1>NEW</h1></a>'
            )
            message.append("<ul>")
            message.append(f"<li>[artist] {row.artist}</li>")
            message.append(f"<li>[title] {row.title}</li>")
            message.append(f"<li>[price] {row.price}</li>")
            message.append(f"<li>[release] {row.release_date}</li>")
            message.append("</ul>")
        send_email(message)
if __name__ == "__main__":
    # NOTE(review): `cwd` is computed but never used — possibly a leftover
    # from a version that changed into the script's directory; confirm
    # before removing.
    cwd = os.path.dirname(__file__)
    main(dry=False)

View File

@@ -1,52 +0,0 @@
#!/root/.pyenv/versions/dev/bin/python
import re
from datetime import datetime
import pandas as pd
from .scrape import get_soup, scrape_page, scrape_page_links
def scrape_plato(get=None):
    """Collect all vinyl offers from platomania.nl into a DataFrame.

    Args:
        get: optional HTTP getter passed through to ``get_soup``.

    Returns:
        DataFrame with the raw article columns plus derived ``_artist``,
        ``_price`` and ``_date`` columns.
    """
    ic()
    start_url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
    ic(start_url)
    first_soup = get_soup(url=start_url, get=get)
    articles_info = scrape_page(first_soup)
    ic(len(articles_info))
    # Visit the remaining pages in ascending page-number order.
    page_links = sorted(
        set(scrape_page_links(first_soup)),
        key=lambda link: int(link.split("=")[-1]),
    )
    for page_link in page_links:
        ic(page_link)
        page_soup = get_soup(url=page_link, get=get)
        page_articles = scrape_page(page_soup)
        ic(len(page_articles))
        articles_info.extend(page_articles)

    def clean(name):
        # "Last, First" -> "first last", lowercased, "(...)" suffixes removed.
        normalized = " ".join(reversed(name.split(", "))).lower()
        return re.sub(r"\s+\([^)]*\)", "", normalized)

    columns = [
        "artist",
        "title",
        "url",
        "label",
        "release_date",
        "origin",
        "item_number",
        "ean",
        "delivery_info",
        "price",
    ]
    articles_df = pd.DataFrame(articles_info).reindex(columns=columns)
    articles_df["_artist"] = articles_df["artist"].map(clean)
    articles_df["_price"] = articles_df["price"].map(lambda p: float(p.split(" ")[-1]))
    articles_df["_date"] = datetime.now()
    return articles_df

60
apps/vinyl/src/plato/scrape.py Normal file → Executable file
View File

@@ -1,21 +1,61 @@
import re
from datetime import datetime
import pandas as pd
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
def scrape(get=None):
ic()
url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
ic(url)
soup = get_soup(url=url, get=get)
articles_info = scrape_page(soup)
ic(len(articles_info))
links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
for link in links:
ic(link)
soup = get_soup(url=link, get=get)
tmp = scrape_page(soup)
ic(len(tmp))
articles_info.extend(tmp)
def clean(name):
tmp = " ".join(reversed(name.split(", ")))
tmp = tmp.lower()
tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
return tmp
articles_df = pd.DataFrame(articles_info).reindex(
columns=[
"artist",
"title",
"url",
"label",
"release_date",
"origin",
"item_number",
"ean",
"delivery_info",
"price",
]
)
articles_df["_artist"] = articles_df["artist"].map(clean)
articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
articles_df["_date"] = datetime.now()
return articles_df
def get_soup(url, get=None): def get_soup(url, get=None):
# Send a GET request to the specified URL
if get is None: if get is None:
get = requests.get get = requests.get
response = get(url) response = get(url)
response.raise_for_status()
# Check if the request was successful return BeautifulSoup(response.content, "html.parser")
if response.status_code == 200:
# Parse the HTML content of the page
return BeautifulSoup(response.content, "html.parser")
else:
raise ValueError(
f"Failed to retrieve the page. Status code: {response.status_code}"
)
def scrape_page_links(soup): def scrape_page_links(soup):

View File

@@ -1,80 +0,0 @@
#!/usr/bin/python3
import glob
import os
from datetime import datetime
import pandas as pd
def get_csvs(directory, n):
    """Return the paths of the ``n`` most recent ``*_sounds.csv`` files.

    The timestamp is taken from the filename prefix, which must be in
    ``YYYY-mm-dd_HH:MM:SS`` format; files with an unparsable prefix are
    silently ignored.

    Args:
        directory: directory to scan (non-recursive).
        n: maximum number of paths to return.

    Returns:
        List of file paths, most recent first.
    """
    suffix = "_sounds.csv"
    files = glob.glob(os.path.join(directory, f"*{suffix}"))

    def extract_date_from_filename(filename):
        """Parse the timestamp prefix of *filename*, or None if unparsable."""
        basename = os.path.basename(filename)
        date_str = basename.split(suffix)[0]
        try:
            return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S")
        except ValueError:
            return None

    # Pair each file with its parsed date, dropping unparsable names.
    dated = [(extract_date_from_filename(f), f) for f in files]
    dated = [item for item in dated if item[0] is not None]
    # Most recent first. BUG FIX (docs): the old comment claimed "the two
    # most recent files" while the function returns up to ``n``.
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated[:n]]
def analyze(df1, df2):
    """Compare two price snapshots.

    Args:
        df1: earlier snapshot with at least ``id`` and ``price`` columns.
        df2: later snapshot with ``id``, ``name`` and ``price`` columns.

    Returns:
        Tuple ``(deals, new)``: rows whose price dropped between snapshots
        (sorted by biggest discount first) and rows present only in the
        later snapshot (sorted by price ascending).
    """
    previous = df1.drop_duplicates(subset="id")
    latest = df2.drop_duplicates(subset="id")
    # Right merge keeps every row of the latest snapshot; the indicator
    # column marks rows that have no counterpart in the earlier one.
    merged = pd.merge(
        previous[["id", "price"]], latest, on="id", how="right", indicator=True
    )
    merged["discount"] = merged.price_y - merged.price_x
    merged = merged.drop(columns=["price_x"]).rename(columns={"price_y": "price"})
    deals = merged.query("discount < 0").sort_values(by="discount")[
        ["id", "name", "price", "discount"]
    ]
    new = merged.query("_merge == 'right_only'").sort_values(by="price")[
        ["id", "name", "price"]
    ]
    return deals, new
if __name__ == "__main__":
    csvs = get_csvs(".", 100)
    # Hoisted out of the loop: the latest snapshot is loop-invariant and was
    # previously re-read from disk on every iteration.
    if csvs:
        df_latest = pd.read_csv(csvs[0], index_col=0)
    # Compare the latest snapshot against progressively older ones until a
    # difference (new or discounted items) is found.
    for i in range(1, len(csvs)):
        print(f"Comparing {csvs[i]} with {csvs[0]}")
        df_previous = pd.read_csv(csvs[i], index_col=0)
        deals, new = analyze(df_previous, df_latest)
        done = False
        # BUG FIX: this section was gated on len(deals) > 0, so new items
        # were only reported when there also happened to be a discount.
        if len(new) > 0:
            print()
            print("New items:")
            print(new)
            print()
            done = True
        if len(deals) > 0:
            print("Discounted items:")
            print(deals)
            done = True
        if done:
            break

View File

@@ -3,7 +3,7 @@ from utils.parse import parse_date
def parse(df: pl.LazyFrame) -> pl.LazyFrame: def parse(df: pl.LazyFrame) -> pl.LazyFrame:
"""Parse the Plato DataFrame.""" """Parse the Sounds DataFrame."""
return df.with_columns( return df.with_columns(
date=pl.col("date").cast(pl.Date), date=pl.col("date").cast(pl.Date),
artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1)) artist=pl.coalesce(pl.col("artist"), pl.col("name").str.split("-").list.get(1))

View File

@@ -1,7 +1,4 @@
#!/usr/bin/python3
import time import time
from datetime import datetime
import pandas as pd import pandas as pd
import requests import requests
@@ -74,11 +71,11 @@ def parse_page(html_content):
) )
def fetch_deals(): def scrape():
# Get page count # Get page count
page_count = get_page_count( response = requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art")
requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text response.raise_for_status()
) page_count = get_page_count(response.text)
time.sleep(1) time.sleep(1)
print(f"Number of pages: {page_count}") print(f"Number of pages: {page_count}")
@@ -86,25 +83,11 @@ def fetch_deals():
base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all" base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all"
dfs = [] dfs = []
for i in tqdm(range(page_count)): for i in tqdm(range(page_count)):
df = parse_page(requests.get(base_url.format(page_number=i)).text) response = requests.get(base_url.format(page_number=i))
response.raise_for_status()
df = parse_page(response.text)
dfs.append(df) dfs.append(df)
time.sleep(2) time.sleep(2)
# Combine dfs # Combine dfs
return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"]) return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"])
if __name__ == "__main__":
df = fetch_deals()
print(f"Found {len(df)} deals")
# Show current deals
print(df.sort_values(by="price").head(10))
# Write to file
now = datetime.now()
prefix = now.strftime("%Y-%m-%d_%H:%M:%S")
directory = "/home/bram/src/python"
filepath = f"{directory}/{prefix}_sounds.csv"
print(f"Writing data to {filepath}")
df.to_csv(filepath)

View File

@@ -1,391 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster --extra=weather
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
attrs==25.3.0
# via
# cattrs
# requests-cache
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
cattrs==25.1.1
# via requests-cache
certifi==2025.7.14
# via requests
charset-normalizer==3.4.2
# via
# niquests
# requests
click==8.1.8
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
flatbuffers==25.2.10
# via openmeteo-sdk
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via
# urllib3-future
# uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# anyio
# email-validator
# requests
# url-normalize
# yarl
jh2==5.0.9
# via urllib3-future
jinja2==3.1.6
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
niquests==3.14.1
# via openmeteo-requests
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openmeteo-requests==1.6.0
# via dev (pyproject.toml)
openmeteo-sdk==1.20.1
# via openmeteo-requests
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pillow==11.3.0
# via matplotlib
platformdirs==4.3.8
# via requests-cache
polars==1.32.0
# via
# dagster-polars
# patito
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# uvicorn
qh3==1.5.3
# via urllib3-future
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-cache
# requests-toolbelt
# retry-requests
requests-cache==1.2.1
# via dev (pyproject.toml)
requests-toolbelt==1.0.0
# via gql
retry-requests==2.0.0
# via dev (pyproject.toml)
rich==14.1.0
# via dagster
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
typing-extensions==4.14.1
# via
# alembic
# anyio
# beautifulsoup4
# cattrs
# dagster-polars
# dagster-shared
# graphene
# patito
# pydantic
# pydantic-core
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
url-normalize==2.2.1
# via requests-cache
urllib3==2.5.0
# via
# botocore
# docker
# requests
# requests-cache
# retry-requests
urllib3-future==2.13.901
# via niquests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
wassima==1.2.2
# via niquests
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websockets==15.0.1
# via uvicorn
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

View File

@@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
now = datetime.now(tz=timezone.utc) now = datetime.now(tz=timezone.utc)
date_str = now.strftime("%Y-%m-%d") date_str = now.strftime("%Y-%m-%d")
time_str = now.strftime("%H:%M:%S") time_str = now.strftime("%H:%M:%S")
latitude_str, longitude_str = partition_key[:5], partition_key[5:]
yield dg.Output( yield dg.Output(
data, data,
metadata={ metadata={
"date": dg.MetadataValue.timestamp(now), "date": dg.MetadataValue.timestamp(now),
"latitude": dg.MetadataValue.float(latitude), "latitude": dg.MetadataValue.float(latitude),
"longitude": dg.MetadataValue.float(longitude), "longitude": dg.MetadataValue.float(longitude),
"path_suffix": [date_str, time_str], "path": [APP, "raw", date_str, latitude_str, longitude_str, time_str],
}, },
) )
@@ -144,6 +146,7 @@ def raw_weather_batch_latitude(context: dg.AssetExecutionContext) -> None:
fetcher = WeatherFetcher() fetcher = WeatherFetcher()
latitude, longitude = parse_coordinate_str(location) latitude, longitude = parse_coordinate_str(location)
ic(latitude, longitude)
data = fetcher.fetch(latitude=latitude, longitude=longitude) data = fetcher.fetch(latitude=latitude, longitude=longitude)
now = datetime.now(tz=timezone.utc) now = datetime.now(tz=timezone.utc)
@@ -176,6 +179,7 @@ def raw_weather_batch_latitude(context: dg.AssetExecutionContext) -> None:
io_manager_key="polars_parquet_io_manager", io_manager_key="polars_parquet_io_manager",
partitions_def=daily_partitions_def, partitions_def=daily_partitions_def,
output_required=False, output_required=False,
automation_condition=dg.AutomationCondition.eager(),
) )
def parsed_weather( def parsed_weather(
context: dg.AssetExecutionContext, context: dg.AssetExecutionContext,

View File

@@ -39,12 +39,12 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
if new_locations: if new_locations:
context.log.info(f"Discovered {len(new_locations)} new locations.") context.log.info(f"Discovered {len(new_locations)} new locations.")
# Limit to 3 new locations
selected = new_locations[:3]
return dg.SensorResult( return dg.SensorResult(
run_requests=[], # dg.RunRequest(partition_key=location) for location in locations], run_requests=[
dg.RunRequest(partition_key=location) for location in new_locations[:3]
],
dynamic_partitions_requests=[ dynamic_partitions_requests=[
location_partitions_def.build_add_request(selected), location_partitions_def.build_add_request(new_locations),
latitude_partitions_def.build_add_request(new_latitudes), latitude_partitions_def.build_add_request(new_latitudes),
longitude_partitions_def.build_add_request(new_longitudes), longitude_partitions_def.build_add_request(new_longitudes),
], ],

View File

@@ -35,8 +35,8 @@ services:
- /opt/dagster/apps/:/code/apps/:ro - /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro - /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- /opt/dagster/storage/import/:/storage/import/:ro # - /mnt/mezzo/scratch/dagster/import/:/storage/import/:ro
- /opt/dagster/storage/deals/:/storage/deals/:rw - /mnt/mezzo/scratch/dagster/deals/:/storage/deals/:rw
networks: networks:
- dagster - dagster
@@ -53,21 +53,32 @@ services:
<<: [ *dagster_env ] <<: [ *dagster_env ]
DAGSTER_CURRENT_IMAGE: dagster-code-stocks DAGSTER_CURRENT_IMAGE: dagster-code-stocks
volumes: volumes:
- /tmp/cache:/cache:rw
- /opt/dagster/apps/:/code/apps/:ro - /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro - /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- /tmp/cache:/cache:rw
networks: networks:
- dagster - dagster
dagster-code-stocks-playwright: dagster-code-stocks-playwright:
build: build:
context: apps/stocks context: apps/stocks
dockerfile: ../../Dockerfile.code.playwright dockerfile: Dockerfile.code.playwright
args: args:
- APP=stocks - APP=stocks
image: dagster-code-stocks-playwright image: dagster-code-stocks-playwright
profiles: [ "never" ] profiles: [ "never" ]
dagster-code-backup-base:
build:
context: apps/backup
dockerfile: ../../Dockerfile.code
args:
- APP=backup
image: dagster-code-backup-base
profiles: [ "never" ]
dagster-code-tesla: dagster-code-tesla:
build: build:
context: apps/tesla context: apps/tesla
@@ -106,6 +117,22 @@ services:
networks: networks:
- dagster - dagster
dagster-code-backup:
build:
context: apps/backup
container_name: dagster-code-backup
image: dagster-code-backup
restart: always
environment:
<<: [ *dagster_env ]
DAGSTER_CURRENT_IMAGE: dagster-code-backup
volumes:
- /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/logs/:/logs:rw
networks:
- dagster
dagster-code-other: dagster-code-other:
build: build:
context: apps/other context: apps/other

View File

@@ -6,6 +6,7 @@ x-postgres-env: &postgres_env
POSTGRES_DB: ${POSTGRES_DB} POSTGRES_DB: ${POSTGRES_DB}
x-system-env: &system_env x-system-env: &system_env
TZ: Europe/Amsterdam TZ: Europe/Amsterdam
DATA_DIR: ${DATA_DIR}
CACHE_DIR: /tmp/cache CACHE_DIR: /tmp/cache
x-dagster-env: &dagster_env x-dagster-env: &dagster_env
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST} DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
@@ -26,7 +27,7 @@ x-volumes: &volumes
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro
- /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro
- /opt/dagster/system/:/code/system/:ro - /opt/dagster/system/:/code/system/:ro
- /opt/dagster/storage/:/storage/:rw - /mnt/mezzo/scratch/dagster/:/storage/:rw
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- /var/run/docker.sock:/var/run/docker.sock:rw - /var/run/docker.sock:/var/run/docker.sock:rw
@@ -40,6 +41,8 @@ services:
<<: *postgres_env <<: *postgres_env
networks: networks:
- dagster - dagster
ports:
- '25432:5432'
volumes: volumes:
- /opt/dagster/db/:/var/lib/postgresql/data/ - /opt/dagster/db/:/var/lib/postgresql/data/

View File

@@ -1,351 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.10.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via requests
click==8.1.8
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via dagster
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
jinja2==3.1.6
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pillow==11.3.0
# via matplotlib
polars==1.32.0
# via
# dagster-polars
# patito
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# uvicorn
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-toolbelt
requests-toolbelt==1.0.0
# via gql
rich==14.1.0
# via dagster
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
typing-extensions==4.14.1
# via
# alembic
# anyio
# beautifulsoup4
# dagster-polars
# dagster-shared
# graphene
# patito
# pydantic
# pydantic-core
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
urllib3==2.5.0
# via
# botocore
# docker
# requests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websockets==15.0.1
# via uvicorn
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

View File

@@ -15,11 +15,13 @@ run_launcher:
class: CustomDockerRunLauncher class: CustomDockerRunLauncher
config: config:
env_vars: env_vars:
- TZ
- DAGSTER_POSTGRES_HOST - DAGSTER_POSTGRES_HOST
- DAGSTER_POSTGRES_PORT - DAGSTER_POSTGRES_PORT
- DAGSTER_POSTGRES_USER - DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD - DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB - DAGSTER_POSTGRES_DB
- DATA_DIR
- SMTP_SERVER - SMTP_SERVER
- SMTP_PORT - SMTP_PORT
- SMTP_USERNAME - SMTP_USERNAME
@@ -32,8 +34,8 @@ run_launcher:
volumes: volumes:
- /opt/dagster/apps/:/code/apps/:ro - /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro - /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/storage/:/storage/:rw
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- ${DATA_DIR}:/storage/:rw
- ${CACHE_DIR}:/cache:rw - ${CACHE_DIR}:/cache:rw
run_storage: run_storage:

View File

@@ -1,435 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml --extra=dagster --extra=dev
alembic==1.16.4
# via dagster
annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.10.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
attrs==25.3.0
# via
# jsonschema
# referencing
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
black==25.1.0
# via dev (pyproject.toml)
boto3==1.40.1
# via
# dev (pyproject.toml)
# dagster-aws
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
build==1.3.0
# via pip-tools
certifi==2025.8.3
# via requests
cfgv==3.4.0
# via pre-commit
charset-normalizer==3.4.2
# via requests
click==8.1.8
# via
# black
# dagster
# dagster-webserver
# pip-tools
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dagit==1.11.4
# via dev (pyproject.toml)
dagster==1.11.4
# via
# dev (pyproject.toml)
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.27.4
# via dev (pyproject.toml)
dagster-docker==0.27.4
# via dev (pyproject.toml)
dagster-duckdb==0.27.4
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
dagster-duckdb-pandas==0.27.4
# via dev (pyproject.toml)
dagster-graphql==1.11.4
# via
# dev (pyproject.toml)
# dagster-webserver
dagster-pipes==1.11.4
# via dagster
dagster-polars==0.27.4
# via dev (pyproject.toml)
dagster-postgres==0.27.4
# via dev (pyproject.toml)
dagster-shared==1.11.4
# via dagster
dagster-webserver==1.11.4
# via dagit
distlib==0.4.0
# via virtualenv
dnspython==2.7.0
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.17.0
# via dagster
duckdb==1.3.2
# via
# dev (pyproject.toml)
# dagster-duckdb
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastjsonschema==2.21.1
# via nbformat
fastparquet==2024.11.0
# via dev (pyproject.toml)
filelock==3.18.0
# via
# dagster
# virtualenv
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
# universal-pathlib
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
gql==3.5.3
# via dagster-graphql
graphene==3.4.3
# via dagster-graphql
graphql-core==3.2.6
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.74.0
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.5
# via dev (pyproject.toml)
identify==2.6.12
# via pre-commit
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
isort==6.0.1
# via dev (pyproject.toml)
jinja2==3.1.6
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
jsonschema==4.25.0
# via nbformat
jsonschema-specifications==2025.4.1
# via jsonschema
jupyter-core==5.8.1
# via nbformat
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==3.0.2
# via
# jinja2
# mako
matplotlib==3.10.5
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.6.3
# via yarl
mypy==1.17.1
# via dev (pyproject.toml)
mypy-extensions==1.1.0
# via
# black
# mypy
nbformat==5.10.4
# via nbstripout
nbstripout==0.8.1
# via dev (pyproject.toml)
nodeenv==1.9.1
# via pre-commit
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# black
# build
# dagster-aws
# dagster-shared
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# dagster-duckdb-pandas
# fastparquet
# seaborn
pathspec==0.12.1
# via
# black
# mypy
patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pillow==11.3.0
# via matplotlib
pip==25.2
# via pip-tools
pip-tools==7.5.0
# via dev (pyproject.toml)
platformdirs==4.3.8
# via
# black
# jupyter-core
# virtualenv
polars==1.32.0
# via
# dagster-polars
# patito
pre-commit==4.2.0
# via dev (pyproject.toml)
propcache==0.3.2
# via yarl
protobuf==5.29.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.10
# via dagster-postgres
pyarrow==21.0.0
# via
# dev (pyproject.toml)
# dagster-polars
pydantic==2.11.7
# via
# dev (pyproject.toml)
# dagster-shared
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via
# icecream
# rich
pyparsing==3.2.3
# via matplotlib
pyproject-hooks==1.2.0
# via
# build
# pip-tools
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
python-dotenv==1.1.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2025.2
# via
# dagster
# pandas
pyyaml==6.0.2
# via
# dev (pyproject.toml)
# dagster-shared
# pre-commit
# uvicorn
referencing==0.36.2
# via
# jsonschema
# jsonschema-specifications
regex==2025.7.34
# via docker-image-py
requests==2.32.4
# via
# dev (pyproject.toml)
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# requests-toolbelt
requests-toolbelt==1.0.0
# via gql
rich==14.1.0
# via dagster
rpds-py==0.26.0
# via
# jsonschema
# referencing
ruff==0.12.7
# via dev (pyproject.toml)
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via
# dagster
# pip-tools
six==1.17.0
# via
# dagster
# python-dateutil
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
# via
# alembic
# dagster
starlette==0.47.2
# via
# dagster-graphql
# dagster-webserver
structlog==25.4.0
# via
# dev (pyproject.toml)
# dagster
tabulate==0.9.0
# via dagster
tomli==2.2.1
# via dagster
tomlkit==0.13.3
# via dagster-shared
toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
traitlets==5.14.3
# via
# jupyter-core
# nbformat
typing-extensions==4.14.1
# via
# alembic
# anyio
# beautifulsoup4
# dagster-polars
# dagster-shared
# graphene
# mypy
# patito
# pydantic
# pydantic-core
# referencing
# sqlalchemy
# starlette
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
universal-pathlib==0.2.6
# via
# dagster
# dagster-polars
urllib3==2.5.0
# via
# botocore
# docker
# requests
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
# via uvicorn
virtualenv==20.33.0
# via pre-commit
watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websockets==15.0.1
# via uvicorn
wheel==0.45.1
# via pip-tools
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1
# via gql

8
entrypoint.sh Normal file
View File

@@ -0,0 +1,8 @@
#!/bin/sh
# Container entrypoint: render the Dagster instance config from its
# template (substituting environment variables via envsubst), then hand
# off to the command passed as arguments.
set -e

echo "Rendering dagster.yaml from template..."
envsubst < dagster.yaml.template > dagster.yaml

# SC2145 fix: use "$*" when joining all args into one message string;
# exec "$@" below still forwards each argument intact to the real command.
echo "Starting Dagster: $*"
exec "$@"

19
poetry.lock generated
View File

@@ -1,19 +0,0 @@
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]]
name = "seven"
version = "1.0.0"
description = "Python 2.5 compatibility wrapper for Python 2.7 code."
optional = false
python-versions = "*"
files = [
{file = "seven-1.0.0.tar.gz", hash = "sha256:e80157857dc378545b0cd8626668bf0e20d7f3608a5587f3fcc71a56d2416814"},
]
[package.extras]
tests = ["zope.testing"]
[metadata]
lock-version = "2.0"
python-versions = "*"
content-hash = "edfc27fcb4a7dc1b1a11f2224d7b7f3e936c5f624df1dd86207c4dc08e047b5d"

View File

@@ -16,6 +16,7 @@ dependencies = [
"openpyxl", "openpyxl",
"pandas", "pandas",
"patito", "patito",
"polars==1.32.0",
"pyarrow", "pyarrow",
"pydantic[email]", "pydantic[email]",
"pydantic-settings", "pydantic-settings",
@@ -43,12 +44,12 @@ local = [
"ipywidgets" "ipywidgets"
] ]
dagster = [ dagster = [
"dagster", "dagster==1.11.4",
"dagster-graphql", "dagster-graphql",
"dagster-postgres", "dagster-postgres",
"dagster-docker", "dagster-docker",
"dagster-aws", "dagster-aws",
"dagster-polars[patito]", "dagster-polars[patito]==0.27.4",
"dagster-duckdb", "dagster-duckdb",
"dagster-duckdb-pandas", "dagster-duckdb-pandas",
"dagit" "dagit"
@@ -65,6 +66,9 @@ weather = [
"requests_cache", "requests_cache",
"retry_requests" "retry_requests"
] ]
backup = [
"paramiko"
]
other = [ other = [
# "deltalake>=1.0.0", # "deltalake>=1.0.0",
# "dagster-deltalake-pandas", # "dagster-deltalake-pandas",

View File

@@ -5,4 +5,5 @@ uv pip compile pyproject.toml --extra=dagster --extra=vinyl > apps/vinyl/require
uv pip compile pyproject.toml --extra=dagster --extra=stocks > apps/stocks/requirements.txt uv pip compile pyproject.toml --extra=dagster --extra=stocks > apps/stocks/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=tesla > apps/tesla/requirements.txt uv pip compile pyproject.toml --extra=dagster --extra=tesla > apps/tesla/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=weather > apps/weather/requirements.txt uv pip compile pyproject.toml --extra=dagster --extra=weather > apps/weather/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=backup > apps/backup/requirements.txt
uv pip compile pyproject.toml --extra=dagster --extra=other > apps/other/requirements.txt uv pip compile pyproject.toml --extra=dagster --extra=other > apps/other/requirements.txt

View File

@@ -1,152 +0,0 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml
annotated-types==0.7.0
# via pydantic
asttokens==3.0.0
# via icecream
beautifulsoup4==4.13.4
# via dev (pyproject.toml)
boto3==1.40.1
# via dev (pyproject.toml)
botocore==1.40.1
# via
# boto3
# s3fs
# s3transfer
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via requests
colorama==0.4.6
# via icecream
contourpy==1.3.3
# via matplotlib
cramjam==2.11.0
# via fastparquet
cycler==0.12.1
# via matplotlib
dnspython==2.7.0
# via email-validator
duckdb==1.3.2
# via dev (pyproject.toml)
email-validator==2.2.0
# via pydantic
et-xmlfile==2.0.0
# via openpyxl
executing==2.2.0
# via icecream
fastparquet==2024.11.0
# via dev (pyproject.toml)
fonttools==4.59.0
# via matplotlib
fsspec==2025.7.0
# via
# fastparquet
# s3fs
gitdb==4.0.12
# via gitpython
gitpython==3.1.45
# via dev (pyproject.toml)
icecream==2.1.5
# via dev (pyproject.toml)
idna==3.10
# via
# email-validator
# requests
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.8
# via matplotlib
lxml==6.0.0
# via dev (pyproject.toml)
matplotlib==3.10.5
# via seaborn
numpy==2.3.2
# via
# contourpy
# fastparquet
# matplotlib
# pandas
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
packaging==25.0
# via
# fastparquet
# matplotlib
pandas==2.3.1
# via
# dev (pyproject.toml)
# fastparquet
# seaborn
patito==0.8.3
# via dev (pyproject.toml)
pillow==11.3.0
# via matplotlib
polars==1.32.0
# via patito
pyarrow==21.0.0
# via dev (pyproject.toml)
pydantic==2.11.7
# via
# dev (pyproject.toml)
# patito
# pydantic-settings
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pygments==2.19.2
# via icecream
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# matplotlib
# pandas
python-dotenv==1.1.1
# via pydantic-settings
pytz==2025.2
# via pandas
pyyaml==6.0.2
# via dev (pyproject.toml)
requests==2.32.4
# via dev (pyproject.toml)
s3fs==0.4.2
# via dev (pyproject.toml)
s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
six==1.17.0
# via python-dateutil
smmap==5.0.2
# via gitdb
soupsieve==2.7
# via beautifulsoup4
structlog==25.4.0
# via dev (pyproject.toml)
typing-extensions==4.14.1
# via
# beautifulsoup4
# patito
# pydantic
# pydantic-core
# typing-inspection
typing-inspection==0.4.1
# via
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
urllib3==2.5.0
# via
# botocore
# requests
xlsxwriter==3.2.5
# via dev (pyproject.toml)

View File

@@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr
from upath import UPath from upath import UPath
import dagster as dg import dagster as dg
from dagster import (
InputContext,
OutputContext,
)
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
@@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC):
with path.open("r") as fp: with path.open("r") as fp:
return json.load(fp) return json.load(fp)
def get_path_for_partition(
    self, context: InputContext | OutputContext, path: "UPath", partition: str
) -> UPath:
    """Resolve the storage path for a single partition.

    When the asset's metadata carries an explicit ``path`` entry, the
    UPath is assembled from that entry's value components; otherwise the
    default UPathIOManager partition layout is used.

    Args:
        context: The input or output context of the current step.
        path: The base path computed by the IO manager.
        partition: The partition key being loaded or materialized.

    Returns:
        The UPath to read from / write to for this partition.
    """
    ic()
    # NOTE(review): assumes InputContext exposes output_metadata like
    # OutputContext does — confirm against the pinned dagster version.
    context_metadata = context.output_metadata or {}
    ic(context_metadata)
    if "path" in context_metadata:
        # Metadata stores the path as a sequence of components.
        return UPath(*context_metadata["path"].value)
    # Bug fix: the superclass hook takes (context, path, partition) —
    # forwarding only `context` raised TypeError on this fallback branch.
    return super().get_path_for_partition(context, path, partition)
def get_asset_relative_path( def get_asset_relative_path(
self, context: dg.InputContext | dg.OutputContext self, context: dg.InputContext | dg.OutputContext
) -> UPath: ) -> UPath:
"""Get the relative path for the asset based on context metadata.""" """Get the relative path for the asset based on context metadata."""
ic()
context_metadata = context.output_metadata or {} context_metadata = context.output_metadata or {}
ic(context_metadata) ic(context_metadata)
path_prefix = ( path_prefix = (
context_metadata["path_prefix"].value context_metadata["path_prefix"].value
if "path_prefix" in context_metadata if "path_prefix" in context_metadata

View File

@@ -15,6 +15,10 @@ load_from:
location_name: weather location_name: weather
host: dagster-code-weather host: dagster-code-weather
port: 4000 port: 4000
- grpc_server:
location_name: backup
host: dagster-code-backup
port: 4000
- grpc_server: - grpc_server:
location_name: other location_name: other
host: dagster-code-other host: dagster-code-other