From 0196f8bd27654b486f078cb28e47f6d7a2dcffd1 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 14 Oct 2024 09:58:24 +0200 Subject: [PATCH] initial commit --- .gitignore | 5 + Dockerfile.code | 19 ++ Dockerfile.system | 17 ++ Makefile | 20 ++ __init__.py | 0 dagster-requirements.txt | 398 +++++++++++++++++++++++++++++ dagster.yaml | 66 +++++ docker-compose.code.yaml | 47 ++++ docker-compose.system.yaml | 90 +++++++ docker-compose.yaml | 8 + poetry.lock | 19 ++ pyproject.toml | 70 +++++ requirements.txt | 245 ++++++++++++++++++ src/__init__.py | 0 src/app/__init__.py | 3 + src/app/partitions/__init__.py | 0 src/app/partitions/assets.py | 97 +++++++ src/app/partitions/mapping.py | 108 ++++++++ src/app/partitions/repo.py | 13 + src/app/partitions/test.py | 55 ++++ src/app/sync.sh | 7 + src/app/test.py | 34 +++ src/app/vinyl/__init__.py | 0 src/app/vinyl/assets.py | 115 +++++++++ src/app/vinyl/jobs.py | 39 +++ src/app/vinyl/plato/__init__.py | 0 src/app/vinyl/plato/check_plato.py | 191 ++++++++++++++ src/app/vinyl/plato/scrape.py | 77 ++++++ src/app/vinyl/repo.py | 13 + src/app/vinyl/schedules.py | 10 + src/app/vinyl/sounds/__init__.py | 0 src/app/vinyl/sounds/deals.py | 80 ++++++ src/app/vinyl/sounds/fetch.py | 84 ++++++ src/app/vinyl/test.py | 48 ++++ workspace.yaml | 27 ++ 35 files changed, 2005 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile.code create mode 100644 Dockerfile.system create mode 100644 Makefile create mode 100644 __init__.py create mode 100644 dagster-requirements.txt create mode 100644 dagster.yaml create mode 100644 docker-compose.code.yaml create mode 100644 docker-compose.system.yaml create mode 100644 docker-compose.yaml create mode 100644 poetry.lock create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/app/__init__.py create mode 100644 src/app/partitions/__init__.py create mode 100644 src/app/partitions/assets.py create mode 100644 src/app/partitions/mapping.py create mode 100644 src/app/partitions/repo.py create mode 100644 src/app/partitions/test.py create mode 100644 src/app/sync.sh create mode 100644 src/app/test.py create mode 100644 src/app/vinyl/__init__.py create mode 100644 src/app/vinyl/assets.py create mode 100644 src/app/vinyl/jobs.py create mode 100644 src/app/vinyl/plato/__init__.py create mode 100755 src/app/vinyl/plato/check_plato.py create mode 100644 src/app/vinyl/plato/scrape.py create mode 100644 src/app/vinyl/repo.py create mode 100644 src/app/vinyl/schedules.py create mode 100644 src/app/vinyl/sounds/__init__.py create mode 100644 src/app/vinyl/sounds/deals.py create mode 100644 src/app/vinyl/sounds/fetch.py create mode 100644 src/app/vinyl/test.py create mode 100644 workspace.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..02ec0c3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.env +storage/ +.idea/ +.ipynb_checkpoints/ +trash/ \ No newline at end of file diff --git a/Dockerfile.code b/Dockerfile.code new file mode 100644 index 0000000..01b5ee9 --- /dev/null +++ b/Dockerfile.code @@ -0,0 +1,19 @@ +FROM python:3.12-slim + +# Checkout and install dagster libraries needed to run the gRPC server +# exposing your repository to dagit and dagster-daemon, and to load the DagsterInstance + +COPY dagster-requirements.txt requirements.txt +RUN pip install uv +RUN uv pip install -r requirements.txt --system + +# Add repository code +WORKDIR /opt/dagster/home + +# Run dagster gRPC server on port 4000 +EXPOSE 4000 + +# CMD 
allows this to be overridden from run launchers or executors that want +# to run other commands against your repository +#CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "repo.py"] +CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "repo.py"] diff --git a/Dockerfile.system b/Dockerfile.system new file mode 100644 index 0000000..ffeb699 --- /dev/null +++ b/Dockerfile.system @@ -0,0 +1,17 @@ +# Dagster libraries to run both dagit and the dagster-daemon. Does not +# need to have access to any pipeline code. + +FROM python:3.12-slim + +COPY dagster-requirements.txt requirements.txt +RUN pip install uv +RUN uv pip install -r requirements.txt --system + +# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there +ENV DAGSTER_HOME=/opt/dagster/home/ + +RUN mkdir -p $DAGSTER_HOME + +COPY dagster.yaml workspace.yaml $DAGSTER_HOME + +WORKDIR $DAGSTER_HOME diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ff04132 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +requirements.txt: pyproject.toml + uv pip compile $(UPGRADE) --output-file=requirements.txt pyproject.toml >/dev/null + +dagster-requirements.txt: requirements.txt pyproject.toml + uv pip compile $(UPGRADE) --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml >/dev/null + +sync: virtualenv + uv pip sync requirements.txt + +upgrade-deps: virtualenv + touch pyproject.toml + $(MAKE) UPGRADE="--upgrade" dev-requirements.txt + +install-tools: virtualenv + pip install $(UPGRADE) pip wheel pip-tools uv + +upgrade-tools: virtualenv + $(MAKE) UPGRADE="--upgrade" install-tools + +upgrade: upgrade-tools upgrade-pre-commit upgrade-deps sync diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dagster-requirements.txt b/dagster-requirements.txt new file mode 100644 index 0000000..dafb3b1 --- /dev/null +++ b/dagster-requirements.txt @@ -0,0 +1,398 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml +aiobotocore==2.15.1 + # via s3fs +aiohappyeyeballs==2.4.3 + # via aiohttp +aiohttp==3.10.8 + # via + # aiobotocore + # s3fs +aioitertools==0.12.0 + # via aiobotocore +aiosignal==1.3.1 + # via aiohttp +alembic==1.13.3 + # via dagster +aniso8601==9.0.1 + # via graphene +annotated-types==0.7.0 + # via pydantic +anyio==4.6.0 + # via + # gql + # starlette + # watchfiles +appdirs==1.4.4 + # via pint +asttokens==2.4.1 + # via icecream +async-timeout==4.0.3 + # via aiohttp +attrs==24.2.0 + # via aiohttp +backoff==2.2.1 + # via gql +beautifulsoup4==4.12.3 +boto3==1.35.23 + # via + # aiobotocore + # dagster-aws +botocore==1.35.23 + # via + # aiobotocore + # boto3 + # s3transfer +cachetools==5.5.0 + # via google-auth +certifi==2024.8.30 + # via + # influxdb-client + # kubernetes + # pyogrio + # pyproj + # requests +charset-normalizer==3.3.2 + # via requests +click==8.1.7 + # via + # dagster + # dagster-webserver + # uvicorn +colorama==0.4.6 + # via icecream +coloredlogs==14.0 + # via dagster +contourpy==1.3.0 + # via matplotlib +cramjam==2.8.4 + # via fastparquet +croniter==3.0.3 + # via dagster +cycler==0.12.1 + # via matplotlib +dagit==1.8.9 +dagster==1.8.9 + # via + # dagster-aws + # dagster-docker + # dagster-graphql + # dagster-polars + # dagster-postgres + # dagster-webserver +dagster-aws==0.24.9 +dagster-docker==0.24.9 +dagster-graphql==1.8.9 + # via dagster-webserver 
+dagster-pipes==1.8.9 + # via dagster +dagster-polars==0.24.9 +dagster-postgres==0.24.9 +dagster-webserver==1.8.9 + # via dagit +dnspython==2.6.1 + # via email-validator +docker==7.1.0 + # via dagster-docker +docker-image-py==0.1.13 + # via dagster-docker +docstring-parser==0.16 + # via dagster +duckdb==1.1.1 +durationpy==0.8 + # via kubernetes +email-validator==2.2.0 + # via pydantic +et-xmlfile==1.1.0 + # via openpyxl +exceptiongroup==1.2.2 + # via anyio +executing==2.1.0 + # via icecream +fastapi==0.115.0 +fastparquet==2024.5.0 +filelock==3.16.1 + # via dagster +flexcache==0.3 + # via pint +flexparser==0.3.1 + # via pint +fonttools==4.54.1 + # via matplotlib +frozenlist==1.4.1 + # via + # aiohttp + # aiosignal +fsspec==2024.9.0 + # via + # fastparquet + # s3fs + # universal-pathlib +geopandas==1.0.1 +gitdb==4.0.11 + # via gitpython +gitpython==3.1.43 +google-auth==2.35.0 + # via kubernetes +gql==3.5.0 + # via dagster-graphql +graphene==3.3 + # via dagster-graphql +graphql-core==3.2.4 + # via + # gql + # graphene + # graphql-relay +graphql-relay==3.2.0 + # via graphene +grpcio==1.66.2 + # via + # dagster + # grpcio-health-checking +grpcio-health-checking==1.62.3 + # via dagster +h11==0.14.0 + # via uvicorn +httptools==0.6.1 + # via uvicorn +humanfriendly==10.0 + # via coloredlogs +icecream==2.1.3 +idna==3.10 + # via + # anyio + # email-validator + # requests + # yarl +influxdb-client==1.46.0 +jinja2==3.1.4 + # via dagster +jmespath==1.0.1 + # via + # boto3 + # botocore +kiwisolver==1.4.7 + # via matplotlib +kubernetes==31.0.0 +lxml==5.3.0 +mako==1.3.5 + # via alembic +markdown-it-py==3.0.0 + # via rich +markupsafe==2.1.5 + # via + # jinja2 + # mako +matplotlib==3.9.2 + # via seaborn +mdurl==0.1.2 + # via markdown-it-py +multidict==6.1.0 + # via + # aiohttp + # yarl +networkx==3.3 +numpy==2.1.1 + # via + # contourpy + # fastparquet + # geopandas + # matplotlib + # pandas + # pyarrow + # pyogrio + # seaborn + # shapely +oauthlib==3.2.2 + # via + # kubernetes + # requests-oauthlib +openpyxl==3.1.5 +packaging==24.1 + # via + # dagster + # dagster-aws + # fastparquet + # geopandas + # matplotlib + # pyogrio +pandas==2.2.3 + # via + # fastparquet + # geopandas + # pint-pandas + # seaborn +pillow==10.4.0 + # via matplotlib +pint==0.24.3 + # via pint-pandas +pint-pandas==0.6.2 +polars==1.9.0 + # via dagster-polars +protobuf==4.25.5 + # via + # dagster + # grpcio-health-checking +psycopg2-binary==2.9.9 + # via dagster-postgres +pyarrow==17.0.0 + # via dagster-polars +pyasn1==0.6.1 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.4.1 + # via google-auth +pydantic==2.9.2 + # via + # dagster + # fastapi + # pydantic-settings +pydantic-core==2.23.4 + # via pydantic +pydantic-settings==2.5.2 +pygments==2.18.0 + # via + # icecream + # rich +pyogrio==0.10.0 + # via geopandas +pyparsing==3.1.4 + # via matplotlib +pyproj==3.7.0 + # via geopandas +pysocks==1.7.1 + # via requests +python-dateutil==2.9.0.post0 + # via + # botocore + # croniter + # influxdb-client + # kubernetes + # matplotlib + # pandas +python-dotenv==1.0.1 + # via + # dagster + # pydantic-settings + # uvicorn +pytz==2024.2 + # via + # croniter + # dagster + # pandas +pyyaml==6.0.2 + # via + # dagster + # kubernetes + # uvicorn +reactivex==4.0.4 + # via influxdb-client +regex==2024.9.11 + # via docker-image-py +requests==2.32.3 + # via + # dagster + # dagster-aws + # dagster-graphql + # docker + # gql + # kubernetes + # requests-oauthlib + # requests-toolbelt +requests-oauthlib==2.0.0 + # via kubernetes +requests-toolbelt==1.0.0 + # 
via gql +rich==13.8.1 + # via dagster +rsa==4.9 + # via google-auth +s3fs==2024.9.0 +s3transfer==0.10.2 + # via boto3 +seaborn==0.13.2 +setuptools==75.1.0 + # via + # dagster + # influxdb-client +shapely==2.0.6 + # via geopandas +six==1.16.0 + # via + # asttokens + # kubernetes + # python-dateutil +smmap==5.0.1 + # via gitdb +sniffio==1.3.1 + # via anyio +soupsieve==2.6 + # via beautifulsoup4 +sqlalchemy==2.0.35 + # via + # alembic + # dagster +starlette==0.38.6 + # via + # dagster-graphql + # dagster-webserver + # fastapi +structlog==24.4.0 + # via dagster +tabulate==0.9.0 + # via dagster +tomli==2.0.1 + # via dagster +toposort==1.10 + # via dagster +tqdm==4.66.5 + # via dagster +typing-extensions==4.12.2 + # via + # alembic + # anyio + # dagster + # dagster-polars + # fastapi + # flexcache + # flexparser + # multidict + # pint + # pydantic + # pydantic-core + # reactivex + # sqlalchemy + # uvicorn +tzdata==2024.2 + # via pandas +universal-pathlib==0.2.5 + # via + # dagster + # dagster-polars +urllib3==2.2.3 + # via + # botocore + # docker + # influxdb-client + # kubernetes + # requests +uvicorn==0.31.0 + # via dagster-webserver +uvloop==0.20.0 + # via uvicorn +watchdog==5.0.3 + # via dagster +watchfiles==0.24.0 + # via uvicorn +websocket-client==1.8.0 + # via kubernetes +websockets==13.1 + # via uvicorn +wrapt==1.16.0 + # via aiobotocore +xlsxwriter==3.2.0 +yarl==1.13.1 + # via + # aiohttp + # gql diff --git a/dagster.yaml b/dagster.yaml new file mode 100644 index 0000000..48a5598 --- /dev/null +++ b/dagster.yaml @@ -0,0 +1,66 @@ +telemetry: + enabled: false + +run_coordinator: + module: dagster.core.run_coordinator + class: QueuedRunCoordinator + +run_launcher: + module: dagster_docker + class: DockerRunLauncher + config: + env_vars: + - DAGSTER_POSTGRES_USER + - DAGSTER_POSTGRES_PASSWORD + - DAGSTER_POSTGRES_DB + network: dagster + container_kwargs: + volumes: + - /opt/dagster/src/app/:/opt/dagster/home/app/ + - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py + + # - /opt/dagster/storage/:/opt/dagster/home/storage/ + - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ + - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ + +run_storage: + module: dagster_postgres.run_storage + class: PostgresRunStorage + config: + postgres_db: + hostname: postgresql + username: + env: DAGSTER_POSTGRES_USER + password: + env: DAGSTER_POSTGRES_PASSWORD + db_name: + env: DAGSTER_POSTGRES_DB + port: 5432 + +schedule_storage: + module: dagster_postgres.schedule_storage + class: PostgresScheduleStorage + config: + postgres_db: + hostname: postgresql + username: + env: DAGSTER_POSTGRES_USER + password: + env: DAGSTER_POSTGRES_PASSWORD + db_name: + env: DAGSTER_POSTGRES_DB + port: 5432 + +event_log_storage: + module: dagster_postgres.event_log + class: PostgresEventLogStorage + config: + postgres_db: + hostname: postgresql + username: + env: DAGSTER_POSTGRES_USER + password: + env: DAGSTER_POSTGRES_PASSWORD + db_name: + env: DAGSTER_POSTGRES_DB + port: 5432 diff --git a/docker-compose.code.yaml b/docker-compose.code.yaml new file mode 100644 index 0000000..6b87332 --- /dev/null +++ b/docker-compose.code.yaml @@ -0,0 +1,47 @@ +x-dagster-env: &dagster_env + DAGSTER_POSTGRES_USER: ${POSTGRES_USER} + DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + DAGSTER_POSTGRES_DB: ${POSTGRES_DB} + DAGSTER_CURRENT_IMAGE: ${DAGSTER_CURRENT_IMAGE} + +x-volumes: &volumes + volumes: + #- /opt/dagster/storage/:/opt/dagster/home/storage/ + - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ + 
- /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ + - /opt/dagster/src/app/:/opt/dagster/home/app/ + - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py + +services: + # This service runs the gRPC server that loads your user code, in both dagit + # and dagster-daemon. By setting DAGSTER_CURRENT_IMAGE to its own image, we tell the + # run launcher to use this same image when launching runs in a new container as well. + # Multiple containers like this can be deployed separately - each just needs to run on + # its own port, and have its own entry in the workspace.yaml file that's loaded by dagit. + user_code: + build: + context: . + dockerfile: Dockerfile.code + container_name: user_code + image: user_code_image + restart: always + environment: + <<: *dagster_env + <<: *volumes + networks: + - dagster + + other_image: + profiles: [ disabled ] + build: + context: . + dockerfile: Dockerfile + container_name: other_image + image: user_code_image + restart: always + environment: + <<: *dagster_env + DAGSTER_CURRENT_IMAGE: something_else + <<: *volumes + networks: + - dagster diff --git a/docker-compose.system.yaml b/docker-compose.system.yaml new file mode 100644 index 0000000..082e206 --- /dev/null +++ b/docker-compose.system.yaml @@ -0,0 +1,90 @@ +x-postgres-env: &postgres_env + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + POSTGRES_DB: ${POSTGRES_DB} +x-aws-env: &aws_env + AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} + AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} +x-dagster-env: &dagster_env + DAGSTER_POSTGRES_USER: ${POSTGRES_USER} + DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} + DAGSTER_POSTGRES_DB: ${POSTGRES_DB} + DAGSTER_CURRENT_IMAGE: ${DAGSTER_CURRENT_IMAGE} + +x-volumes: &volumes + volumes: + - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml + - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml + - /var/run/docker.sock:/var/run/docker.sock + + # - /opt/dagster/storage/:/opt/dagster/home/storage/ + - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ + - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ + + - /opt/dagster/src/app/:/opt/dagster/home/app/ + - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py + # - /opt/homebrew/Caskroom/mambaforge/base/envs:/opt/homebrew/Caskroom/mambaforge/base/envs + + # Towel + # - /opt/dagster/src/towel.py:/opt/dagster/home/towel.py + # - /Users/rik/Seafile/Code/company/navara/Klanten/Eneco/towel/towel:/opt/dagster/home/app/towel + # - /Users/rik/Library/Caches/pypoetry/virtualenvs/towel-V7mtCF2c-py3.9:/venv/towel + +services: + # This service runs the postgres DB used by dagster for run storage, schedule storage, + # and event log storage. + postgresql: + image: postgres:11 + container_name: postgresql + environment: + <<: *postgres_env + networks: + - dagster + + # This service runs dagit, which loads your user code from the user code container. + # Since our instance uses the QueuedRunCoordinator, any runs submitted from dagit will be put on + # a queue and later dequeued and launched by dagster-daemon. + dagit: + build: + context: . 
+ dockerfile: Dockerfile.system + entrypoint: + - dagster-webserver + - -h + - "0.0.0.0" + - -p + - "3000" + - -w + - workspace.yaml + container_name: dagit + expose: + - "3000" + ports: + - "3000:3000" + environment: + <<: *dagster_env + <<: *volumes + networks: + - dagster + depends_on: + - postgresql + - user_code + + # This service runs the dagster-daemon process, which is responsible for taking runs + # off of the queue and launching them, as well as creating runs from schedules or sensors. + daemon: + build: + context: . + dockerfile: Dockerfile.system + entrypoint: + - dagster-daemon + - run + container_name: daemon + restart: on-failure + environment: + <<: [ *dagster_env, *aws_env ] + <<: *volumes + networks: + - dagster + depends_on: + - postgresql diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..421ce12 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,8 @@ +networks: + dagster: + driver: bridge + name: dagster + +include: + - docker-compose.system.yaml + - docker-compose.code.yaml \ No newline at end of file diff --git a/poetry.lock b/poetry.lock new file mode 100644 index 0000000..a0bdefb --- /dev/null +++ b/poetry.lock @@ -0,0 +1,19 @@ +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. + +[[package]] +name = "seven" +version = "1.0.0" +description = "Python 2.5 compatibility wrapper for Python 2.7 code." +optional = false +python-versions = "*" +files = [ + {file = "seven-1.0.0.tar.gz", hash = "sha256:e80157857dc378545b0cd8626668bf0e20d7f3608a5587f3fcc71a56d2416814"}, +] + +[package.extras] +tests = ["zope.testing"] + +[metadata] +lock-version = "2.0" +python-versions = "*" +content-hash = "edfc27fcb4a7dc1b1a11f2224d7b7f3e936c5f624df1dd86207c4dc08e047b5d" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f400be6 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,70 @@ +[project] +requires-python = "==3.12" +name = "dev" +authors = [ + { name = "Rik Veenboer", email = "rik.veenboer@gmail.com" } +] +version = "0.1.0" +dependencies = [ + "fastapi", + "gitpython", + "kubernetes", + "matplotlib", + "seaborn", + "openpyxl", + "xlsxwriter", + "pandas", + "pyarrow", + "pydantic[email]", + "pydantic-settings", + "pyyaml", + "requests", + "s3fs[boto3]", + "structlog", + "uvicorn", + "duckdb", + "geopandas", + "lxml", + "networkx", + "Pint", + "Pint-Pandas", + "boto3", + "influxdb-client", + "requests[socks]", + "beautifulsoup4", + "fastparquet", + "icecream" +] + +[project.optional-dependencies] +dev = [ + "black", + "isort", + "nbstripout", + "pip-tools", + "pre-commit", + "ruff", + "mypy" +] +local = [ + "ipykernel", + "ipywidgets" +] +dagster = [ + "dagster", + "dagster-graphql", + "dagster-postgres", + "dagster-docker", + "dagster-aws", + "dagster-polars", + "dagit" +] + +[tool.poetry] +name = "dev" +version = "0.1.0" +description = "" +authors = ["Rik Veenboer "] + +[tool.poetry.dependencies] +seven = "^1.0.0" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0935faa --- /dev/null +++ b/requirements.txt @@ -0,0 +1,245 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --output-file=requirements.txt pyproject.toml +aiobotocore==2.15.1 + # via s3fs +aiohappyeyeballs==2.4.3 + # via aiohttp +aiohttp==3.10.8 + # via + # aiobotocore + # s3fs +aioitertools==0.12.0 + # via aiobotocore +aiosignal==1.3.1 + # via aiohttp +annotated-types==0.7.0 + # via pydantic +anyio==4.6.0 + # via starlette +appdirs==1.4.4 + # via 
pint +asttokens==2.4.1 + # via icecream +async-timeout==4.0.3 + # via aiohttp +attrs==24.2.0 + # via aiohttp +beautifulsoup4==4.12.3 +boto3==1.35.23 + # via aiobotocore +botocore==1.35.23 + # via + # aiobotocore + # boto3 + # s3transfer +cachetools==5.5.0 + # via google-auth +certifi==2024.8.30 + # via + # influxdb-client + # kubernetes + # pyogrio + # pyproj + # requests +charset-normalizer==3.3.2 + # via requests +click==8.1.7 + # via uvicorn +colorama==0.4.6 + # via icecream +contourpy==1.3.0 + # via matplotlib +cramjam==2.8.4 + # via fastparquet +cycler==0.12.1 + # via matplotlib +dnspython==2.6.1 + # via email-validator +duckdb==1.1.1 +durationpy==0.8 + # via kubernetes +email-validator==2.2.0 + # via pydantic +et-xmlfile==1.1.0 + # via openpyxl +exceptiongroup==1.2.2 + # via anyio +executing==2.1.0 + # via icecream +fastapi==0.115.0 +fastparquet==2024.5.0 +flexcache==0.3 + # via pint +flexparser==0.3.1 + # via pint +fonttools==4.54.1 + # via matplotlib +frozenlist==1.4.1 + # via + # aiohttp + # aiosignal +fsspec==2024.9.0 + # via + # fastparquet + # s3fs +geopandas==1.0.1 +gitdb==4.0.11 + # via gitpython +gitpython==3.1.43 +google-auth==2.35.0 + # via kubernetes +h11==0.14.0 + # via uvicorn +icecream==2.1.3 +idna==3.10 + # via + # anyio + # email-validator + # requests + # yarl +influxdb-client==1.46.0 +jmespath==1.0.1 + # via + # boto3 + # botocore +kiwisolver==1.4.7 + # via matplotlib +kubernetes==31.0.0 +lxml==5.3.0 +matplotlib==3.9.2 + # via seaborn +multidict==6.1.0 + # via + # aiohttp + # yarl +networkx==3.3 +numpy==2.1.1 + # via + # contourpy + # fastparquet + # geopandas + # matplotlib + # pandas + # pyarrow + # pyogrio + # seaborn + # shapely +oauthlib==3.2.2 + # via + # kubernetes + # requests-oauthlib +openpyxl==3.1.5 +packaging==24.1 + # via + # fastparquet + # geopandas + # matplotlib + # pyogrio +pandas==2.2.3 + # via + # fastparquet + # geopandas + # pint-pandas + # seaborn +pillow==10.4.0 + # via matplotlib +pint==0.24.3 + # via pint-pandas +pint-pandas==0.6.2 +pyarrow==17.0.0 +pyasn1==0.6.1 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.4.1 + # via google-auth +pydantic==2.9.2 + # via + # fastapi + # pydantic-settings +pydantic-core==2.23.4 + # via pydantic +pydantic-settings==2.5.2 +pygments==2.18.0 + # via icecream +pyogrio==0.10.0 + # via geopandas +pyparsing==3.1.4 + # via matplotlib +pyproj==3.7.0 + # via geopandas +pysocks==1.7.1 + # via requests +python-dateutil==2.9.0.post0 + # via + # botocore + # influxdb-client + # kubernetes + # matplotlib + # pandas +python-dotenv==1.0.1 + # via pydantic-settings +pytz==2024.2 + # via pandas +pyyaml==6.0.2 + # via kubernetes +reactivex==4.0.4 + # via influxdb-client +requests==2.32.3 + # via + # kubernetes + # requests-oauthlib +requests-oauthlib==2.0.0 + # via kubernetes +rsa==4.9 + # via google-auth +s3fs==2024.9.0 +s3transfer==0.10.2 + # via boto3 +seaborn==0.13.2 +setuptools==75.1.0 + # via influxdb-client +shapely==2.0.6 + # via geopandas +six==1.16.0 + # via + # asttokens + # kubernetes + # python-dateutil +smmap==5.0.1 + # via gitdb +sniffio==1.3.1 + # via anyio +soupsieve==2.6 + # via beautifulsoup4 +starlette==0.38.6 + # via fastapi +structlog==24.4.0 +typing-extensions==4.12.2 + # via + # anyio + # fastapi + # flexcache + # flexparser + # multidict + # pint + # pydantic + # pydantic-core + # reactivex + # uvicorn +tzdata==2024.2 + # via pandas +urllib3==2.2.3 + # via + # botocore + # influxdb-client + # kubernetes + # requests +uvicorn==0.31.0 +websocket-client==1.8.0 + # via kubernetes +wrapt==1.16.0 + 
# via aiobotocore +xlsxwriter==3.2.0 +yarl==1.13.1 + # via aiohttp diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/__init__.py b/src/app/__init__.py new file mode 100644 index 0000000..ef37bac --- /dev/null +++ b/src/app/__init__.py @@ -0,0 +1,3 @@ +from icecream import install + +install() diff --git a/src/app/partitions/__init__.py b/src/app/partitions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/partitions/assets.py b/src/app/partitions/assets.py new file mode 100644 index 0000000..7a1ecd5 --- /dev/null +++ b/src/app/partitions/assets.py @@ -0,0 +1,97 @@ +import polars as pl +from dagster import ( + AssetIn, + DailyPartitionsDefinition, + DimensionPartitionMapping, + IdentityPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, + StaticPartitionsDefinition, + TimeWindowPartitionMapping, + asset, +) + +partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20") + +partitions_def_multi = MultiPartitionsDefinition( + { + "date": DailyPartitionsDefinition(start_date="2024-09-20"), + "source": StaticPartitionsDefinition(["plato", "sounds"]), + } +) + + +@asset( + io_manager_key="polars_parquet_io_manager", + partitions_def=partitions_def_single, + metadata={ + "partition_by": ["date"], + }, +) +def asset_single_1(context): + ic() + ic(context.partition_key) + return pl.DataFrame( + [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] + ) + + +@asset( + io_manager_key="polars_parquet_io_manager", + partitions_def=partitions_def_multi, + metadata={ + "partition_by": ["date", "source"], + }, +) +def asset_multi_1(context): + ic() + ic(context.partition_key) + + return pl.DataFrame( + [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] + ) + + +@asset( + partitions_def=partitions_def_single, + ins={ + "asset_single_1": AssetIn( + partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0) + ) + }, +) +def asset_single_2(context, asset_single_1): + ic() + ic(context.partition_key) + ic(asset_single_1.keys()) + partition_key = context.asset_partition_key_for_output() + return f"Processed data for {partition_key}" + + +partition_mapping = MultiPartitionMapping( + { + "date": DimensionPartitionMapping( + dimension_name="date", + partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0), + ), + "source": DimensionPartitionMapping( + dimension_name="source", + partition_mapping=IdentityPartitionMapping(), + ), + } +) + + +@asset( + partitions_def=partitions_def_multi, + ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)}, +) +def asset_multi_2(context, asset_multi_1): + ic() + ic(context.partition_key) + ic(context.partition_key.keys_by_dimension) + ic(asset_multi_1) + + partition_key = context.asset_partition_key_for_output() + ic(partition_key) + return f"Processed data for {partition_key}" diff --git a/src/app/partitions/mapping.py b/src/app/partitions/mapping.py new file mode 100644 index 0000000..d155c01 --- /dev/null +++ b/src/app/partitions/mapping.py @@ -0,0 +1,108 @@ +from datetime import datetime +from typing import Optional + +from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition +from dagster._core.definitions.partition import PartitionsSubset +from dagster._core.definitions.partition_mapping import ( + MultiPartitionMapping, + UpstreamPartitionsResult, +) +from dagster._core.instance import DynamicPartitionsStore +from dagster._serdes 
import whitelist_for_serdes + + +# @whitelist_for_serdes +class LatestTwoPartitionsMapping(PartitionMapping): + def get_upstream_mapped_partitions_result_for_partitions( + self, + downstream_partitions_subset: Optional[PartitionsSubset], + downstream_partitions_def: Optional[PartitionsDefinition], + upstream_partitions_def: PartitionsDefinition, + current_time: Optional[datetime] = None, + dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, + ) -> UpstreamPartitionsResult: + ic() + + # Get upstream partitions from the subset + all_partitions = upstream_partitions_def.get_partition_keys() + ic(all_partitions) + + if len(all_partitions) < 2: + raise ValueError("Not enough partitions to proceed.") + + # Select the last two partitions + partition_keys = [all_partitions[-2], all_partitions[-1]] + return UpstreamPartitionsResult( + upstream_partitions_def.subset_with_partition_keys(partition_keys), [] + ) + + def get_downstream_partitions_for_partitions( + self, + upstream_partitions_subset: PartitionsSubset, + downstream_partitions_def, + upstream_partitions_def, + ) -> PartitionsSubset: + ic() + # Get the downstream partition that corresponds to the latest upstream partition + downstream_partition_key = upstream_partitions_subset.get_partition_keys()[-1] + return downstream_partitions_def.subset_with_partition_keys( + [downstream_partition_key] + ) + + @property + def description(self): + return "Maps to the latest two upstream partitions." + + +@whitelist_for_serdes +class X(MultiPartitionMapping): + def get_upstream_partitions_for_partition_range( + self, + downstream_partition_range, + upstream_partitions_def, + downstream_partitions_def, + ) -> UpstreamPartitionsResult: + ic() + + # Extract downstream partition range keys + downstream_keys = downstream_partition_range.get_partition_keys() + + # Initialize a list to hold the upstream partition keys + upstream_keys = [] + + # Iterate over each downstream partition key + for downstream_key in downstream_keys: + # Parse the MultiPartitionKey + downstream_mpk = MultiPartitionKey.from_str(downstream_key) + + for i in [1, 2]: + # Shift the daily partition by one day + shifted_date = datetime.strptime( + downstream_mpk.keys_by_dimension["date"], "%Y-%m-%d" + ) - timedelta(days=i) + + # Recreate the MultiPartitionKey with the shifted daily partition + upstream_mpk = MultiPartitionKey( + { + "source": downstream_mpk.keys_by_dimension["source"], + "date": shifted_date.strftime("%Y-%m-%d"), + } + ) + + # Add the upstream partition key + upstream_keys.append(upstream_mpk.to_string()) + + return UpstreamPartitionsResult( + upstream_partitions_def.subset_with_partition_keys(upstream_keys), [] + ) + + def get_downstream_partitions_for_partition_range( + self, + upstream_partition_range, + downstream_partitions_def, + upstream_partitions_def, + ) -> PartitionsSubset: + # This method would map upstream partitions back to downstream, but for simplicity, let's assume it's symmetric. 
+ return self.get_upstream_partitions_for_partition_range( + upstream_partition_range, upstream_partitions_def, downstream_partitions_def + ) diff --git a/src/app/partitions/repo.py b/src/app/partitions/repo.py new file mode 100644 index 0000000..ca10abd --- /dev/null +++ b/src/app/partitions/repo.py @@ -0,0 +1,13 @@ +from dagster import Definitions, define_asset_job +from dagster_polars import PolarsParquetIOManager + +from .assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2 + +# Define a job that includes both assets +daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2]) + +vinyl = Definitions( + assets=[asset_single_1, asset_multi_1, asset_single_2, asset_multi_2], + resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, + jobs=[daily_job], +) diff --git a/src/app/partitions/test.py b/src/app/partitions/test.py new file mode 100644 index 0000000..7b2dfee --- /dev/null +++ b/src/app/partitions/test.py @@ -0,0 +1,55 @@ +from dagster import materialize +from dagster_polars import PolarsParquetIOManager + +from app.vinyl.assets import ( + asset_multi_1, + asset_multi_2, + asset_single_1, + asset_single_2, +) + +resources = { + "polars_parquet_io_manager": PolarsParquetIOManager( + base_dir="/opt/dagster/storage" + ) +} + + +def test_single(): + result = materialize( + [asset_single_1, asset_single_2], + partition_key="2024-10-02", + resources=resources, + ) + assert result.success + ic(result.asset_value) + + +def test_multi(): + # result = materialize([ + # asset_multi_1 + # ], partition_key="2024-10-01|plato", resources=resources + # ) + # assert result.success + # ic(result.asset_value) + # + # + # result = materialize([ + # asset_multi_1 + # ], partition_key="2024-10-02|plato", resources=resources + # ) + # assert result.success + # ic(result.asset_value) + + result = materialize( + [asset_multi_1, asset_multi_2], + partition_key="2024-10-02|plato", + resources=resources, + ) + assert result.success + ic(result.asset_value) + + +if __name__ == "__main__": + # test_single() + test_multi() diff --git a/src/app/sync.sh b/src/app/sync.sh new file mode 100644 index 0000000..68ab2f5 --- /dev/null +++ b/src/app/sync.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +rsync -av /opt/dagster/src/app/vinyl/ \ + /Volumes/dagster/src/app/vinyl/ \ + --include='*.py' \ + --exclude='__pycache__/' \ + -progress \ + --delete $* diff --git a/src/app/test.py b/src/app/test.py new file mode 100644 index 0000000..f656f5b --- /dev/null +++ b/src/app/test.py @@ -0,0 +1,34 @@ +import time + +from dagster import AssetMaterialization, Output, config_mapping, job, op + + +@op(config_schema={"config_param": str}) +def hello(context): + time.sleep(1) + print("halllo") + return Output(123, metadata={"aa": context.op_config["config_param"]}) + + +@op +def goodbye(context, x: int): + time.sleep(2) + print("doooei", x) + context.log_event( + AssetMaterialization( + asset_key="my_asset", + metadata={"my_meta": 444}, + description="A very useful value!", + ) + ) + return 2 + + +@config_mapping(config_schema={"simplified_param": str}) +def simplified_config(val): + return {"ops": {"hello": {"config": {"config_param": val["simplified_param"]}}}} + + +@job +def my_job(): + goodbye(hello()) diff --git a/src/app/vinyl/__init__.py b/src/app/vinyl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/vinyl/assets.py b/src/app/vinyl/assets.py new file mode 100644 index 0000000..eb22df0 --- /dev/null +++ b/src/app/vinyl/assets.py @@ -0,0 +1,115 @@ +from 
datetime import datetime +from glob import glob + +import polars as pl +import structlog +from dagster import ( + AssetIn, + DailyPartitionsDefinition, + DimensionPartitionMapping, + IdentityPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, + StaticPartitionsDefinition, + TimeWindowPartitionMapping, + asset, Failure, Field, ) + +from app.vinyl.plato.check_plato import scrape_plato +from app.vinyl.sounds.fetch import fetch_deals + +SOURCES = ["plato", "sounds"] + +logger = structlog.get_logger() + +partitions_def = MultiPartitionsDefinition( + { + "date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1), + "source": StaticPartitionsDefinition(SOURCES), + } +) + +partition_mapping = MultiPartitionMapping( + { + "date": DimensionPartitionMapping( + dimension_name="date", + partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0), + ), + "source": DimensionPartitionMapping( + dimension_name="source", + partition_mapping=IdentityPartitionMapping(), + ), + } +) + + +@asset( + io_manager_key="polars_parquet_io_manager", + partitions_def=partitions_def, + metadata={ + "partition_by": ["date", "source"], + }, + config_schema={"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")}, +) +def deals(context): + ic() + ic(context.partition_key) + ic(context.op_config) + import_dir = context.op_config["import_dir"] + partition_key = context.partition_key.keys_by_dimension + date_str = partition_key["date"] + source = partition_key["source"] + logger.info("Materializing deals", date=partition_key["date"], source=source) + + date = datetime.strptime(partition_key["date"], "%Y-%m-%d") + days = (date - datetime.today()).days + ic(days) + if days > 0: + raise Failure(f"Cannot materialize for the future: {date.date()}") + if days < -1: + if source == "sounds": + pattern = f"{import_dir}/{date.date()}_*_sounds.csv" + logger.info("Looking for existing CSV files", pattern=pattern) + files = glob(pattern) + if len(files): + file = sorted(files)[-1] + logger.info("Using existing CSV file", file=file) + try: + df = pl.read_csv(file)[["id", "name", "price"]] + logger.info("Loaded CSV file", rows=len(df)) + return df.with_columns(**{k: pl.lit(v) for k, v in partition_key.items()}) + except Exception as e: + logger.error("Failed to load CSV file!", error=e) + raise Failure(f"Cannot materialize for the past: {date.date()}") + + if source == "plato": + logger.info("Scraping Plato") + df = scrape_plato() + logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown()) + ic(df.columns) + return pl.from_pandas(df.assign(**partition_key)) + if source == "sounds": + logger.info("Scraping Sounds") + df = fetch_deals() + ic(df.columns) + df = df[["id", "name", "price"]] + logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown()) + return pl.from_pandas(df.assign(**partition_key)) + + return pl.DataFrame( + [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] + ) + + +@asset( + partitions_def=partitions_def, + ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)}, +) +def new_deals(context, asset_multi_1): + ic() + ic(context.partition_key) + ic(context.partition_key.keys_by_dimension) + ic(asset_multi_1) + + partition_key = context.asset_partition_key_for_output() + ic(partition_key) + return f"Processed data for {partition_key}" diff --git a/src/app/vinyl/jobs.py b/src/app/vinyl/jobs.py new file mode 100644 index 0000000..03ef1fb --- /dev/null +++ b/src/app/vinyl/jobs.py @@ -0,0 +1,39 
@@ +from dagster import job, OpExecutionContext, op, \ + AssetMaterialization, AssetKey, define_asset_job + +from .assets import deals + +deals_job = define_asset_job("deals_job", selection=[deals], partitions_def=deals.partitions_def) + + +@op +def check_partititions(context: OpExecutionContext): + # Replace with your asset/job name + asset_key = "deals" + + context.log_event( + AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds") + ) + + # Fetch the materializations for the asset key + materializations = context.instance.get_materialized_partitions( + asset_key=AssetKey(asset_key) + ) + context.log.info("Existing partitions", extra=dict(partitions=materializations)) + + import polars as pl + storage_dir = context.instance.storage_directory() + ic(storage_dir) + for row in pl.scan_parquet(f'{storage_dir}/{asset_key}/*/*.parquet').select( + ['date', 'source']).unique().collect().iter_rows(): + partition = '|'.join(row) + if partition not in materializations: + context.log.info(f"Missing partition: {partition}") + context.log_event( + AssetMaterialization(asset_key=asset_key, partition=partition) + ) + + +@job +def check_partititions_job(): + check_partititions() diff --git a/src/app/vinyl/plato/__init__.py b/src/app/vinyl/plato/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/vinyl/plato/check_plato.py b/src/app/vinyl/plato/check_plato.py new file mode 100755 index 0000000..5ba8812 --- /dev/null +++ b/src/app/vinyl/plato/check_plato.py @@ -0,0 +1,191 @@ +#!/root/.pyenv/versions/dev/bin/python + +import os +import re +from datetime import datetime + +import boto3 +import pandas as pd +from botocore.exceptions import NoCredentialsError, PartialCredentialsError +from dotenv import load_dotenv + +from .scrape import * + + +def scrape_plato(get=None): + ic() + url = 'https://www.platomania.nl/vinyl-aanbiedingen?page=1' + + ic(url) + soup = get_soup(url=url, get=get) + articles_info = scrape_page(soup) + ic(len(articles_info)) + + links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split('=')[-1])) + for link in links: + ic(link) + soup = get_soup(url=link, get=get) + tmp = scrape_page(soup) + ic(len(tmp)) + articles_info.extend(tmp) + # break + + def clean(name): + tmp = ' '.join(reversed(name.split(', '))) + tmp = tmp.lower() + tmp = re.sub(r'\s+\([^\)]*\)', '', tmp) + return tmp + + articles_df = pd.DataFrame(articles_info) + articles_df['_artist'] = articles_df['artist'].map(clean) + articles_df['_price'] = articles_df['price'].map(lambda x: float(x.split(' ')[-1])) + articles_df['_date'] = datetime.now() + + return articles_df + + +def update_database(articles_df=None, database_file='/home/user/plato.parquet'): + if os.path.exists(database_file): + database_df = pd.read_parquet(database_file) + else: + database_df = None + + if articles_df is None: + new_df = None if database_df is None else database_df.head(0) + else: + if database_df is None: + articles_df.to_parquet(database_file) + return articles_df, articles_df + + compare = ['ean', '_price'] + check_df = pd.merge( + database_df[compare], + articles_df[compare], + how='right', + indicator=True + ) + new_df = check_df[check_df['_merge'] == 'right_only'].drop(columns='_merge').merge(articles_df) + database_df = pd.concat([ + database_df, + new_df + ]).sort_values('_date').groupby('ean').last().reset_index() + database_df.to_parquet(database_file) + + return database_df, new_df + + +def send_email(lines): + # Define the email parameters + SENDER = "mail@veenboer.xyz" + 
RECIPIENT = "rik.veenboer@gmail.com" + SUBJECT = "Aanbieding op plato!" + + # The email body for recipients with non-HTML email clients + BODY_TEXT = "" + + # The HTML body of the email + tmp = '\n'.join(lines) + BODY_HTML = f""" + + + {tmp} + + """ + + # The character encoding for the email + CHARSET = "UTF-8" + + # Try to send the email + try: + client = boto3.client('ses', region_name='eu-west-1') # Change the region as needed + + # Provide the contents of the email + response = client.send_email( + Destination={ + 'ToAddresses': [ + RECIPIENT, + ], + }, + Message={ + 'Body': { + 'Html': { + 'Charset': CHARSET, + 'Data': BODY_HTML, + }, + 'Text': { + 'Charset': CHARSET, + 'Data': BODY_TEXT, + }, + }, + 'Subject': { + 'Charset': CHARSET, + 'Data': SUBJECT, + }, + }, + Source=SENDER, + ) + # Display an error if something goes wrong. + except NoCredentialsError: + print("Credentials not available") + except PartialCredentialsError: + print("Incomplete credentials provided") + except Exception as e: + print(f"Error: {e}") + else: + print("Email sent! Message ID:"), + print(response['MessageId']) + + +def get(url, proxy=True): + if proxy: + tmp = 'socks5://localhost:1080' + kwargs = dict(proxies=dict(http=tmp, https=tmp)) + else: + kwargs = {} + return requests.get(url, **kwargs) + + +def main(dry=False): + load_dotenv('/opt/.env') + + local_ip = get('http://ifconfig.me', False).text + get_ip = get('http://ifconfig.me').text + print(f'Local IP = {local_ip}') + print(f'Request IP = {get_ip}') + assert local_ip != get_ip + + artists = open('/home/user/artists.txt').read().strip().splitlines() + print(f'Number of known artists = {len(artists)}') + + if dry: + articles_df = None + else: + articles_df = scrape_plato(get=get) + database_df, new_df = update_database(articles_df) + + if dry: + new_df = database_df.sample(20) + + print(f'Database size = {len(database_df)}') + print(f'New = {len(new_df)}') + + # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25') + new_df = new_df.query('_price <= 25 and ean != ""') + print(f'Interesting = {len(new_df)}') + + if new_df is not None and len(new_df): + message = [] + for _, row in new_df.head(10).iterrows(): + message.append(f'
<a href="{row["url"]}">NEW</a> {row["artist"]} - {row["title"]} ({row["price"]})
') + message.append('') + send_email(message) + + +if __name__ == '__main__': + cwd = os.path.dirname(__file__) + main(dry=False) diff --git a/src/app/vinyl/plato/scrape.py b/src/app/vinyl/plato/scrape.py new file mode 100644 index 0000000..33ce325 --- /dev/null +++ b/src/app/vinyl/plato/scrape.py @@ -0,0 +1,77 @@ +import requests +from bs4 import BeautifulSoup + + + +def get_soup(url, get = None): + # Send a GET request to the specified URL + if get is None: + get = requests.get + response = get(url) + + # Check if the request was successful + if response.status_code == 200: + # Parse the HTML content of the page + return BeautifulSoup(response.content, 'html.parser') + else: + raise ValueError(f"Failed to retrieve the page. Status code: {response.status_code}") + + +def scrape_page_links(soup): + # Find all
<li> elements with class "page-item" + page_items = soup.find_all('li', class_='page-item') + + # Extract the href attribute of <a> tags within these
  • elements + links = [] + for item in page_items: + a_tag = item.find('a', class_='page-link') + if a_tag and 'href' in a_tag.attrs: + links.append(a_tag['href']) + + return links + + +def extract_article_info(article): + info = {} + + # Extract the artist name + artist_tag = article.find('h1', class_='product-card__artist') + info['artist'] = artist_tag.text.strip() if artist_tag else None + + # Extract the title and URL + title_tag = article.find('h2', class_='product-card__title') + info['title'] = title_tag.text.strip() if title_tag else None + url_tag = title_tag.find_parent('a') if title_tag else None + info['url'] = url_tag['href'] if url_tag else None + + # Extract additional details + details = article.find_all('div', class_='article-details__text') + for detail in details: + text = detail.text.strip() + if 'Label:' in text: + info['label'] = text.replace('Label: ', '').strip() + elif 'Releasedatum:' in text: + info['release_date'] = text.replace('Releasedatum: ', '').strip() + elif 'Herkomst:' in text: + info['origin'] = text.replace('Herkomst: ', '').strip() + elif 'Item-nr:' in text: + info['item_number'] = text.replace('Item-nr: ', '').strip() + elif 'EAN:' in text: + info['ean'] = text.replace('EAN:', '').strip() + + # Extract delivery information + delivery_tag = article.find('div', class_='article-details__delivery-text') + info['delivery_info'] = delivery_tag.text.strip() if delivery_tag else None + + # Extract price + price_tag = article.find('div', class_='article__price') + info['price'] = price_tag.text.strip() if price_tag else None + + return info + +def scrape_page(soup): + # Find all article blocks + article_blocks = soup.find_all('article', class_='article LP') + + # Extract information from each article block + return [extract_article_info(article) for article in article_blocks] diff --git a/src/app/vinyl/repo.py b/src/app/vinyl/repo.py new file mode 100644 index 0000000..959238d --- /dev/null +++ b/src/app/vinyl/repo.py @@ -0,0 +1,13 @@ +from dagster import Definitions +from dagster_polars import PolarsParquetIOManager + +from .assets import deals +from .jobs import deals_job, check_partititions_job +from .schedules import deals_schedule + +vinyl = Definitions( + assets=[deals], + resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, + jobs=[deals_job, check_partititions_job], + schedules=[deals_schedule] +) diff --git a/src/app/vinyl/schedules.py b/src/app/vinyl/schedules.py new file mode 100644 index 0000000..d3f4bf1 --- /dev/null +++ b/src/app/vinyl/schedules.py @@ -0,0 +1,10 @@ +from dagster import DefaultScheduleStatus, build_schedule_from_partitioned_job + +from app.vinyl.repo import deals_job + +deals_schedule = build_schedule_from_partitioned_job( + job=deals_job, + hour_of_day=7, + # execution_timezone="Europe/Amsterdam", + default_status=DefaultScheduleStatus.RUNNING +) diff --git a/src/app/vinyl/sounds/__init__.py b/src/app/vinyl/sounds/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/app/vinyl/sounds/deals.py b/src/app/vinyl/sounds/deals.py new file mode 100644 index 0000000..21729af --- /dev/null +++ b/src/app/vinyl/sounds/deals.py @@ -0,0 +1,80 @@ +#!/usr/bin/python3 + +import glob +import os +from datetime import datetime + +import pandas as pd + + +def get_csvs(directory, n): + # List all files matching the pattern *_sounds.csv + suffix = "_sounds.csv" + files = glob.glob(os.path.join(directory, f"*{suffix}")) + + # Function to extract date from filename + def extract_date_from_filename(filename): + # 
Extract the date string + basename = os.path.basename(filename) + date_str = basename.split(suffix)[0] + try: + return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S") + except ValueError: + # The date string cannot be parsed + return None + + # Create a list of tuples (date, filename), ignoring files with unparsable dates + result = [(extract_date_from_filename(file), file) for file in files] + result = [item for item in result if item[0] is not None] + + # Sort the list by date in descending order (most recent first) + result.sort(key=lambda x: x[0], reverse=True) + + # Return the two most recent files + return [x[1] for x in result[:n]] + + +def analyze(df1, df2): + df1 = df1.drop_duplicates(subset="id") + df2 = df2.drop_duplicates(subset="id") + combined_df = pd.merge( + df1[["id", "price"]], df2, on="id", how="right", indicator=True + ) + combined_df["discount"] = combined_df.price_y - combined_df.price_x + combined_df.drop(columns=["price_x"], inplace=True) + combined_df.rename(columns={"price_y": "price"}, inplace=True) + + deals = combined_df.query("discount < 0").sort_values(by="discount")[ + ["id", "name", "price", "discount"] + ] + new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[ + ["id", "name", "price"] + ] + return deals, new + + +if __name__ == "__main__": + csvs = get_csvs(".", 100) + + for i in range(1, len(csvs)): + print(f"Comparing {csvs[i]} with {csvs[0]}") + df_previous = pd.read_csv(csvs[i], index_col=0) + df_latest = pd.read_csv(csvs[0], index_col=0) + deals, new = analyze(df_previous, df_latest) + + done = False + + if len(deals) > 0: + print() + print("New items:") + print(new) + print() + done = True + + if len(deals) > 0: + print(f"Discounted items:") + print(deals) + done = True + + if done: + break diff --git a/src/app/vinyl/sounds/fetch.py b/src/app/vinyl/sounds/fetch.py new file mode 100644 index 0000000..7be3185 --- /dev/null +++ b/src/app/vinyl/sounds/fetch.py @@ -0,0 +1,84 @@ +#!/usr/bin/python3 + +import time +from datetime import datetime + +import pandas as pd +import requests +from bs4 import BeautifulSoup +from tqdm import tqdm + + +def get_page_count(html_content): + soup = BeautifulSoup(html_content, "html.parser") + + # Find all pagination links + page_links = soup.select("ul.pagination li a") + + # Extract the numbers from the hrefs and convert to integers + page_numbers = [ + int(link.get_text()) for link in page_links if link.get_text().isdigit() + ] + + return max(page_numbers) + + +def parse_page(html_content): + soup = BeautifulSoup(html_content, "html.parser") + + # Extract the name (artist - album) from the h5 tag + names = list(map(lambda x: x.get_text(strip=True), soup.find_all("h5"))) + + # Remove 'Telefoon', 'E-mail', 'Facebook' + names = list(filter(lambda x: " -" in x, names)) + + # Extract the numerical id from the a tag + ids = list(map(lambda x: x["rel"][0], soup.find_all("a", rel=True))) + + # Extract the price + prices = list( + map( + lambda x: float(x.get_text(strip=True).split()[1]), + soup.find_all("span", class_="product-price"), + ) + ) + + df = pd.DataFrame({"id": ids, "name": names, "price": prices}) + + return df + + +def fetch_deals(): + # Get page count + page_count = get_page_count( + requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text + ) + time.sleep(1) + print(f"Number of pages: {page_count}") + + # Parse all pages + base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all" + dfs = [] + for i in tqdm(range(page_count)): + df = 
parse_page(requests.get(base_url.format(page_number=i)).text) + dfs.append(df) + time.sleep(2) + + # Combine dfs + return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"]) + + +if __name__ == "__main__": + df = fetch_deals() + print(f"Found {len(df)} deals") + + # Show current deals + print(df.sort_values(by="price").head(10)) + + # Write to file + now = datetime.now() + prefix = now.strftime("%Y-%m-%d_%H:%M:%S") + directory = "/home/bram/src/python" + filepath = f"{directory}/{prefix}_sounds.csv" + print(f"Writing data to {filepath}") + df.to_csv(filepath) diff --git a/src/app/vinyl/test.py b/src/app/vinyl/test.py new file mode 100644 index 0000000..50d8bfb --- /dev/null +++ b/src/app/vinyl/test.py @@ -0,0 +1,48 @@ +import warnings +from datetime import datetime + +from dagster import materialize +from dagster_polars import PolarsParquetIOManager + +from app.vinyl.assets import ( + deals +) +from app.vinyl.jobs import check_partititions_job + +warnings.filterwarnings("ignore", category=UserWarning) + +import logging + +logging.getLogger().setLevel(logging.INFO) + +resources = { + "polars_parquet_io_manager": PolarsParquetIOManager( + base_dir="/opt/dagster/storage" + ) +} + + +def test_deals( + source="sounds", + date: str = None +): + if not date: + today = datetime.today().strftime("%Y-%m-%d") + date = today + + result = materialize( + [deals], + partition_key=f"{date}|{source}", + resources=resources, + run_config={"loggers": {"console": {"config": {"log_level": "ERROR"}}}, + "ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}} + + } + ) + assert result.success + ic(result.asset_value) + + +if __name__ == "__main__": + # test_deals(source="plato") + check_partititions_job.execute_in_process() diff --git a/workspace.yaml b/workspace.yaml new file mode 100644 index 0000000..bbd9fb7 --- /dev/null +++ b/workspace.yaml @@ -0,0 +1,27 @@ +load_from: + - grpc_server: + location_name: example + host: user_code + port: 4000 + +# - python_file: +# location_name: local +# relative_path: repo.py +# working_directory: /opt/dagster/home + +# - grpc_server: +# location_name: other +# host: other_image +# port: 4000 + +# - python_file: +# location_name: towel +# relative_path: towel.py +# working_directory: /opt/dagster/home +# executable_path: /venv/towel/bin/python + +# - python_file: +# location_name: alliander +# relative_path: towel.py +# working_directory: /opt/dagster/home +# executable_path: /venv/alliander/bin/python
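The workspace above points dagit at the gRPC server in the user_code container, and Dockerfile.code starts that server with "-f repo.py", but repo.py itself (mounted from /opt/dagster/src/repo.py) is not part of this patch. A minimal sketch of what it could contain, assuming it simply merges the two Definitions objects created in app.partitions.repo and app.vinyl.repo (the merge call and names below are illustrative, not taken from the patch):

# repo.py -- entry point loaded by "dagster code-server start -f repo.py" (see Dockerfile.code)
from dagster import Definitions

# both submodules expose a Definitions object named "vinyl"
from app.partitions.repo import vinyl as partition_defs
from app.vinyl.repo import vinyl as vinyl_defs

defs = Definitions.merge(partition_defs, vinyl_defs)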