diff --git a/authentik/database/postmaster.opts b/authentik/database/postmaster.opts
new file mode 100644
index 0000000..77c8b5d
--- /dev/null
+++ b/authentik/database/postmaster.opts
@@ -0,0 +1 @@
+/usr/local/bin/postgres
diff --git a/borgmatic/keys/argenta.key b/borgmatic/keys/argenta.key
new file mode 100644
index 0000000..87c522b
--- /dev/null
+++ b/borgmatic/keys/argenta.key
@@ -0,0 +1 @@
+iXVAFI8sjSwzzbUOm5XU
diff --git a/borgmatic/keys/bram.key b/borgmatic/keys/bram.key
new file mode 100644
index 0000000..0eb728f
--- /dev/null
+++ b/borgmatic/keys/bram.key
@@ -0,0 +1 @@
+2ULHLyKziqxvZajNoafS
diff --git a/collectd/docker/Dockerfile.bookworm b/collectd/docker/Dockerfile.bookworm
new file mode 100644
index 0000000..8760e79
--- /dev/null
+++ b/collectd/docker/Dockerfile.bookworm
@@ -0,0 +1,35 @@
+FROM debian:bookworm
+
+ENV LC_ALL=C
+ENV DEBIAN_FRONTEND=noninteractive
+ENV TIMEZONE=Europe/Amsterdam
+
+RUN apt update
+RUN apt dist-upgrade -y --no-install-recommends
+
+RUN echo $TIMEZONE > /etc/timezone
+RUN dpkg-reconfigure -f noninteractive tzdata
+
+RUN apt install -y libsensors5 liblzo2-2 collectd sudo btrfs-progs libatasmart4 speedtest-cli
+
+RUN apt install -y smartmontools
+
+RUN apt install -y wget git
+
+ENV HDDTEMP_VERSION=0.2.4
+RUN wget https://github.com/slowpeek/hddtemp/archive/refs/tags/${HDDTEMP_VERSION}.tar.gz \
+    && tar xvf ${HDDTEMP_VERSION}.tar.gz && mv hddtemp-${HDDTEMP_VERSION}/hddtemp-lt /usr/sbin/hddtemp
+
+RUN apt install -y gcc python3-dev make
+RUN git clone https://github.com/RRZE-HPC/likwid.git
+RUN cd likwid && make -j && make install
+RUN git clone https://github.com/RRZE-HPC/pylikwid.git
+RUN cd pylikwid && python3 setup.py build_ext && python3 setup.py install
+RUN apt remove -y gcc python3-dev make
+RUN apt autoremove -y
+
+RUN useradd collectd
+RUN usermod -aG sudo collectd
+RUN echo 'collectd ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers.d/collectd
+
+CMD /usr/sbin/collectd -f
diff --git a/collectd/docker/Dockerfile.buster b/collectd/docker/Dockerfile.buster
new file mode 100644
index 0000000..eaca4a1
--- /dev/null
+++ b/collectd/docker/Dockerfile.buster
@@ -0,0 +1,31 @@
+FROM debian:buster
+
+ENV LC_ALL=C
+ENV DEBIAN_FRONTEND=noninteractive
+ENV TIMEZONE=Europe/Amsterdam
+
+RUN apt update
+RUN apt dist-upgrade -y --no-install-recommends
+
+RUN echo $TIMEZONE > /etc/timezone
+RUN dpkg-reconfigure -f noninteractive tzdata
+
+RUN apt install -y software-properties-common gpgv dirmngr psmisc wget curl python3-pip git gawk zip gperf unzip lbzip2 inetutils-ping inetutils-telnet rsync
+
+RUN pip3 install argparse
+
+RUN apt install -y libsensors5 liblzo2-2 hddtemp collectd sudo btrfs-progs libatasmart4 speedtest-cli
+
+RUN useradd collectd
+RUN usermod -aG sudo collectd
+RUN echo 'collectd ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers.d/collectd
+
+RUN apt install -y gcc python-dev make
+RUN git clone https://github.com/RRZE-HPC/likwid.git
+RUN cd likwid && make -j && make install
+RUN git clone https://github.com/RRZE-HPC/pylikwid.git
+RUN cd pylikwid && python setup.py build_ext && python setup.py install
+
+ENV LD_LIBRARY_PATH /usr/local/lib
+
+CMD /usr/sbin/collectd -f
diff --git a/collectd/usr/local/bin/btrfs-data b/collectd/usr/local/bin/btrfs-data
new file mode 100755
index 0000000..cd33608
--- /dev/null
+++ b/collectd/usr/local/bin/btrfs-data
@@ -0,0 +1,212 @@
+#!/usr/bin/python -u
+
+#
+# Imports
+#
+import sys
+import time
+import commands
+import argparse
+
+
+#
+# Misc
+#
+#sys.tracebacklimit = 0
+
+
+#
+# Global variables
+#
+size_data_total = 0
+size_data_exclusive = 0
+size_snapshot_total = 0
+size_snapshot_exclusive = 0
+
+
+#
+# Methods
+#
+def get_subvol_list(path):
+    command = 'sudo btrfs subvolume list -t %s' % (path)
+    status, output = commands.getstatusoutput(command)
+
+    if status != 0:
+        raise Exception(command)
+
+    # Every line contains the following values: subvol_id, gen, toplevel, path
+    return output.splitlines()[2:]
+
+def get_filesystem_size(path):
+    command = 'sudo btrfs filesystem show --raw %s' % (path)
+    status, output = commands.getstatusoutput(command)
+
+    if status != 0 or True:
+        # This command fails when running inside Docker container
+        # return maximum size of any filesystem instead
+        command = 'sudo btrfs filesystem show --raw'
+        status, output = commands.getstatusoutput(command)
+        lines = output.splitlines()
+        lines = filter(lambda x: 'devid' in x, lines)
+        sizes = [int(line.split()[3]) for line in lines]
+        return max(sizes)
+
+    # The sizes are on the third line
+    line = output.splitlines()[2]
+
+    # Element 3 and 5 respectively contain total and used sizes
+    return int(line.split()[3])
+
+def get_id_root(name, path):
+    lines = get_subvol_list(path)
+
+    # Filter lines where toplevel == 5
+    subvol_ids = filter(lambda x: int(x.split()[2]) == 5, lines)
+
+    # Try to retrieve the subvol_id for the root subvolume (if any)
+    if len(subvol_ids) == 1:
+        # The path contains a btrfs filesystem without subvolume for data
+        return int(subvol_ids[0].split()[0])
+    else:
+        # The path contains a btrfs filesystem with multiple subvolumes for data
+        try:
+            return int(filter(lambda x: x.split()[3] == name, subvol_ids)[0].split()[0])
+        except IndexError:
+            pass
+
+
+    # Volume not found, root is probably the btrfs default (5)
+    return 5
+
+def get_id_subvolumes(path, subvol_id):
+    lines = get_subvol_list(path)
+    lines = filter(lambda x: int(x.split()[2]) == subvol_id, lines)
+    return list(map(lambda x: int(x.split()[0]), lines))
+
+
+def get_disk_usage(name, path):
+    id_root = get_id_root(name, path)
+    id_subvolumes = get_id_subvolumes(path, id_root)
+    size_filesystem = get_filesystem_size(path)
+
+    # Get disk usage from quota
+    command = 'sudo btrfs qgroup show --raw %s' % (path)
+    status, output = commands.getstatusoutput(command)
+
+    if status != 0:
+        raise Exception(command)
+
+    lines = output.splitlines()[2:]
+
+    # Global variables
+    global size_data_total
+    global size_data_exclusive
+    global size_snapshot_total
+    global size_snapshot_exclusive
+
+    # Total data volume in subvolume
+    size_data_total = 0
+
+    # Total data volume in snapshots
+    # -> this variable is useless
+    size_snapshot_total = 0
+
+    # Data exclusively in subvolume
+    # -> data that is not (yet) incorporated in a snapshot
+    size_data_exclusive = 0
+
+    # Data exclusively available in snapshots
+    # -> data that was removed from volume
+    size_snapshot_exclusive = 0
+
+    for line in lines:
+        split = line.split()
+        subvol_id = 0
+        size_total = 0
+        size_exclusive = 0
+        try:
+            subvol_id = int(split[0].split('/')[1])
+            size_total = float(split[1])
+            size_exclusive = float(split[2])
+        except IndexError:
+            # ignore 'WARNING: Quota disabled'
+            pass
+
+        # size_exclusive is incorrect when snapshot is
+        # removed and qgroups are not updated yet,
+        # ignore the value when it seems unrealistic
+        if size_exclusive > size_filesystem:
+            size_exclusive = 0
+
+        if subvol_id == id_root:
+            size_data_total = size_total
+            size_data_exclusive = size_exclusive
+        elif subvol_id in id_subvolumes:
+            size_snapshot_total += size_total
+            size_snapshot_exclusive += size_exclusive
+
+def rescan_quota(path):
+    command = 'sudo btrfs quota rescan %s' % (path)
+    status, output = commands.getstatusoutput(command)
+    if status != 0:
+        raise Exception(command)
+
+def print_human_readable(name):
+    global size_data_total
+    global size_data_exclusive
+    global size_snapshot_exclusive
+    size_data_total = size_data_total / (1024*1e6)
+    size_data_exclusive = size_data_exclusive / (1024*1e6)
+    size_snapshot_exclusive = size_snapshot_exclusive / (1024*1e6)
+    print '%10s: %6.1f GB, %6.1f GB, %6.1f GB' % (name, size_data_total, size_data_exclusive, size_snapshot_exclusive)
+
+def print_rrd(name):
+    timestamp = int(time.time())
+    print('PUTVAL {}/exec-btrfs_{}/gauge-data_total {}:{:.1f}'.format(hostname, name, timestamp, size_data_total))
+    print('PUTVAL {}/exec-btrfs_{}/gauge-data_exclusive {}:{:.1f}'.format(hostname, name, timestamp, size_data_exclusive))
+    print('PUTVAL {}/exec-btrfs_{}/gauge-snapshot_total {}:{:.1f}'.format(hostname, name, timestamp, size_snapshot_total))
+    print('PUTVAL {}/exec-btrfs_{}/gauge-snapshot_exclusive {}:{:.1f}'.format(hostname, name, timestamp, size_snapshot_exclusive))
+
+
+#
+# Volumes to scan
+#
+hostname = 'server'
+interval = 10
+volumes = [
+    ['mezzo-scratch', '/mnt/mezzo/scratch'],
+    ['mezzo-sync', '/mnt/mezzo/sync'],
+    ['helium-personal', '/mnt/yotta/helium/personal'],
+    ['helium-shared', '/mnt/yotta/helium/shared'],
+    ['neon', '/mnt/yotta/neon'],
+    ['krypton', '/mnt/yotta/krypton'],
+    ['xenon-borg', '/mnt/yotta/xenon/borg'],
+    ['xenon-rsnapshot', '/mnt/yotta/xenon/rsnapshot']
+]
+
+
+#
+# Command line arguments
+#
+parser = argparse.ArgumentParser(description='Get BTRFS disk usage')
+parser.add_argument('-s', action='store_true', help='print in human readable format')
+args = parser.parse_args()
+human_readable = args.s
+
+
+#
+# Main
+#
+if human_readable:
+    for name, path in volumes:
+        get_disk_usage(name, path)
+        print_human_readable(name)
+else:
+    # RRD mode
+    while True:
+        for name, path in volumes:
+            get_disk_usage(name, path)
+            print_rrd(name)
+
+        time.sleep(interval)
+        rescan_quota(path)
diff --git a/collectd/usr/local/bin/du-data b/collectd/usr/local/bin/du-data
new file mode 100755
index 0000000..68df7ad
--- /dev/null
+++ b/collectd/usr/local/bin/du-data
@@ -0,0 +1,77 @@
+#!/usr/bin/python
+
+#
+# Imports
+#
+import os
+import sys
+import time
+import commands
+import argparse
+
+
+#
+# Methods
+#
+def get_disk_usage(path, human_readable):
+    '''disk usage in human readable format (e.g. '2,1GB')'''
+    arguments = '-sh' if human_readable else '-s'
+    path = os.path.realpath(path)
+    command = 'sudo du %s %s' % (arguments, path)
+    status, output = commands.getstatusoutput(command)
+
+    if status != 0:
+        raise Exception(command)
+
+    disk_usage = output.split()[0]
+    if not human_readable:
+        # du reports in units of 1024 bytes, convert to plain number of bytes
+        disk_usage = int(disk_usage) * 1024
+    return disk_usage
+
+#
+# Directories to scan
+#
+hostname = 'server'
+interval = 10
+directories = [
+    #['bram', '/media/data/Personal/Bram'],
+    ['rik', '/media/data/Personal/Rik'],
+    ['books', '/media/data/Shared/Books'],
+    ['games', '/media/data/Shared/Games'],
+    ['misc', '/media/data/Shared/Miscellaneous'],
+    ['shows', '/media/data/Shared/Video/Shows'],
+    ['movies', '/media/data/Shared/Video/Movies'],
+    ['music', '/media/data/Shared/Music'],
+    ['photographs', '/media/data/Shared/Photographs'],
+    ['pictures', '/media/data/Shared/Pictures'],
+    ['raw', '/media/data/Shared/Raw'],
+    ['software', '/media/data/Shared/Software']
+]
+
+#
+# Command line arguments
+#
+parser = argparse.ArgumentParser(description='Get directory disk usage')
+parser.add_argument('-s', action='store_true', help='print in human readable format')
+args = parser.parse_args()
+human_readable = args.s
+
+
+#
+# Main
+#
+if (human_readable):
+    for (name, path) in directories:
+        disk_usage = get_disk_usage(path, human_readable)
+        print('%s: %s' % (name, disk_usage))
+else:
+    # RRD mode
+    while True:
+        for (name, path) in directories:
+            disk_usage = get_disk_usage(path, human_readable)
+            timestamp = int(time.time())
+            size = float(disk_usage)
+            print('PUTVAL {}/exec-du_{}/gauge-size {}:{:.1f}'.format(hostname, name, timestamp, size))
+            sys.stdout.flush()
+        time.sleep(interval)
diff --git a/collectd/usr/local/bin/funds-data b/collectd/usr/local/bin/funds-data
new file mode 100755
index 0000000..8e5aeb8
--- /dev/null
+++ b/collectd/usr/local/bin/funds-data
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+import os
+import requests
+import re
+import time
+import datetime
+import sys
+import random
+
+collection = 'funds'
+interval = 3600
+funds_behr = {
+    'robeco_one_defensief': 'rg.one.def',
+    'robeco_one_neutraal' : 'rg.one.neut',
+    'robeco_one_offensief': 'rg.one.offe',
+    # 'robeco_plus_gprec'   : 'rg.bp.gprec',
+    # 'robeco_plus_glconsc' : 'rg.gl.consc',
+    # 'robeco_plus_uspreme' : 'rg.us.preme',
+    # 'robeco_plus_uslcfe'  : 'rg.us.lc.fe'
+}
+funds_morningstar = {
+    'robeco_one_defensief': 'F00000OZ3S',
+    'robeco_one_neutraal' : 'F00000OZ3T',
+    'robeco_one_offensief': 'F00000OZ3U',
+    # 'robeco_plus_gprec'   : 'F00000QDBK', # BP Global Premium Equities C
+    # 'robeco_plus_glconsc' : 'F00000PNRA', # Global Conservative Equities C EUR
+    # 'robeco_plus_uspreme' : 'F00000OWHQ', # BP US Premium Equities F
+    # 'robeco_plus_uslcfe'  : 'F00000QDBI'  # BP US Large Cap Equities F EUR
+}
+# log = open('/host/var/log/funds.log', 'a')
+
+def behr_backlog(funds):
+    for fund,code in funds.items():
+        dir = '/var/lib/collectd/rrd/{}/exec-fund-{}'.format(collection, fund)
+        if not os.path.isdir(dir):
+            url = 'http://www.behr.nl/Beurs/Slotkoersen/slotkoersen.php?fd={}'.format(code)
+            # url = 'http://haggis.no-ip.org/funds/{}.html'.format(code)
+            response = requests.get(url)
+            matches = re.findall('(\d+): ([\d\.]+)', response.text)
+            for match in matches:
+                for i in range(23):
+                    timestamp = int(time.mktime(datetime.datetime.strptime(match[0], '%y%m%d').replace(hour = i + 1).timetuple()))
+                    put_value(fund, match[1], timestamp)
+            sys.stdout.flush()
+
+def morningstar_ticker(funds):
+    for fund,code in funds.items():
+        url = 'http://www.morningstar.nl/nl/funds/snapshot/snapshot.aspx?id={}'.format(code)
+        response = requests.get(url)
+        matches = re.findall('>EUR[\s]*[^\d]*([\d,]+)', response.text)
+        quote = matches[0].replace(',', '.')
+        # quote = 100 + 50 * (random.random() - 1)
+        put_value(fund, quote)
+
+def put_value(fund, value, timestamp = 'N'):
+    print('PUTVAL {}/exec-fund-{}/gauge-ticker interval={} {}:{}'.format(collection, fund, interval, timestamp, value))
+    sys.stdout.flush()
+    if timestamp == 'N':
+        timestamp = int(time.time())
+    # log.write('{},{},{}\n'.format(fund, timestamp, int(value)))
+
+behr_backlog(funds_behr)
+while True:
+    seconds = os.times()[4]
+    morningstar_ticker(funds_morningstar)
+    elapsed = os.times()[4] - seconds
+    time.sleep(interval - elapsed)
diff --git a/collectd/usr/local/bin/power-data b/collectd/usr/local/bin/power-data
new file mode 100755
index 0000000..8ef16d1
--- /dev/null
+++ b/collectd/usr/local/bin/power-data
@@ -0,0 +1,69 @@
+#!/usr/bin/python
+
+#
+# Imports
+#
+import sys
+import time
+import argparse
+import pylikwid
+
+#
+# Configuration
+#
+hostname = 'server'
+cpuid = 0
+pinfo = pylikwid.getpowerinfo()
+domainid = pinfo.get('domains').get('PKG').get('ID')
+measurement_duration = 10
+measurement_interval = 60
+dinfo = pinfo.get('domains')
+domain_names = dinfo.keys()
+domain_ids = [domain['ID'] for domain in dinfo.values()]
+
+#
+# Command line arguments
+#
+parser = argparse.ArgumentParser(description='Get CPU power consumption')
+parser.add_argument('-s', action='store_true', help='print in human readable format')
+args = parser.parse_args()
+human_readable = args.s
+
+#
+# Methods
+#
+def get_power():
+    #print dict(zip(domain_names, domain_ids))
+    start = list()
+    end = list()
+    power = list()
+    for domain_id in domain_ids:
+        e_start = pylikwid.startpower(cpuid, domain_id)
+        start.append(e_start)
+    time.sleep(measurement_duration)
+    for domain_id in domain_ids:
+        e_stop = pylikwid.stoppower(cpuid, domain_id)
+        end.append(e_stop)
+    for events in zip(start, end, domain_ids):
+        power.append(pylikwid.getpower(events[0], events[1], events[2]))
+
+    return dict(zip(domain_names, power))
+
+def print_rrd(measurements):
+    timestamp = int(time.time())
+    for measurement in measurements.items():
+        name = measurement[0].lower()
+        power = measurement[1]
+        print('PUTVAL {}/exec-power/gauge-{} {}:{:.1f}'.format(hostname, name, timestamp, power))
+
+#
+# Main
+#
+if (human_readable):
+    print get_power()
+else:
+    while True:
+        power = get_power()
+        print_rrd(power)
+        sys.stdout.flush()
+        time.sleep(measurement_interval)
diff --git a/collectd/usr/local/bin/speedtest-data b/collectd/usr/local/bin/speedtest-data
new file mode 100755
index 0000000..74a3453
--- /dev/null
+++ b/collectd/usr/local/bin/speedtest-data
@@ -0,0 +1,12 @@
+#!/bin/bash
+SPEEDTEST=/sbin/speedtest-cli
+COLLECTION=server
+INTERVAL=1800
+
+while :; do
+    SECONDS=0
+    RESULT=($($SPEEDTEST | grep Mbit | cut -d' ' -f 2))
+    echo "PUTVAL $COLLECTION/exec-speedtest/gauge-download interval=$INTERVAL N:${RESULT[0]}"
+    echo "PUTVAL $COLLECTION/exec-speedtest/gauge-upload interval=$INTERVAL N:${RESULT[1]}"
+    sleep $((INTERVAL-$SECONDS))
+done
diff --git a/collectd/usr/local/bin/temperature-data b/collectd/usr/local/bin/temperature-data
new file mode 100755
index 0000000..3a7f622
--- /dev/null
+++ b/collectd/usr/local/bin/temperature-data
@@ -0,0 +1,43 @@
+#!/usr/bin/python -u
+
+#
+# Imports
+#
+import sys
+import time
+import commands
+import argparse
+
+ +# +# Methods +# +def get_temperature(disks): + command = "sudo smartctl -a /dev/%s | grep Temperature_Celsius | awk '{print $10}'" % disk + status, output = commands.getstatusoutput(command) + + try: + return int(output) + except Exception as e: + return None + + +# +# Settings +# +hostname = 'server' +interval = 10 +disks = ['sdd', 'sde', 'sdf'] + + +# +# Main +# +while True: + timestamp = int(time.time()) + for disk in disks: + temperature = get_temperature(disk) + if temperature: + print('PUTVAL {}/exec-temperature/gauge-{}_total {}:{}'.format(hostname, disk, timestamp, temperature)) + time.sleep(interval) + diff --git a/dagster/Makefile b/dagster/Makefile new file mode 100644 index 0000000..ff04132 --- /dev/null +++ b/dagster/Makefile @@ -0,0 +1,20 @@ +requirements.txt: pyproject.toml + uv pip compile $(UPGRADE) --output-file=requirements.txt pyproject.toml >/dev/null + +dagster-requirements.txt: requirements.txt pyproject.toml + uv pip compile $(UPGRADE) --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml >/dev/null + +sync: virtualenv + uv pip sync requirements.txt + +upgrade-deps: virtualenv + touch pyproject.toml + $(MAKE) UPGRADE="--upgrade" dev-requirements.txt + +install-tools: virtualenv + pip install $(UPGRADE) pip wheel pip-tools uv + +upgrade-tools: virtualenv + $(MAKE) UPGRADE="--upgrade" install-tools + +upgrade: upgrade-tools upgrade-pre-commit upgrade-deps sync diff --git a/dagster/dagster-requirements.txt b/dagster/dagster-requirements.txt new file mode 100755 index 0000000..41e78bf --- /dev/null +++ b/dagster/dagster-requirements.txt @@ -0,0 +1,398 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml +aiobotocore==2.15.1 + # via s3fs +aiohappyeyeballs==2.4.3 + # via aiohttp +aiohttp==3.10.8 + # via + # aiobotocore + # s3fs +aioitertools==0.12.0 + # via aiobotocore +aiosignal==1.3.1 + # via aiohttp +alembic==1.13.3 + # via dagster +aniso8601==9.0.1 + # via graphene +annotated-types==0.7.0 + # via pydantic +anyio==4.6.0 + # via + # gql + # starlette + # watchfiles +appdirs==1.4.4 + # via pint +asttokens==2.4.1 + # via icecream +attrs==24.2.0 + # via aiohttp +backoff==2.2.1 + # via gql +beautifulsoup4==4.12.3 +boto3==1.35.23 + # via + # aiobotocore + # dagster-aws +botocore==1.35.23 + # via + # aiobotocore + # boto3 + # s3transfer +cachetools==5.5.0 + # via google-auth +certifi==2024.8.30 + # via + # influxdb-client + # kubernetes + # pyogrio + # pyproj + # requests +charset-normalizer==3.3.2 + # via requests +click==8.1.7 + # via + # dagster + # dagster-webserver + # uvicorn +colorama==0.4.6 + # via icecream +coloredlogs==14.0 + # via dagster +contourpy==1.3.0 + # via matplotlib +cramjam==2.8.4 + # via fastparquet +croniter==3.0.3 + # via dagster +cycler==0.12.1 + # via matplotlib +dagit==1.8.9 +dagster==1.8.9 + # via + # dagster-aws + # dagster-docker + # dagster-duckdb + # dagster-duckdb-pandas + # dagster-graphql + # dagster-polars + # dagster-postgres + # dagster-webserver +dagster-aws==0.24.9 +dagster-docker==0.24.9 +dagster-duckdb==0.24.9 + # via dagster-duckdb-pandas +dagster-duckdb-pandas==0.24.9 +dagster-graphql==1.8.9 + # via dagster-webserver +dagster-pipes==1.8.9 + # via dagster +dagster-polars==0.24.9 +dagster-postgres==0.24.9 +dagster-webserver==1.8.9 + # via dagit +dnspython==2.6.1 + # via email-validator +docker==7.1.0 + # via dagster-docker +docker-image-py==0.1.13 + # via 
dagster-docker +docstring-parser==0.16 + # via dagster +duckdb==1.1.1 + # via dagster-duckdb +durationpy==0.8 + # via kubernetes +email-validator==2.2.0 + # via pydantic +et-xmlfile==1.1.0 + # via openpyxl +executing==2.1.0 + # via icecream +fastapi==0.115.0 +fastparquet==2024.5.0 +filelock==3.16.1 + # via dagster +flexcache==0.3 + # via pint +flexparser==0.3.1 + # via pint +fonttools==4.54.1 + # via matplotlib +frozenlist==1.4.1 + # via + # aiohttp + # aiosignal +fsspec==2024.9.0 + # via + # fastparquet + # s3fs + # universal-pathlib +geopandas==1.0.1 +gitdb==4.0.11 + # via gitpython +gitpython==3.1.43 +google-auth==2.35.0 + # via kubernetes +gql==3.5.0 + # via dagster-graphql +graphene==3.3 + # via dagster-graphql +graphql-core==3.2.4 + # via + # gql + # graphene + # graphql-relay +graphql-relay==3.2.0 + # via graphene +grpcio==1.66.2 + # via + # dagster + # grpcio-health-checking +grpcio-health-checking==1.62.3 + # via dagster +h11==0.14.0 + # via uvicorn +httptools==0.6.1 + # via uvicorn +humanfriendly==10.0 + # via coloredlogs +icecream==2.1.3 +idna==3.10 + # via + # anyio + # email-validator + # requests + # yarl +influxdb-client==1.46.0 +jinja2==3.1.4 + # via dagster +jmespath==1.0.1 + # via + # boto3 + # botocore +kiwisolver==1.4.7 + # via matplotlib +kubernetes==31.0.0 +lxml==5.3.0 +mako==1.3.5 + # via alembic +markdown-it-py==3.0.0 + # via rich +markupsafe==2.1.5 + # via + # jinja2 + # mako +matplotlib==3.9.2 + # via seaborn +mdurl==0.1.2 + # via markdown-it-py +multidict==6.1.0 + # via + # aiohttp + # yarl +networkx==3.3 +numpy==2.1.1 + # via + # contourpy + # fastparquet + # geopandas + # matplotlib + # pandas + # pyarrow + # pyogrio + # seaborn + # shapely +oauthlib==3.2.2 + # via + # kubernetes + # requests-oauthlib +openpyxl==3.1.5 +packaging==24.1 + # via + # dagster + # dagster-aws + # fastparquet + # geopandas + # matplotlib + # pyogrio +pandas==2.2.3 + # via + # dagster-duckdb-pandas + # fastparquet + # geopandas + # pint-pandas + # seaborn +pillow==10.4.0 + # via matplotlib +pint==0.24.3 + # via pint-pandas +pint-pandas==0.6.2 +polars==1.9.0 + # via dagster-polars +protobuf==4.25.5 + # via + # dagster + # grpcio-health-checking +psycopg2-binary==2.9.9 + # via dagster-postgres +pyarrow==17.0.0 + # via dagster-polars +pyasn1==0.6.1 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.4.1 + # via google-auth +pydantic==2.9.2 + # via + # dagster + # fastapi + # pydantic-settings +pydantic-core==2.23.4 + # via pydantic +pydantic-settings==2.5.2 +pygments==2.18.0 + # via + # icecream + # rich +pyogrio==0.10.0 + # via geopandas +pyparsing==3.1.4 + # via matplotlib +pyproj==3.7.0 + # via geopandas +pysocks==1.7.1 + # via requests +python-dateutil==2.9.0.post0 + # via + # botocore + # croniter + # influxdb-client + # kubernetes + # matplotlib + # pandas +python-dotenv==1.0.1 + # via + # dagster + # pydantic-settings + # uvicorn +pytz==2024.2 + # via + # croniter + # dagster + # pandas +pyyaml==6.0.2 + # via + # dagster + # kubernetes + # uvicorn +reactivex==4.0.4 + # via influxdb-client +regex==2024.9.11 + # via docker-image-py +requests==2.32.3 + # via + # dagster + # dagster-aws + # dagster-graphql + # docker + # gql + # kubernetes + # requests-oauthlib + # requests-toolbelt +requests-oauthlib==2.0.0 + # via kubernetes +requests-toolbelt==1.0.0 + # via gql +rich==13.8.1 + # via dagster +rsa==4.9 + # via google-auth +s3fs==2024.9.0 +s3transfer==0.10.2 + # via boto3 +seaborn==0.13.2 +setuptools==75.1.0 + # via + # dagster + # influxdb-client +shapely==2.0.6 + # via geopandas 
+six==1.16.0 + # via + # asttokens + # kubernetes + # python-dateutil +smmap==5.0.1 + # via gitdb +sniffio==1.3.1 + # via anyio +soupsieve==2.6 + # via beautifulsoup4 +sqlalchemy==2.0.35 + # via + # alembic + # dagster +starlette==0.38.6 + # via + # dagster-graphql + # dagster-webserver + # fastapi +structlog==24.4.0 + # via dagster +tabulate==0.9.0 + # via dagster +tomli==2.0.1 + # via dagster +toposort==1.10 + # via dagster +tqdm==4.66.5 + # via dagster +typing-extensions==4.12.2 + # via + # alembic + # dagster + # dagster-polars + # fastapi + # flexcache + # flexparser + # pint + # pydantic + # pydantic-core + # reactivex + # sqlalchemy +tzdata==2024.2 + # via pandas +universal-pathlib==0.2.5 + # via + # dagster + # dagster-polars +urllib3==2.2.3 + # via + # botocore + # docker + # influxdb-client + # kubernetes + # requests +uvicorn==0.31.0 + # via dagster-webserver +uvloop==0.20.0 + # via uvicorn +watchdog==5.0.3 + # via dagster +watchfiles==0.24.0 + # via uvicorn +websocket-client==1.8.0 + # via kubernetes +websockets==13.1 + # via uvicorn +wrapt==1.16.0 + # via aiobotocore +xlsxwriter==3.2.0 +yarl==1.13.1 + # via + # aiohttp + # gql diff --git a/dagster/requirements.txt b/dagster/requirements.txt new file mode 100755 index 0000000..85e1c5c --- /dev/null +++ b/dagster/requirements.txt @@ -0,0 +1,238 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --output-file=requirements.txt pyproject.toml +aiobotocore==2.15.1 + # via s3fs +aiohappyeyeballs==2.4.3 + # via aiohttp +aiohttp==3.10.8 + # via + # aiobotocore + # s3fs +aioitertools==0.12.0 + # via aiobotocore +aiosignal==1.3.1 + # via aiohttp +annotated-types==0.7.0 + # via pydantic +anyio==4.6.0 + # via starlette +appdirs==1.4.4 + # via pint +asttokens==2.4.1 + # via icecream +attrs==24.2.0 + # via aiohttp +beautifulsoup4==4.12.3 +boto3==1.35.23 + # via aiobotocore +botocore==1.35.23 + # via + # aiobotocore + # boto3 + # s3transfer +cachetools==5.5.0 + # via google-auth +certifi==2024.8.30 + # via + # influxdb-client + # kubernetes + # pyogrio + # pyproj + # requests +charset-normalizer==3.3.2 + # via requests +click==8.1.7 + # via uvicorn +colorama==0.4.6 + # via icecream +contourpy==1.3.0 + # via matplotlib +cramjam==2.8.4 + # via fastparquet +cycler==0.12.1 + # via matplotlib +dnspython==2.6.1 + # via email-validator +duckdb==1.1.1 +durationpy==0.8 + # via kubernetes +email-validator==2.2.0 + # via pydantic +et-xmlfile==1.1.0 + # via openpyxl +executing==2.1.0 + # via icecream +fastapi==0.115.0 +fastparquet==2024.5.0 +flexcache==0.3 + # via pint +flexparser==0.3.1 + # via pint +fonttools==4.54.1 + # via matplotlib +frozenlist==1.4.1 + # via + # aiohttp + # aiosignal +fsspec==2024.9.0 + # via + # fastparquet + # s3fs +geopandas==1.0.1 +gitdb==4.0.11 + # via gitpython +gitpython==3.1.43 +google-auth==2.35.0 + # via kubernetes +h11==0.14.0 + # via uvicorn +icecream==2.1.3 +idna==3.10 + # via + # anyio + # email-validator + # requests + # yarl +influxdb-client==1.46.0 +jmespath==1.0.1 + # via + # boto3 + # botocore +kiwisolver==1.4.7 + # via matplotlib +kubernetes==31.0.0 +lxml==5.3.0 +matplotlib==3.9.2 + # via seaborn +multidict==6.1.0 + # via + # aiohttp + # yarl +networkx==3.3 +numpy==2.1.1 + # via + # contourpy + # fastparquet + # geopandas + # matplotlib + # pandas + # pyarrow + # pyogrio + # seaborn + # shapely +oauthlib==3.2.2 + # via + # kubernetes + # requests-oauthlib +openpyxl==3.1.5 +packaging==24.1 + # via + # fastparquet + # geopandas + # matplotlib + # pyogrio +pandas==2.2.3 + # via + # 
fastparquet + # geopandas + # pint-pandas + # seaborn +pillow==10.4.0 + # via matplotlib +pint==0.24.3 + # via pint-pandas +pint-pandas==0.6.2 +pyarrow==17.0.0 +pyasn1==0.6.1 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.4.1 + # via google-auth +pydantic==2.9.2 + # via + # fastapi + # pydantic-settings +pydantic-core==2.23.4 + # via pydantic +pydantic-settings==2.5.2 +pygments==2.18.0 + # via icecream +pyogrio==0.10.0 + # via geopandas +pyparsing==3.1.4 + # via matplotlib +pyproj==3.7.0 + # via geopandas +pysocks==1.7.1 + # via requests +python-dateutil==2.9.0.post0 + # via + # botocore + # influxdb-client + # kubernetes + # matplotlib + # pandas +python-dotenv==1.0.1 + # via pydantic-settings +pytz==2024.2 + # via pandas +pyyaml==6.0.2 + # via kubernetes +reactivex==4.0.4 + # via influxdb-client +requests==2.32.3 + # via + # kubernetes + # requests-oauthlib +requests-oauthlib==2.0.0 + # via kubernetes +rsa==4.9 + # via google-auth +s3fs==2024.9.0 +s3transfer==0.10.2 + # via boto3 +seaborn==0.13.2 +setuptools==75.1.0 + # via influxdb-client +shapely==2.0.6 + # via geopandas +six==1.16.0 + # via + # asttokens + # kubernetes + # python-dateutil +smmap==5.0.1 + # via gitdb +sniffio==1.3.1 + # via anyio +soupsieve==2.6 + # via beautifulsoup4 +starlette==0.38.6 + # via fastapi +structlog==24.4.0 +typing-extensions==4.12.2 + # via + # fastapi + # flexcache + # flexparser + # pint + # pydantic + # pydantic-core + # reactivex +tzdata==2024.2 + # via pandas +urllib3==2.2.3 + # via + # botocore + # influxdb-client + # kubernetes + # requests +uvicorn==0.31.0 +websocket-client==1.8.0 + # via kubernetes +wrapt==1.16.0 + # via aiobotocore +xlsxwriter==3.2.0 +yarl==1.13.1 + # via aiohttp diff --git a/dagster/src/.telemetry/id.yaml b/dagster/src/.telemetry/id.yaml new file mode 100644 index 0000000..9c4cb04 --- /dev/null +++ b/dagster/src/.telemetry/id.yaml @@ -0,0 +1 @@ +instance_id: 9a2d409d-a36a-492d-8f23-2f20c1f49bf4 diff --git a/dagster/src/app/__init__.py b/dagster/src/app/__init__.py new file mode 100644 index 0000000..ef37bac --- /dev/null +++ b/dagster/src/app/__init__.py @@ -0,0 +1,3 @@ +from icecream import install + +install() diff --git a/dagster/src/app/vinyl/__init__.py b/dagster/src/app/vinyl/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/dagster/src/app/vinyl/assets.py b/dagster/src/app/vinyl/assets.py new file mode 100755 index 0000000..b8051f7 --- /dev/null +++ b/dagster/src/app/vinyl/assets.py @@ -0,0 +1,181 @@ +from datetime import datetime +from glob import glob + +import duckdb +import polars as pl +import structlog +from duckdb.typing import DATE, VARCHAR + +from app.vinyl.plato.fetch import scrape_plato +from app.vinyl.sounds.fetch import fetch_deals +from app.vinyl.utils import parse_date +from dagster import ( + DailyPartitionsDefinition, + DimensionPartitionMapping, + Failure, + Field, + IdentityPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, + OpExecutionContext, + StaticPartitionsDefinition, + TimeWindowPartitionMapping, + asset, +) + +SOURCES = ["plato", "sounds"] + +logger = structlog.get_logger() + +partitions_def = MultiPartitionsDefinition( + { + "date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1), + "source": StaticPartitionsDefinition(SOURCES), + } +) + +partition_mapping = MultiPartitionMapping( + { + "date": DimensionPartitionMapping( + dimension_name="date", + partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0), + ), + "source": DimensionPartitionMapping( + 
dimension_name="source", + partition_mapping=IdentityPartitionMapping(), + ), + } +) + + +@asset( + io_manager_key="polars_parquet_io_manager", + partitions_def=partitions_def, + metadata={ + "partition_by": ["date", "source"], + }, + config_schema={ + "import_dir": Field(str, default_value="/opt/dagster/home/storage/import") + }, +) +def deals(context): + ic() + ic(context.partition_key) + ic(context.op_config) + import_dir = context.op_config["import_dir"] + partition_key = context.partition_key.keys_by_dimension + date_str = partition_key["date"] + source = partition_key["source"] + logger.info("Materializing deals", date=date_str, source=source) + + date = datetime.strptime(partition_key["date"], "%Y-%m-%d") + days = (date - datetime.today()).days + ic(days) + if days > 0: + raise Failure(f"Cannot materialize for the future: {date.date()}") + if days < -1: + if source == "sounds": + pattern = f"{import_dir}/{date.date()}_*_sounds.csv" + logger.info("Looking for existing CSV files", pattern=pattern) + files = glob(pattern) + if len(files): + file = sorted(files)[-1] + logger.info("Using existing CSV file", file=file) + try: + df = pl.read_csv(file) + logger.info("Loaded CSV file", rows=len(df)) + return df.with_columns( + **{k: pl.lit(v) for k, v in partition_key.items()} + ) + except Exception as e: + logger.error("Failed to load CSV file!", error=e) + raise Failure(f"Cannot materialize for the past: {date.date()}") + + if source == "plato": + logger.info("Scraping Plato") + df = scrape_plato() + logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown()) + ic(df.columns) + return pl.from_pandas(df.assign(**partition_key)) + if source == "sounds": + logger.info("Scraping Sounds") + df = fetch_deals() + ic(df.columns) + logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown()) + return pl.from_pandas(df.assign(**partition_key)) + + return pl.DataFrame( + [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] + ) + + +@asset(deps=[deals], io_manager_key="polars_parquet_io_manager") +def new_deals(context: OpExecutionContext) -> pl.DataFrame: + ic() + storage_dir = context.instance.storage_directory() + asset_key = "deals" + + with duckdb.connect() as con: + con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE) + return con.execute( + f""" + WITH tmp_plato AS ( + SELECT + source, + CAST(date AS DATE) AS date, + ean AS id, + _artist AS artist, + LOWER(title) AS title, + CAST(_date AS DATE) AS release, + CAST(_price AS FLOAT) AS price, + CONCAT('https://www.platomania.nl', url) AS url, + FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true) + ), tmp_sounds AS ( + SELECT + source, + date, + id, + LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist, + LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title, + PARSE_DATE(release) AS release, + CAST(price AS FLOAT) AS price, + CONCAT('https://www.sounds.nl/detail/', id) AS url + FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true) + ), tmp_both AS ( + SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds + ), tmp_rn AS ( + SELECT + *, + ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn + FROM tmp_both + ) + SELECT + source, + date, + id, + artist, + title, + release, + price, + url + FROM tmp_rn + WHERE rn = 1 + ORDER BY date ASC + """ + ).pl() + + +@asset( + io_manager_key="polars_parquet_io_manager", +) +def works(new_deals: pl.DataFrame) 
-> pl.DataFrame: + # Pandas + # columns = ["artist", "title"] + # return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates()) + + # Polars + # return new_deals[columns].unique(subset=columns) + + # DuckDB + with duckdb.connect() as con: + return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl() diff --git a/dagster/src/app/vinyl/jobs.py b/dagster/src/app/vinyl/jobs.py new file mode 100755 index 0000000..b1d6bcd --- /dev/null +++ b/dagster/src/app/vinyl/jobs.py @@ -0,0 +1,58 @@ +from dagster import ( + AssetKey, + AssetMaterialization, + OpExecutionContext, + define_asset_job, + job, + op, +) + +from .assets import deals, new_deals, works + +deals_job = define_asset_job( + "deals_job", selection=[deals], partitions_def=deals.partitions_def +) + + +@op +def check_partititions(context: OpExecutionContext): + # Replace with your asset/job name + asset_key = "deals" + + context.log_event( + AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds") + ) + + # Fetch the materializations for the asset key + materializations = context.instance.get_materialized_partitions( + asset_key=AssetKey(asset_key) + ) + context.log.info("Existing partitions", extra=dict(partitions=materializations)) + + import polars as pl + + storage_dir = context.instance.storage_directory() + ic(storage_dir) + for row in ( + pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet") + .select(["date", "source"]) + .unique() + .collect() + .iter_rows() + ): + partition = "|".join(row) + if partition not in materializations: + context.log.info(f"Missing partition: {partition}") + context.log_event( + AssetMaterialization(asset_key=asset_key, partition=partition) + ) + + +@job +def check_partititions_job(): + check_partititions() + + +musicbrainz_lookup_job = define_asset_job( + "musicbrainz_lookup_job", selection=[works, new_deals] +) diff --git a/dagster/src/app/vinyl/plato/__init__.py b/dagster/src/app/vinyl/plato/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/dagster/src/app/vinyl/plato/deals.py b/dagster/src/app/vinyl/plato/deals.py new file mode 100755 index 0000000..1f5c348 --- /dev/null +++ b/dagster/src/app/vinyl/plato/deals.py @@ -0,0 +1,154 @@ +import os + +import boto3 +import pandas as pd +from botocore.exceptions import NoCredentialsError, PartialCredentialsError +from dotenv import load_dotenv +from fetch import scrape_plato +from utils import get + + +def update_database(articles_df=None, database_file="/home/user/plato.parquet"): + if os.path.exists(database_file): + database_df = pd.read_parquet(database_file) + else: + database_df = None + + if articles_df is None: + new_df = None if database_df is None else database_df.head(0) + else: + if database_df is None: + articles_df.to_parquet(database_file) + return articles_df, articles_df + + compare = ["ean", "_price"] + check_df = pd.merge( + database_df[compare], articles_df[compare], how="right", indicator=True + ) + new_df = ( + check_df[check_df["_merge"] == "right_only"] + .drop(columns="_merge") + .merge(articles_df) + ) + database_df = ( + pd.concat([database_df, new_df]) + .sort_values("_date") + .groupby("ean") + .last() + .reset_index() + ) + database_df.to_parquet(database_file) + + return database_df, new_df + + +def send_email(lines): + # Define the email parameters + SENDER = "mail@veenboer.xyz" + RECIPIENT = "rik.veenboer@gmail.com" + SUBJECT = "Aanbieding op plato!" 
+ + # The email body for recipients with non-HTML email clients + BODY_TEXT = "" + + # The HTML body of the email + tmp = "\n".join(lines) + BODY_HTML = f""" +
+ + {tmp} + + """ + + # The character encoding for the email + CHARSET = "UTF-8" + + # Try to send the email + try: + client = boto3.client( + "ses", region_name="eu-west-1" + ) # Change the region as needed + + # Provide the contents of the email + response = client.send_email( + Destination={ + "ToAddresses": [ + RECIPIENT, + ], + }, + Message={ + "Body": { + "Html": { + "Charset": CHARSET, + "Data": BODY_HTML, + }, + "Text": { + "Charset": CHARSET, + "Data": BODY_TEXT, + }, + }, + "Subject": { + "Charset": CHARSET, + "Data": SUBJECT, + }, + }, + Source=SENDER, + ) + # Display an error if something goes wrong. + except NoCredentialsError: + print("Credentials not available") + except PartialCredentialsError: + print("Incomplete credentials provided") + except Exception as e: + print(f"Error: {e}") + else: + print("Email sent! Message ID:"), + print(response["MessageId"]) + + +def main(dry=False): + load_dotenv("/opt/.env") + + local_ip = get("http://ifconfig.me", False).text + get_ip = get("http://ifconfig.me").text + print(f"Local IP = {local_ip}") + print(f"Request IP = {get_ip}") + assert local_ip != get_ip + + artists = open("/home/user/artists.txt").read().strip().splitlines() + print(f"Number of known artists = {len(artists)}") + + if dry: + articles_df = None + else: + articles_df = scrape_plato(get=get) + database_df, new_df = update_database(articles_df) + + if dry: + new_df = database_df.sample(20) + + print(f"Database size = {len(database_df)}") + print(f"New = {len(new_df)}") + + # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25') + new_df = new_df.query('_price <= 25 and ean != ""') + print(f"Interesting = {len(new_df)}") + + if new_df is not None and len(new_df): + message = [] + for _, row in new_df.head(10).iterrows(): + message.append( + f'