add untracked files

This commit is contained in:
2024-11-27 10:16:41 +01:00
parent 007bd1614a
commit 396053a94f
45 changed files with 3128 additions and 0 deletions

20
dagster/Makefile Normal file
View File

@@ -0,0 +1,20 @@
requirements.txt: pyproject.toml
uv pip compile $(UPGRADE) --output-file=requirements.txt pyproject.toml >/dev/null
dagster-requirements.txt: requirements.txt pyproject.toml
uv pip compile $(UPGRADE) --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml >/dev/null
sync: virtualenv
uv pip sync requirements.txt
upgrade-deps: virtualenv
touch pyproject.toml
$(MAKE) UPGRADE="--upgrade" dev-requirements.txt
install-tools: virtualenv
pip install $(UPGRADE) pip wheel pip-tools uv
upgrade-tools: virtualenv
$(MAKE) UPGRADE="--upgrade" install-tools
upgrade: upgrade-tools upgrade-pre-commit upgrade-deps sync

398
dagster/dagster-requirements.txt Executable file
View File

@@ -0,0 +1,398 @@
# This file was autogenerated by uv via the following command:
# uv pip compile --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml
aiobotocore==2.15.1
# via s3fs
aiohappyeyeballs==2.4.3
# via aiohttp
aiohttp==3.10.8
# via
# aiobotocore
# s3fs
aioitertools==0.12.0
# via aiobotocore
aiosignal==1.3.1
# via aiohttp
alembic==1.13.3
# via dagster
aniso8601==9.0.1
# via graphene
annotated-types==0.7.0
# via pydantic
anyio==4.6.0
# via
# gql
# starlette
# watchfiles
appdirs==1.4.4
# via pint
asttokens==2.4.1
# via icecream
attrs==24.2.0
# via aiohttp
backoff==2.2.1
# via gql
beautifulsoup4==4.12.3
boto3==1.35.23
# via
# aiobotocore
# dagster-aws
botocore==1.35.23
# via
# aiobotocore
# boto3
# s3transfer
cachetools==5.5.0
# via google-auth
certifi==2024.8.30
# via
# influxdb-client
# kubernetes
# pyogrio
# pyproj
# requests
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via
# dagster
# dagster-webserver
# uvicorn
colorama==0.4.6
# via icecream
coloredlogs==14.0
# via dagster
contourpy==1.3.0
# via matplotlib
cramjam==2.8.4
# via fastparquet
croniter==3.0.3
# via dagster
cycler==0.12.1
# via matplotlib
dagit==1.8.9
dagster==1.8.9
# via
# dagster-aws
# dagster-docker
# dagster-duckdb
# dagster-duckdb-pandas
# dagster-graphql
# dagster-polars
# dagster-postgres
# dagster-webserver
dagster-aws==0.24.9
dagster-docker==0.24.9
dagster-duckdb==0.24.9
# via dagster-duckdb-pandas
dagster-duckdb-pandas==0.24.9
dagster-graphql==1.8.9
# via dagster-webserver
dagster-pipes==1.8.9
# via dagster
dagster-polars==0.24.9
dagster-postgres==0.24.9
dagster-webserver==1.8.9
# via dagit
dnspython==2.6.1
# via email-validator
docker==7.1.0
# via dagster-docker
docker-image-py==0.1.13
# via dagster-docker
docstring-parser==0.16
# via dagster
duckdb==1.1.1
# via dagster-duckdb
durationpy==0.8
# via kubernetes
email-validator==2.2.0
# via pydantic
et-xmlfile==1.1.0
# via openpyxl
executing==2.1.0
# via icecream
fastapi==0.115.0
fastparquet==2024.5.0
filelock==3.16.1
# via dagster
flexcache==0.3
# via pint
flexparser==0.3.1
# via pint
fonttools==4.54.1
# via matplotlib
frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.9.0
# via
# fastparquet
# s3fs
# universal-pathlib
geopandas==1.0.1
gitdb==4.0.11
# via gitpython
gitpython==3.1.43
google-auth==2.35.0
# via kubernetes
gql==3.5.0
# via dagster-graphql
graphene==3.3
# via dagster-graphql
graphql-core==3.2.4
# via
# gql
# graphene
# graphql-relay
graphql-relay==3.2.0
# via graphene
grpcio==1.66.2
# via
# dagster
# grpcio-health-checking
grpcio-health-checking==1.62.3
# via dagster
h11==0.14.0
# via uvicorn
httptools==0.6.1
# via uvicorn
humanfriendly==10.0
# via coloredlogs
icecream==2.1.3
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
influxdb-client==1.46.0
jinja2==3.1.4
# via dagster
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.7
# via matplotlib
kubernetes==31.0.0
lxml==5.3.0
mako==1.3.5
# via alembic
markdown-it-py==3.0.0
# via rich
markupsafe==2.1.5
# via
# jinja2
# mako
matplotlib==3.9.2
# via seaborn
mdurl==0.1.2
# via markdown-it-py
multidict==6.1.0
# via
# aiohttp
# yarl
networkx==3.3
numpy==2.1.1
# via
# contourpy
# fastparquet
# geopandas
# matplotlib
# pandas
# pyarrow
# pyogrio
# seaborn
# shapely
oauthlib==3.2.2
# via
# kubernetes
# requests-oauthlib
openpyxl==3.1.5
packaging==24.1
# via
# dagster
# dagster-aws
# fastparquet
# geopandas
# matplotlib
# pyogrio
pandas==2.2.3
# via
# dagster-duckdb-pandas
# fastparquet
# geopandas
# pint-pandas
# seaborn
pillow==10.4.0
# via matplotlib
pint==0.24.3
# via pint-pandas
pint-pandas==0.6.2
polars==1.9.0
# via dagster-polars
protobuf==4.25.5
# via
# dagster
# grpcio-health-checking
psycopg2-binary==2.9.9
# via dagster-postgres
pyarrow==17.0.0
# via dagster-polars
pyasn1==0.6.1
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.4.1
# via google-auth
pydantic==2.9.2
# via
# dagster
# fastapi
# pydantic-settings
pydantic-core==2.23.4
# via pydantic
pydantic-settings==2.5.2
pygments==2.18.0
# via
# icecream
# rich
pyogrio==0.10.0
# via geopandas
pyparsing==3.1.4
# via matplotlib
pyproj==3.7.0
# via geopandas
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# croniter
# influxdb-client
# kubernetes
# matplotlib
# pandas
python-dotenv==1.0.1
# via
# dagster
# pydantic-settings
# uvicorn
pytz==2024.2
# via
# croniter
# dagster
# pandas
pyyaml==6.0.2
# via
# dagster
# kubernetes
# uvicorn
reactivex==4.0.4
# via influxdb-client
regex==2024.9.11
# via docker-image-py
requests==2.32.3
# via
# dagster
# dagster-aws
# dagster-graphql
# docker
# gql
# kubernetes
# requests-oauthlib
# requests-toolbelt
requests-oauthlib==2.0.0
# via kubernetes
requests-toolbelt==1.0.0
# via gql
rich==13.8.1
# via dagster
rsa==4.9
# via google-auth
s3fs==2024.9.0
s3transfer==0.10.2
# via boto3
seaborn==0.13.2
setuptools==75.1.0
# via
# dagster
# influxdb-client
shapely==2.0.6
# via geopandas
six==1.16.0
# via
# asttokens
# kubernetes
# python-dateutil
smmap==5.0.1
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.6
# via beautifulsoup4
sqlalchemy==2.0.35
# via
# alembic
# dagster
starlette==0.38.6
# via
# dagster-graphql
# dagster-webserver
# fastapi
structlog==24.4.0
# via dagster
tabulate==0.9.0
# via dagster
tomli==2.0.1
# via dagster
toposort==1.10
# via dagster
tqdm==4.66.5
# via dagster
typing-extensions==4.12.2
# via
# alembic
# dagster
# dagster-polars
# fastapi
# flexcache
# flexparser
# pint
# pydantic
# pydantic-core
# reactivex
# sqlalchemy
tzdata==2024.2
# via pandas
universal-pathlib==0.2.5
# via
# dagster
# dagster-polars
urllib3==2.2.3
# via
# botocore
# docker
# influxdb-client
# kubernetes
# requests
uvicorn==0.31.0
# via dagster-webserver
uvloop==0.20.0
# via uvicorn
watchdog==5.0.3
# via dagster
watchfiles==0.24.0
# via uvicorn
websocket-client==1.8.0
# via kubernetes
websockets==13.1
# via uvicorn
wrapt==1.16.0
# via aiobotocore
xlsxwriter==3.2.0
yarl==1.13.1
# via
# aiohttp
# gql

238
dagster/requirements.txt Executable file
View File

@@ -0,0 +1,238 @@
# This file was autogenerated by uv via the following command:
# uv pip compile --output-file=requirements.txt pyproject.toml
aiobotocore==2.15.1
# via s3fs
aiohappyeyeballs==2.4.3
# via aiohttp
aiohttp==3.10.8
# via
# aiobotocore
# s3fs
aioitertools==0.12.0
# via aiobotocore
aiosignal==1.3.1
# via aiohttp
annotated-types==0.7.0
# via pydantic
anyio==4.6.0
# via starlette
appdirs==1.4.4
# via pint
asttokens==2.4.1
# via icecream
attrs==24.2.0
# via aiohttp
beautifulsoup4==4.12.3
boto3==1.35.23
# via aiobotocore
botocore==1.35.23
# via
# aiobotocore
# boto3
# s3transfer
cachetools==5.5.0
# via google-auth
certifi==2024.8.30
# via
# influxdb-client
# kubernetes
# pyogrio
# pyproj
# requests
charset-normalizer==3.3.2
# via requests
click==8.1.7
# via uvicorn
colorama==0.4.6
# via icecream
contourpy==1.3.0
# via matplotlib
cramjam==2.8.4
# via fastparquet
cycler==0.12.1
# via matplotlib
dnspython==2.6.1
# via email-validator
duckdb==1.1.1
durationpy==0.8
# via kubernetes
email-validator==2.2.0
# via pydantic
et-xmlfile==1.1.0
# via openpyxl
executing==2.1.0
# via icecream
fastapi==0.115.0
fastparquet==2024.5.0
flexcache==0.3
# via pint
flexparser==0.3.1
# via pint
fonttools==4.54.1
# via matplotlib
frozenlist==1.4.1
# via
# aiohttp
# aiosignal
fsspec==2024.9.0
# via
# fastparquet
# s3fs
geopandas==1.0.1
gitdb==4.0.11
# via gitpython
gitpython==3.1.43
google-auth==2.35.0
# via kubernetes
h11==0.14.0
# via uvicorn
icecream==2.1.3
idna==3.10
# via
# anyio
# email-validator
# requests
# yarl
influxdb-client==1.46.0
jmespath==1.0.1
# via
# boto3
# botocore
kiwisolver==1.4.7
# via matplotlib
kubernetes==31.0.0
lxml==5.3.0
matplotlib==3.9.2
# via seaborn
multidict==6.1.0
# via
# aiohttp
# yarl
networkx==3.3
numpy==2.1.1
# via
# contourpy
# fastparquet
# geopandas
# matplotlib
# pandas
# pyarrow
# pyogrio
# seaborn
# shapely
oauthlib==3.2.2
# via
# kubernetes
# requests-oauthlib
openpyxl==3.1.5
packaging==24.1
# via
# fastparquet
# geopandas
# matplotlib
# pyogrio
pandas==2.2.3
# via
# fastparquet
# geopandas
# pint-pandas
# seaborn
pillow==10.4.0
# via matplotlib
pint==0.24.3
# via pint-pandas
pint-pandas==0.6.2
pyarrow==17.0.0
pyasn1==0.6.1
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.4.1
# via google-auth
pydantic==2.9.2
# via
# fastapi
# pydantic-settings
pydantic-core==2.23.4
# via pydantic
pydantic-settings==2.5.2
pygments==2.18.0
# via icecream
pyogrio==0.10.0
# via geopandas
pyparsing==3.1.4
# via matplotlib
pyproj==3.7.0
# via geopandas
pysocks==1.7.1
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# influxdb-client
# kubernetes
# matplotlib
# pandas
python-dotenv==1.0.1
# via pydantic-settings
pytz==2024.2
# via pandas
pyyaml==6.0.2
# via kubernetes
reactivex==4.0.4
# via influxdb-client
requests==2.32.3
# via
# kubernetes
# requests-oauthlib
requests-oauthlib==2.0.0
# via kubernetes
rsa==4.9
# via google-auth
s3fs==2024.9.0
s3transfer==0.10.2
# via boto3
seaborn==0.13.2
setuptools==75.1.0
# via influxdb-client
shapely==2.0.6
# via geopandas
six==1.16.0
# via
# asttokens
# kubernetes
# python-dateutil
smmap==5.0.1
# via gitdb
sniffio==1.3.1
# via anyio
soupsieve==2.6
# via beautifulsoup4
starlette==0.38.6
# via fastapi
structlog==24.4.0
typing-extensions==4.12.2
# via
# fastapi
# flexcache
# flexparser
# pint
# pydantic
# pydantic-core
# reactivex
tzdata==2024.2
# via pandas
urllib3==2.2.3
# via
# botocore
# influxdb-client
# kubernetes
# requests
uvicorn==0.31.0
websocket-client==1.8.0
# via kubernetes
wrapt==1.16.0
# via aiobotocore
xlsxwriter==3.2.0
yarl==1.13.1
# via aiohttp

View File

@@ -0,0 +1 @@
instance_id: 9a2d409d-a36a-492d-8f23-2f20c1f49bf4

View File

@@ -0,0 +1,3 @@
from icecream import install
install()

View File

181
dagster/src/app/vinyl/assets.py Executable file
View File

@@ -0,0 +1,181 @@
from datetime import datetime
from glob import glob
import duckdb
import polars as pl
import structlog
from duckdb.typing import DATE, VARCHAR
from app.vinyl.plato.fetch import scrape_plato
from app.vinyl.sounds.fetch import fetch_deals
from app.vinyl.utils import parse_date
from dagster import (
DailyPartitionsDefinition,
DimensionPartitionMapping,
Failure,
Field,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
OpExecutionContext,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)
SOURCES = ["plato", "sounds"]
logger = structlog.get_logger()
partitions_def = MultiPartitionsDefinition(
{
"date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
"source": StaticPartitionsDefinition(SOURCES),
}
)
partition_mapping = MultiPartitionMapping(
{
"date": DimensionPartitionMapping(
dimension_name="date",
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
),
"source": DimensionPartitionMapping(
dimension_name="source",
partition_mapping=IdentityPartitionMapping(),
),
}
)
@asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=partitions_def,
metadata={
"partition_by": ["date", "source"],
},
config_schema={
"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")
},
)
def deals(context):
ic()
ic(context.partition_key)
ic(context.op_config)
import_dir = context.op_config["import_dir"]
partition_key = context.partition_key.keys_by_dimension
date_str = partition_key["date"]
source = partition_key["source"]
logger.info("Materializing deals", date=date_str, source=source)
date = datetime.strptime(partition_key["date"], "%Y-%m-%d")
days = (date - datetime.today()).days
ic(days)
if days > 0:
raise Failure(f"Cannot materialize for the future: {date.date()}")
if days < -1:
if source == "sounds":
pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
logger.info("Looking for existing CSV files", pattern=pattern)
files = glob(pattern)
if len(files):
file = sorted(files)[-1]
logger.info("Using existing CSV file", file=file)
try:
df = pl.read_csv(file)
logger.info("Loaded CSV file", rows=len(df))
return df.with_columns(
**{k: pl.lit(v) for k, v in partition_key.items()}
)
except Exception as e:
logger.error("Failed to load CSV file!", error=e)
raise Failure(f"Cannot materialize for the past: {date.date()}")
if source == "plato":
logger.info("Scraping Plato")
df = scrape_plato()
logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
if source == "sounds":
logger.info("Scraping Sounds")
df = fetch_deals()
ic(df.columns)
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
return pl.from_pandas(df.assign(**partition_key))
return pl.DataFrame(
[{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
)
@asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
def new_deals(context: OpExecutionContext) -> pl.DataFrame:
ic()
storage_dir = context.instance.storage_directory()
asset_key = "deals"
with duckdb.connect() as con:
con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
return con.execute(
f"""
WITH tmp_plato AS (
SELECT
source,
CAST(date AS DATE) AS date,
ean AS id,
_artist AS artist,
LOWER(title) AS title,
CAST(_date AS DATE) AS release,
CAST(_price AS FLOAT) AS price,
CONCAT('https://www.platomania.nl', url) AS url,
FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true)
), tmp_sounds AS (
SELECT
source,
date,
id,
LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist,
LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title,
PARSE_DATE(release) AS release,
CAST(price AS FLOAT) AS price,
CONCAT('https://www.sounds.nl/detail/', id) AS url
FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true)
), tmp_both AS (
SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds
), tmp_rn AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn
FROM tmp_both
)
SELECT
source,
date,
id,
artist,
title,
release,
price,
url
FROM tmp_rn
WHERE rn = 1
ORDER BY date ASC
"""
).pl()
@asset(
io_manager_key="polars_parquet_io_manager",
)
def works(new_deals: pl.DataFrame) -> pl.DataFrame:
# Pandas
# columns = ["artist", "title"]
# return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates())
# Polars
# return new_deals[columns].unique(subset=columns)
# DuckDB
with duckdb.connect() as con:
return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl()

58
dagster/src/app/vinyl/jobs.py Executable file
View File

@@ -0,0 +1,58 @@
from dagster import (
AssetKey,
AssetMaterialization,
OpExecutionContext,
define_asset_job,
job,
op,
)
from .assets import deals, new_deals, works
deals_job = define_asset_job(
"deals_job", selection=[deals], partitions_def=deals.partitions_def
)
@op
def check_partititions(context: OpExecutionContext):
# Replace with your asset/job name
asset_key = "deals"
context.log_event(
AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds")
)
# Fetch the materializations for the asset key
materializations = context.instance.get_materialized_partitions(
asset_key=AssetKey(asset_key)
)
context.log.info("Existing partitions", extra=dict(partitions=materializations))
import polars as pl
storage_dir = context.instance.storage_directory()
ic(storage_dir)
for row in (
pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet")
.select(["date", "source"])
.unique()
.collect()
.iter_rows()
):
partition = "|".join(row)
if partition not in materializations:
context.log.info(f"Missing partition: {partition}")
context.log_event(
AssetMaterialization(asset_key=asset_key, partition=partition)
)
@job
def check_partititions_job():
check_partititions()
musicbrainz_lookup_job = define_asset_job(
"musicbrainz_lookup_job", selection=[works, new_deals]
)

View File

View File

@@ -0,0 +1,154 @@
import os
import boto3
import pandas as pd
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv
from fetch import scrape_plato
from utils import get
def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
if os.path.exists(database_file):
database_df = pd.read_parquet(database_file)
else:
database_df = None
if articles_df is None:
new_df = None if database_df is None else database_df.head(0)
else:
if database_df is None:
articles_df.to_parquet(database_file)
return articles_df, articles_df
compare = ["ean", "_price"]
check_df = pd.merge(
database_df[compare], articles_df[compare], how="right", indicator=True
)
new_df = (
check_df[check_df["_merge"] == "right_only"]
.drop(columns="_merge")
.merge(articles_df)
)
database_df = (
pd.concat([database_df, new_df])
.sort_values("_date")
.groupby("ean")
.last()
.reset_index()
)
database_df.to_parquet(database_file)
return database_df, new_df
def send_email(lines):
# Define the email parameters
SENDER = "mail@veenboer.xyz"
RECIPIENT = "rik.veenboer@gmail.com"
SUBJECT = "Aanbieding op plato!"
# The email body for recipients with non-HTML email clients
BODY_TEXT = ""
# The HTML body of the email
tmp = "\n".join(lines)
BODY_HTML = f"""<html>
<head></head>
<body>
{tmp}
</html>
"""
# The character encoding for the email
CHARSET = "UTF-8"
# Try to send the email
try:
client = boto3.client(
"ses", region_name="eu-west-1"
) # Change the region as needed
# Provide the contents of the email
response = client.send_email(
Destination={
"ToAddresses": [
RECIPIENT,
],
},
Message={
"Body": {
"Html": {
"Charset": CHARSET,
"Data": BODY_HTML,
},
"Text": {
"Charset": CHARSET,
"Data": BODY_TEXT,
},
},
"Subject": {
"Charset": CHARSET,
"Data": SUBJECT,
},
},
Source=SENDER,
)
# Display an error if something goes wrong.
except NoCredentialsError:
print("Credentials not available")
except PartialCredentialsError:
print("Incomplete credentials provided")
except Exception as e:
print(f"Error: {e}")
else:
print("Email sent! Message ID:"),
print(response["MessageId"])
def main(dry=False):
load_dotenv("/opt/.env")
local_ip = get("http://ifconfig.me", False).text
get_ip = get("http://ifconfig.me").text
print(f"Local IP = {local_ip}")
print(f"Request IP = {get_ip}")
assert local_ip != get_ip
artists = open("/home/user/artists.txt").read().strip().splitlines()
print(f"Number of known artists = {len(artists)}")
if dry:
articles_df = None
else:
articles_df = scrape_plato(get=get)
database_df, new_df = update_database(articles_df)
if dry:
new_df = database_df.sample(20)
print(f"Database size = {len(database_df)}")
print(f"New = {len(new_df)}")
# new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
new_df = new_df.query('_price <= 25 and ean != ""')
print(f"Interesting = {len(new_df)}")
if new_df is not None and len(new_df):
message = []
for _, row in new_df.head(10).iterrows():
message.append(
f'<a href="https://www.platomania.nl{row.url}"><h1>NEW</h1></a>'
)
message.append("<ul>")
message.append(f"<li>[artist] {row.artist}</li>")
message.append(f"<li>[title] {row.title}</li>")
message.append(f"<li>[price] {row.price}</li>")
message.append(f"<li>[release] {row.release_date}</li>")
message.append("</ul>")
send_email(message)
if __name__ == "__main__":
cwd = os.path.dirname(__file__)
main(dry=False)

View File

@@ -0,0 +1,52 @@
#!/root/.pyenv/versions/dev/bin/python
import re
from datetime import datetime
import pandas as pd
from .scrape import get_soup, scrape_page, scrape_page_links
def scrape_plato(get=None):
ic()
url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
ic(url)
soup = get_soup(url=url, get=get)
articles_info = scrape_page(soup)
ic(len(articles_info))
links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
for link in links:
ic(link)
soup = get_soup(url=link, get=get)
tmp = scrape_page(soup)
ic(len(tmp))
articles_info.extend(tmp)
def clean(name):
tmp = " ".join(reversed(name.split(", ")))
tmp = tmp.lower()
tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
return tmp
articles_df = pd.DataFrame(articles_info).reindex(
columns=[
"artist",
"title",
"url",
"label",
"release_date",
"origin",
"item_number",
"ean",
"delivery_info",
"price",
]
)
articles_df["_artist"] = articles_df["artist"].map(clean)
articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
articles_df["_date"] = datetime.now()
return articles_df

View File

@@ -0,0 +1,79 @@
import requests
from bs4 import BeautifulSoup
def get_soup(url, get=None):
# Send a GET request to the specified URL
if get is None:
get = requests.get
response = get(url)
# Check if the request was successful
if response.status_code == 200:
# Parse the HTML content of the page
return BeautifulSoup(response.content, "html.parser")
else:
raise ValueError(
f"Failed to retrieve the page. Status code: {response.status_code}"
)
def scrape_page_links(soup):
# Find all <li> elements with class "page-item"
page_items = soup.find_all("li", class_="page-item")
# Extract the href attribute of <a> tags within these <li> elements
links = []
for item in page_items:
a_tag = item.find("a", class_="page-link")
if a_tag and "href" in a_tag.attrs:
links.append(a_tag["href"])
return links
def extract_article_info(article):
info = {}
# Extract the artist name
artist_tag = article.find("h1", class_="product-card__artist")
info["artist"] = artist_tag.text.strip() if artist_tag else None
# Extract the title and URL
title_tag = article.find("h2", class_="product-card__title")
info["title"] = title_tag.text.strip() if title_tag else None
url_tag = title_tag.find_parent("a") if title_tag else None
info["url"] = url_tag["href"] if url_tag else None
# Extract additional details
details = article.find_all("div", class_="article-details__text")
for detail in details:
text = detail.text.strip()
if "Label:" in text:
info["label"] = text.replace("Label: ", "").strip()
elif "Releasedatum:" in text:
info["release_date"] = text.replace("Releasedatum: ", "").strip()
elif "Herkomst:" in text:
info["origin"] = text.replace("Herkomst: ", "").strip()
elif "Item-nr:" in text:
info["item_number"] = text.replace("Item-nr: ", "").strip()
elif "EAN:" in text:
info["ean"] = text.replace("EAN:", "").strip()
# Extract delivery information
delivery_tag = article.find("div", class_="article-details__delivery-text")
info["delivery_info"] = delivery_tag.text.strip() if delivery_tag else None
# Extract price
price_tag = article.find("div", class_="article__price")
info["price"] = price_tag.text.strip() if price_tag else None
return info
def scrape_page(soup):
# Find all article blocks
article_blocks = soup.find_all("article", class_="article LP")
# Extract information from each article block
return [extract_article_info(article) for article in article_blocks]

View File

@@ -0,0 +1,10 @@
import requests
def get(url, proxy=True):
if proxy:
tmp = "socks5://localhost:1080"
kwargs = dict(proxies=dict(http=tmp, https=tmp))
else:
kwargs = {}
return requests.get(url, **kwargs)

31
dagster/src/app/vinyl/repo.py Executable file
View File

@@ -0,0 +1,31 @@
from collections.abc import Sequence
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb.io_manager import DbTypeHandler
from dagster_duckdb_pandas import DuckDBPandasTypeHandler
from dagster_polars import PolarsParquetIOManager
from dagster import Definitions
from .assets import deals, new_deals, works
from .jobs import check_partititions_job, deals_job, musicbrainz_lookup_job
from .schedules import deals_schedule
from .sensors import musicbrainz_lookup_sensor
class PandasDuckDBIOManager(DuckDBIOManager):
@staticmethod
def type_handlers() -> Sequence[DbTypeHandler]:
return [DuckDBPandasTypeHandler()]
vinyl = Definitions(
assets=[deals, new_deals, works],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(),
"duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"),
},
jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job],
schedules=[deals_schedule],
sensors=[musicbrainz_lookup_sensor],
)

View File

@@ -0,0 +1,10 @@
from dagster import DefaultScheduleStatus, build_schedule_from_partitioned_job
from app.vinyl.repo import deals_job
deals_schedule = build_schedule_from_partitioned_job(
job=deals_job,
hour_of_day=7,
# execution_timezone="Europe/Amsterdam",
default_status=DefaultScheduleStatus.RUNNING,
)

View File

@@ -0,0 +1,21 @@
from app.vinyl.assets import deals
from app.vinyl.jobs import musicbrainz_lookup_job
from dagster import (
DefaultSensorStatus,
EventLogEntry,
RunRequest,
SensorEvaluationContext,
asset_sensor,
)
@asset_sensor(
asset_key=deals.key,
job=musicbrainz_lookup_job,
default_status=DefaultSensorStatus.RUNNING,
)
def musicbrainz_lookup_sensor(
context: SensorEvaluationContext, asset_event: EventLogEntry
):
assert asset_event.dagster_event and asset_event.dagster_event.asset_key
yield RunRequest(run_key=context.cursor)

View File

View File

@@ -0,0 +1,80 @@
#!/usr/bin/python3
import glob
import os
from datetime import datetime
import pandas as pd
def get_csvs(directory, n):
# List all files matching the pattern *_sounds.csv
suffix = "_sounds.csv"
files = glob.glob(os.path.join(directory, f"*{suffix}"))
# Function to extract date from filename
def extract_date_from_filename(filename):
# Extract the date string
basename = os.path.basename(filename)
date_str = basename.split(suffix)[0]
try:
return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S")
except ValueError:
# The date string cannot be parsed
return None
# Create a list of tuples (date, filename), ignoring files with unparsable dates
result = [(extract_date_from_filename(file), file) for file in files]
result = [item for item in result if item[0] is not None]
# Sort the list by date in descending order (most recent first)
result.sort(key=lambda x: x[0], reverse=True)
# Return the two most recent files
return [x[1] for x in result[:n]]
def analyze(df1, df2):
df1 = df1.drop_duplicates(subset="id")
df2 = df2.drop_duplicates(subset="id")
combined_df = pd.merge(
df1[["id", "price"]], df2, on="id", how="right", indicator=True
)
combined_df["discount"] = combined_df.price_y - combined_df.price_x
combined_df.drop(columns=["price_x"], inplace=True)
combined_df.rename(columns={"price_y": "price"}, inplace=True)
deals = combined_df.query("discount < 0").sort_values(by="discount")[
["id", "name", "price", "discount"]
]
new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[
["id", "name", "price"]
]
return deals, new
if __name__ == "__main__":
csvs = get_csvs(".", 100)
for i in range(1, len(csvs)):
print(f"Comparing {csvs[i]} with {csvs[0]}")
df_previous = pd.read_csv(csvs[i], index_col=0)
df_latest = pd.read_csv(csvs[0], index_col=0)
deals, new = analyze(df_previous, df_latest)
done = False
if len(deals) > 0:
print()
print("New items:")
print(new)
print()
done = True
if len(deals) > 0:
print(f"Discounted items:")
print(deals)
done = True
if done:
break

View File

@@ -0,0 +1,110 @@
#!/usr/bin/python3
import time
from datetime import datetime
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
def get_page_count(html_content):
soup = BeautifulSoup(html_content, "html.parser")
# Find all pagination links
page_links = soup.select("ul.pagination li a")
# Extract the numbers from the hrefs and convert to integers
page_numbers = [
int(link.get_text()) for link in page_links if link.get_text().isdigit()
]
return max(page_numbers)
def parse_page(html_content):
entries = []
soup = BeautifulSoup(html_content, "html.parser")
for product in soup.find_all("div", {"class": "search-product"}):
item_id = product.find("a", rel=True)["rel"][0]
name = product.find("h5").text.strip()
artist_title = name.split("-")
artist = artist_title[0].strip()
title = artist_title[1].strip()
price = (
product.find("span", class_="product-price")
.text.strip()
.replace("", "")
.strip()
)
entry = {
"id": item_id,
"name": name,
"artist": artist,
"title": title,
"price": price,
}
if detail := product.find("h6", {"class": "hide-for-small"}):
entry["detail"] = detail.text
if supply := product.find("div", {"class": "product-voorraad"}):
entry["supply"] = supply.text
for info in product.find_all("div", {"class": "product-info"}):
info = info.text.split(":")
if "Genre" in info[0]:
entry["genre"] = info[1].strip()
if "Releasedatum" in info[0]:
entry["release"] = info[1].strip()
entries.append(entry)
return pd.DataFrame(entries).reindex(
columns=[
"id",
"name",
"artist",
"title",
"price",
"supply",
"release",
"genre",
"detail",
]
)
def fetch_deals():
# Get page count
page_count = get_page_count(
requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text
)
time.sleep(1)
print(f"Number of pages: {page_count}")
# Parse all pages
base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all"
dfs = []
for i in tqdm(range(page_count)):
df = parse_page(requests.get(base_url.format(page_number=i)).text)
dfs.append(df)
time.sleep(2)
# Combine dfs
return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"])
if __name__ == "__main__":
df = fetch_deals()
print(f"Found {len(df)} deals")
# Show current deals
print(df.sort_values(by="price").head(10))
# Write to file
now = datetime.now()
prefix = now.strftime("%Y-%m-%d_%H:%M:%S")
directory = "/home/bram/src/python"
filepath = f"{directory}/{prefix}_sounds.csv"
print(f"Writing data to {filepath}")
df.to_csv(filepath)

41
dagster/src/app/vinyl/test.py Executable file
View File

@@ -0,0 +1,41 @@
import warnings
from datetime import datetime
from dagster import materialize
from dagster_polars import PolarsParquetIOManager
from app.vinyl.assets import deals
from app.vinyl.jobs import check_partititions_job
warnings.filterwarnings("ignore", category=UserWarning)
import logging
logging.getLogger().setLevel(logging.INFO)
resources = {
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage")
}
def test_deals(source="sounds", date: str = None):
if not date:
today = datetime.today().strftime("%Y-%m-%d")
date = today
result = materialize(
[deals],
partition_key=f"{date}|{source}",
resources=resources,
run_config={
"loggers": {"console": {"config": {"log_level": "ERROR"}}},
"ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}},
},
)
assert result.success
ic(result.asset_value)
if __name__ == "__main__":
# test_deals(source="plato")
check_partititions_job.execute_in_process()

29
dagster/src/app/vinyl/utils.py Executable file
View File

@@ -0,0 +1,29 @@
import datetime
def parse_date(dutch_date: str):
# Create a dictionary to map Dutch month names to English
dutch_to_english_months = {
"januari": "January",
"februari": "February",
"maart": "March",
"april": "April",
"mei": "May",
"juni": "June",
"juli": "July",
"augustus": "August",
"september": "September",
"oktober": "October",
"november": "November",
"december": "December",
}
# Split the date and replace the Dutch month with its English equivalent
day, dutch_month, year = dutch_date.split()
english_month = dutch_to_english_months[dutch_month]
# Rebuild the date string in English format
english_date = f"{day} {english_month} {year}"
# Parse the date using strptime
return datetime.datetime.strptime(english_date, "%d %B %Y").date()

1
dagster/src/repo.py Normal file
View File

@@ -0,0 +1 @@
from app.vinyl.repo import vinyl # noqa