diff --git a/dagster/Dockerfile.code b/dagster/Dockerfile.code deleted file mode 100755 index ac97ab0..0000000 --- a/dagster/Dockerfile.code +++ /dev/null @@ -1,20 +0,0 @@ -FROM python:3.12-slim - -# Checkout and install dagster libraries needed to run the gRPC server -# exposing your repository to dagit and dagster-daemon, and to load the DagsterInstance - -COPY dagster-requirements.txt requirements.txt -RUN pip install uv -RUN uv pip install -r requirements.txt --system -RUN uv pip install polars-lts-cpu --system - -# Add repository code -WORKDIR /opt/dagster/home - -# Run dagster gRPC server on port 4000 -EXPOSE 4000 - -# CMD allows this to be overridden from run launchers or executors that want -# to run other commands against your repository -#CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f", "repo.py"] -CMD ["dagster", "code-server", "start", "-h", "0.0.0.0", "-p", "4000", "-f", "repo.py"] diff --git a/dagster/Dockerfile.system b/dagster/Dockerfile.system deleted file mode 100755 index a23e6ff..0000000 --- a/dagster/Dockerfile.system +++ /dev/null @@ -1,18 +0,0 @@ -# Dagster libraries to run both dagit and the dagster-daemon. Does not -# need to have access to any pipeline code. - -FROM python:3.12-slim - -COPY dagster-requirements.txt requirements.txt -RUN pip install uv -RUN uv pip install -r requirements.txt --system -RUN uv pip install polars-lts-cpu --system - -# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there -ENV DAGSTER_HOME=/opt/dagster/home/ - -RUN mkdir -p $DAGSTER_HOME - -COPY dagster.yaml workspace.yaml $DAGSTER_HOME - -WORKDIR $DAGSTER_HOME diff --git a/dagster/Makefile b/dagster/Makefile deleted file mode 100644 index ff04132..0000000 --- a/dagster/Makefile +++ /dev/null @@ -1,20 +0,0 @@ -requirements.txt: pyproject.toml - uv pip compile $(UPGRADE) --output-file=requirements.txt pyproject.toml >/dev/null - -dagster-requirements.txt: requirements.txt pyproject.toml - uv pip compile $(UPGRADE) --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml >/dev/null - -sync: virtualenv - uv pip sync requirements.txt - -upgrade-deps: virtualenv - touch pyproject.toml - $(MAKE) UPGRADE="--upgrade" dev-requirements.txt - -install-tools: virtualenv - pip install $(UPGRADE) pip wheel pip-tools uv - -upgrade-tools: virtualenv - $(MAKE) UPGRADE="--upgrade" install-tools - -upgrade: upgrade-tools upgrade-pre-commit upgrade-deps sync diff --git a/dagster/dagster-requirements.txt b/dagster/dagster-requirements.txt deleted file mode 100755 index 41e78bf..0000000 --- a/dagster/dagster-requirements.txt +++ /dev/null @@ -1,398 +0,0 @@ -# This file was autogenerated by uv via the following command: -# uv pip compile --constraint=requirements.txt --output-file=dagster-requirements.txt --extra=dagster pyproject.toml -aiobotocore==2.15.1 - # via s3fs -aiohappyeyeballs==2.4.3 - # via aiohttp -aiohttp==3.10.8 - # via - # aiobotocore - # s3fs -aioitertools==0.12.0 - # via aiobotocore -aiosignal==1.3.1 - # via aiohttp -alembic==1.13.3 - # via dagster -aniso8601==9.0.1 - # via graphene -annotated-types==0.7.0 - # via pydantic -anyio==4.6.0 - # via - # gql - # starlette - # watchfiles -appdirs==1.4.4 - # via pint -asttokens==2.4.1 - # via icecream -attrs==24.2.0 - # via aiohttp -backoff==2.2.1 - # via gql -beautifulsoup4==4.12.3 -boto3==1.35.23 - # via - # aiobotocore - # dagster-aws -botocore==1.35.23 - # via - # aiobotocore - # boto3 - # s3transfer -cachetools==5.5.0 - # via google-auth -certifi==2024.8.30 - # via - # 
influxdb-client - # kubernetes - # pyogrio - # pyproj - # requests -charset-normalizer==3.3.2 - # via requests -click==8.1.7 - # via - # dagster - # dagster-webserver - # uvicorn -colorama==0.4.6 - # via icecream -coloredlogs==14.0 - # via dagster -contourpy==1.3.0 - # via matplotlib -cramjam==2.8.4 - # via fastparquet -croniter==3.0.3 - # via dagster -cycler==0.12.1 - # via matplotlib -dagit==1.8.9 -dagster==1.8.9 - # via - # dagster-aws - # dagster-docker - # dagster-duckdb - # dagster-duckdb-pandas - # dagster-graphql - # dagster-polars - # dagster-postgres - # dagster-webserver -dagster-aws==0.24.9 -dagster-docker==0.24.9 -dagster-duckdb==0.24.9 - # via dagster-duckdb-pandas -dagster-duckdb-pandas==0.24.9 -dagster-graphql==1.8.9 - # via dagster-webserver -dagster-pipes==1.8.9 - # via dagster -dagster-polars==0.24.9 -dagster-postgres==0.24.9 -dagster-webserver==1.8.9 - # via dagit -dnspython==2.6.1 - # via email-validator -docker==7.1.0 - # via dagster-docker -docker-image-py==0.1.13 - # via dagster-docker -docstring-parser==0.16 - # via dagster -duckdb==1.1.1 - # via dagster-duckdb -durationpy==0.8 - # via kubernetes -email-validator==2.2.0 - # via pydantic -et-xmlfile==1.1.0 - # via openpyxl -executing==2.1.0 - # via icecream -fastapi==0.115.0 -fastparquet==2024.5.0 -filelock==3.16.1 - # via dagster -flexcache==0.3 - # via pint -flexparser==0.3.1 - # via pint -fonttools==4.54.1 - # via matplotlib -frozenlist==1.4.1 - # via - # aiohttp - # aiosignal -fsspec==2024.9.0 - # via - # fastparquet - # s3fs - # universal-pathlib -geopandas==1.0.1 -gitdb==4.0.11 - # via gitpython -gitpython==3.1.43 -google-auth==2.35.0 - # via kubernetes -gql==3.5.0 - # via dagster-graphql -graphene==3.3 - # via dagster-graphql -graphql-core==3.2.4 - # via - # gql - # graphene - # graphql-relay -graphql-relay==3.2.0 - # via graphene -grpcio==1.66.2 - # via - # dagster - # grpcio-health-checking -grpcio-health-checking==1.62.3 - # via dagster -h11==0.14.0 - # via uvicorn -httptools==0.6.1 - # via uvicorn -humanfriendly==10.0 - # via coloredlogs -icecream==2.1.3 -idna==3.10 - # via - # anyio - # email-validator - # requests - # yarl -influxdb-client==1.46.0 -jinja2==3.1.4 - # via dagster -jmespath==1.0.1 - # via - # boto3 - # botocore -kiwisolver==1.4.7 - # via matplotlib -kubernetes==31.0.0 -lxml==5.3.0 -mako==1.3.5 - # via alembic -markdown-it-py==3.0.0 - # via rich -markupsafe==2.1.5 - # via - # jinja2 - # mako -matplotlib==3.9.2 - # via seaborn -mdurl==0.1.2 - # via markdown-it-py -multidict==6.1.0 - # via - # aiohttp - # yarl -networkx==3.3 -numpy==2.1.1 - # via - # contourpy - # fastparquet - # geopandas - # matplotlib - # pandas - # pyarrow - # pyogrio - # seaborn - # shapely -oauthlib==3.2.2 - # via - # kubernetes - # requests-oauthlib -openpyxl==3.1.5 -packaging==24.1 - # via - # dagster - # dagster-aws - # fastparquet - # geopandas - # matplotlib - # pyogrio -pandas==2.2.3 - # via - # dagster-duckdb-pandas - # fastparquet - # geopandas - # pint-pandas - # seaborn -pillow==10.4.0 - # via matplotlib -pint==0.24.3 - # via pint-pandas -pint-pandas==0.6.2 -polars==1.9.0 - # via dagster-polars -protobuf==4.25.5 - # via - # dagster - # grpcio-health-checking -psycopg2-binary==2.9.9 - # via dagster-postgres -pyarrow==17.0.0 - # via dagster-polars -pyasn1==0.6.1 - # via - # pyasn1-modules - # rsa -pyasn1-modules==0.4.1 - # via google-auth -pydantic==2.9.2 - # via - # dagster - # fastapi - # pydantic-settings -pydantic-core==2.23.4 - # via pydantic -pydantic-settings==2.5.2 -pygments==2.18.0 - # via - # icecream 
- # rich -pyogrio==0.10.0 - # via geopandas -pyparsing==3.1.4 - # via matplotlib -pyproj==3.7.0 - # via geopandas -pysocks==1.7.1 - # via requests -python-dateutil==2.9.0.post0 - # via - # botocore - # croniter - # influxdb-client - # kubernetes - # matplotlib - # pandas -python-dotenv==1.0.1 - # via - # dagster - # pydantic-settings - # uvicorn -pytz==2024.2 - # via - # croniter - # dagster - # pandas -pyyaml==6.0.2 - # via - # dagster - # kubernetes - # uvicorn -reactivex==4.0.4 - # via influxdb-client -regex==2024.9.11 - # via docker-image-py -requests==2.32.3 - # via - # dagster - # dagster-aws - # dagster-graphql - # docker - # gql - # kubernetes - # requests-oauthlib - # requests-toolbelt -requests-oauthlib==2.0.0 - # via kubernetes -requests-toolbelt==1.0.0 - # via gql -rich==13.8.1 - # via dagster -rsa==4.9 - # via google-auth -s3fs==2024.9.0 -s3transfer==0.10.2 - # via boto3 -seaborn==0.13.2 -setuptools==75.1.0 - # via - # dagster - # influxdb-client -shapely==2.0.6 - # via geopandas -six==1.16.0 - # via - # asttokens - # kubernetes - # python-dateutil -smmap==5.0.1 - # via gitdb -sniffio==1.3.1 - # via anyio -soupsieve==2.6 - # via beautifulsoup4 -sqlalchemy==2.0.35 - # via - # alembic - # dagster -starlette==0.38.6 - # via - # dagster-graphql - # dagster-webserver - # fastapi -structlog==24.4.0 - # via dagster -tabulate==0.9.0 - # via dagster -tomli==2.0.1 - # via dagster -toposort==1.10 - # via dagster -tqdm==4.66.5 - # via dagster -typing-extensions==4.12.2 - # via - # alembic - # dagster - # dagster-polars - # fastapi - # flexcache - # flexparser - # pint - # pydantic - # pydantic-core - # reactivex - # sqlalchemy -tzdata==2024.2 - # via pandas -universal-pathlib==0.2.5 - # via - # dagster - # dagster-polars -urllib3==2.2.3 - # via - # botocore - # docker - # influxdb-client - # kubernetes - # requests -uvicorn==0.31.0 - # via dagster-webserver -uvloop==0.20.0 - # via uvicorn -watchdog==5.0.3 - # via dagster -watchfiles==0.24.0 - # via uvicorn -websocket-client==1.8.0 - # via kubernetes -websockets==13.1 - # via uvicorn -wrapt==1.16.0 - # via aiobotocore -xlsxwriter==3.2.0 -yarl==1.13.1 - # via - # aiohttp - # gql diff --git a/dagster/dagster.yaml b/dagster/dagster.yaml deleted file mode 100644 index b9d60ba..0000000 --- a/dagster/dagster.yaml +++ /dev/null @@ -1,69 +0,0 @@ -telemetry: - enabled: false - -concurrency: - default_op_concurrency_limit: 2 - -run_coordinator: - module: dagster.core.run_coordinator - class: QueuedRunCoordinator - -run_launcher: - module: dagster_docker - class: DockerRunLauncher - config: - env_vars: - - DAGSTER_POSTGRES_USER - - DAGSTER_POSTGRES_PASSWORD - - DAGSTER_POSTGRES_DB - network: dagster - container_kwargs: - volumes: - - /opt/dagster/src/app/:/opt/dagster/home/app/ - - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py - - # - /opt/dagster/storage/:/opt/dagster/home/storage/ - - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ - - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ - -run_storage: - module: dagster_postgres.run_storage - class: PostgresRunStorage - config: - postgres_db: - hostname: postgresql - username: - env: DAGSTER_POSTGRES_USER - password: - env: DAGSTER_POSTGRES_PASSWORD - db_name: - env: DAGSTER_POSTGRES_DB - port: 5432 - -schedule_storage: - module: dagster_postgres.schedule_storage - class: PostgresScheduleStorage - config: - postgres_db: - hostname: postgresql - username: - env: DAGSTER_POSTGRES_USER - password: - env: DAGSTER_POSTGRES_PASSWORD - db_name: - env: DAGSTER_POSTGRES_DB - 
port: 5432 - -event_log_storage: - module: dagster_postgres.event_log - class: PostgresEventLogStorage - config: - postgres_db: - hostname: postgresql - username: - env: DAGSTER_POSTGRES_USER - password: - env: DAGSTER_POSTGRES_PASSWORD - db_name: - env: DAGSTER_POSTGRES_DB - port: 5432 diff --git a/dagster/docker-compose.code.yaml b/dagster/docker-compose.code.yaml deleted file mode 100644 index 6b87332..0000000 --- a/dagster/docker-compose.code.yaml +++ /dev/null @@ -1,47 +0,0 @@ -x-dagster-env: &dagster_env - DAGSTER_POSTGRES_USER: ${POSTGRES_USER} - DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - DAGSTER_POSTGRES_DB: ${POSTGRES_DB} - DAGSTER_CURRENT_IMAGE: ${DAGSTER_CURRENT_IMAGE} - -x-volumes: &volumes - volumes: - #- /opt/dagster/storage/:/opt/dagster/home/storage/ - - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ - - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ - - /opt/dagster/src/app/:/opt/dagster/home/app/ - - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py - -services: - # This service runs the gRPC server that loads your user code, in both dagit - # and dagster-daemon. By setting DAGSTER_CURRENT_IMAGE to its own image, we tell the - # run launcher to use this same image when launching runs in a new container as well. - # Multiple containers like this can be deployed separately - each just needs to run on - # its own port, and have its own entry in the workspace.yaml file that's loaded by dagit. - user_code: - build: - context: . - dockerfile: Dockerfile.code - container_name: user_code - image: user_code_image - restart: always - environment: - <<: *dagster_env - <<: *volumes - networks: - - dagster - - other_image: - profiles: [ disabled ] - build: - context: . - dockerfile: Dockerfile - container_name: other_image - image: user_code_image - restart: always - environment: - <<: *dagster_env - DAGSTER_CURRENT_IMAGE: something_else - <<: *volumes - networks: - - dagster diff --git a/dagster/docker-compose.system.yaml b/dagster/docker-compose.system.yaml deleted file mode 100644 index cd63be3..0000000 --- a/dagster/docker-compose.system.yaml +++ /dev/null @@ -1,90 +0,0 @@ -x-postgres-env: &postgres_env - POSTGRES_USER: ${POSTGRES_USER} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - POSTGRES_DB: ${POSTGRES_DB} -x-aws-env: &aws_env - AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} - AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} -x-dagster-env: &dagster_env - DAGSTER_POSTGRES_USER: ${POSTGRES_USER} - DAGSTER_POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} - DAGSTER_POSTGRES_DB: ${POSTGRES_DB} - DAGSTER_CURRENT_IMAGE: ${DAGSTER_CURRENT_IMAGE} - -x-volumes: &volumes - volumes: - - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml - - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml - - /var/run/docker.sock:/var/run/docker.sock - - #- /opt/dagster/storage/:/opt/dagster/home/storage/ - - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ - - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ - - - /opt/dagster/src/app/:/opt/dagster/home/app/ - - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py - # - /opt/homebrew/Caskroom/mambaforge/base/envs:/opt/homebrew/Caskroom/mambaforge/base/envs - - # Towel - # - /opt/dagster/src/towel.py:/opt/dagster/home/towel.py - # - /Users/rik/Seafile/Code/company/navara/Klanten/Eneco/towel/towel:/opt/dagster/home/app/towel - # - /Users/rik/Library/Caches/pypoetry/virtualenvs/towel-V7mtCF2c-py3.9:/venv/towel - -services: - # This service runs the postgres DB used by dagster for run storage, 
schedule storage, - # and event log storage. - postgresql: - image: postgres:11 - container_name: postgresql - environment: - <<: *postgres_env - networks: - - dagster - - # This service runs dagit, which loads your user code from the user code container. - # Since our instance uses the QueuedRunCoordinator, any runs submitted from dagit will be put on - # a queue and later dequeued and launched by dagster-daemon. - dagit: - build: - context: . - dockerfile: Dockerfile.system - entrypoint: - - dagster-webserver - - -h - - "0.0.0.0" - - -p - - "3000" - - -w - - workspace.yaml - container_name: dagit - expose: - - "3000" - ports: - - "3000:3000" - environment: - <<: *dagster_env - <<: *volumes - networks: - - dagster - depends_on: - - postgresql - - user_code - - # This service runs the dagster-daemon process, which is responsible for taking runs - # off of the queue and launching them, as well as creating runs from schedules or sensors. - daemon: - build: - context: . - dockerfile: Dockerfile.system - entrypoint: - - dagster-daemon - - run - container_name: daemon - restart: on-failure - environment: - <<: [ *dagster_env, *aws_env ] - <<: *volumes - networks: - - dagster - depends_on: - - postgresql diff --git a/dagster/docker-compose.yaml b/dagster/docker-compose.yaml deleted file mode 100644 index 421ce12..0000000 --- a/dagster/docker-compose.yaml +++ /dev/null @@ -1,8 +0,0 @@ -networks: - dagster: - driver: bridge - name: dagster - -include: - - docker-compose.system.yaml - - docker-compose.code.yaml \ No newline at end of file diff --git a/dagster/pyproject.toml b/dagster/pyproject.toml deleted file mode 100755 index 3d48deb..0000000 --- a/dagster/pyproject.toml +++ /dev/null @@ -1,75 +0,0 @@ -[project] -requires-python = "==3.12" -name = "dev" -authors = [ - { name = "Rik Veenboer", email = "rik.veenboer@gmail.com" } -] -version = "0.1.0" -dependencies = [ - "fastapi", - "gitpython", - "kubernetes", - "matplotlib", - "seaborn", - "openpyxl", - "xlsxwriter", - "pandas", - "pyarrow", - "pydantic[email]", - "pydantic-settings", - "pyyaml", - "requests", - "s3fs[boto3]", - "structlog", - "uvicorn", - "duckdb", - "geopandas", - "lxml", - "networkx", - "Pint", - "Pint-Pandas", - "boto3", - "influxdb-client", - "requests[socks]", - "beautifulsoup4", - "fastparquet", - "icecream" -] - -[project.optional-dependencies] -dev = [ - "black", - "isort", - "nbstripout", - "pip-tools", - "pre-commit", - "ruff", - "mypy" -] -local = [ - "ipykernel", - "ipywidgets" -] -dagster = [ - "dagster", - "dagster-graphql", - "dagster-postgres", - "dagster-docker", - "dagster-aws", - "dagster-polars", - "dagster-duckdb", - "dagster-duckdb-pandas", - "dagit" -] - -[tool.poetry] -name = "dev" -version = "0.1.0" -description = "" -authors = ["Rik Veenboer "] - -[tool.poetry.dependencies] -seven = "^1.0.0" - -[tool.ruff] -builtins = ["ic"] diff --git a/dagster/requirements.txt b/dagster/requirements.txt deleted file mode 100755 index 85e1c5c..0000000 --- a/dagster/requirements.txt +++ /dev/null @@ -1,238 +0,0 @@ -# This file was autogenerated by uv via the following command: -# uv pip compile --output-file=requirements.txt pyproject.toml -aiobotocore==2.15.1 - # via s3fs -aiohappyeyeballs==2.4.3 - # via aiohttp -aiohttp==3.10.8 - # via - # aiobotocore - # s3fs -aioitertools==0.12.0 - # via aiobotocore -aiosignal==1.3.1 - # via aiohttp -annotated-types==0.7.0 - # via pydantic -anyio==4.6.0 - # via starlette -appdirs==1.4.4 - # via pint -asttokens==2.4.1 - # via icecream -attrs==24.2.0 - # via aiohttp 
-beautifulsoup4==4.12.3 -boto3==1.35.23 - # via aiobotocore -botocore==1.35.23 - # via - # aiobotocore - # boto3 - # s3transfer -cachetools==5.5.0 - # via google-auth -certifi==2024.8.30 - # via - # influxdb-client - # kubernetes - # pyogrio - # pyproj - # requests -charset-normalizer==3.3.2 - # via requests -click==8.1.7 - # via uvicorn -colorama==0.4.6 - # via icecream -contourpy==1.3.0 - # via matplotlib -cramjam==2.8.4 - # via fastparquet -cycler==0.12.1 - # via matplotlib -dnspython==2.6.1 - # via email-validator -duckdb==1.1.1 -durationpy==0.8 - # via kubernetes -email-validator==2.2.0 - # via pydantic -et-xmlfile==1.1.0 - # via openpyxl -executing==2.1.0 - # via icecream -fastapi==0.115.0 -fastparquet==2024.5.0 -flexcache==0.3 - # via pint -flexparser==0.3.1 - # via pint -fonttools==4.54.1 - # via matplotlib -frozenlist==1.4.1 - # via - # aiohttp - # aiosignal -fsspec==2024.9.0 - # via - # fastparquet - # s3fs -geopandas==1.0.1 -gitdb==4.0.11 - # via gitpython -gitpython==3.1.43 -google-auth==2.35.0 - # via kubernetes -h11==0.14.0 - # via uvicorn -icecream==2.1.3 -idna==3.10 - # via - # anyio - # email-validator - # requests - # yarl -influxdb-client==1.46.0 -jmespath==1.0.1 - # via - # boto3 - # botocore -kiwisolver==1.4.7 - # via matplotlib -kubernetes==31.0.0 -lxml==5.3.0 -matplotlib==3.9.2 - # via seaborn -multidict==6.1.0 - # via - # aiohttp - # yarl -networkx==3.3 -numpy==2.1.1 - # via - # contourpy - # fastparquet - # geopandas - # matplotlib - # pandas - # pyarrow - # pyogrio - # seaborn - # shapely -oauthlib==3.2.2 - # via - # kubernetes - # requests-oauthlib -openpyxl==3.1.5 -packaging==24.1 - # via - # fastparquet - # geopandas - # matplotlib - # pyogrio -pandas==2.2.3 - # via - # fastparquet - # geopandas - # pint-pandas - # seaborn -pillow==10.4.0 - # via matplotlib -pint==0.24.3 - # via pint-pandas -pint-pandas==0.6.2 -pyarrow==17.0.0 -pyasn1==0.6.1 - # via - # pyasn1-modules - # rsa -pyasn1-modules==0.4.1 - # via google-auth -pydantic==2.9.2 - # via - # fastapi - # pydantic-settings -pydantic-core==2.23.4 - # via pydantic -pydantic-settings==2.5.2 -pygments==2.18.0 - # via icecream -pyogrio==0.10.0 - # via geopandas -pyparsing==3.1.4 - # via matplotlib -pyproj==3.7.0 - # via geopandas -pysocks==1.7.1 - # via requests -python-dateutil==2.9.0.post0 - # via - # botocore - # influxdb-client - # kubernetes - # matplotlib - # pandas -python-dotenv==1.0.1 - # via pydantic-settings -pytz==2024.2 - # via pandas -pyyaml==6.0.2 - # via kubernetes -reactivex==4.0.4 - # via influxdb-client -requests==2.32.3 - # via - # kubernetes - # requests-oauthlib -requests-oauthlib==2.0.0 - # via kubernetes -rsa==4.9 - # via google-auth -s3fs==2024.9.0 -s3transfer==0.10.2 - # via boto3 -seaborn==0.13.2 -setuptools==75.1.0 - # via influxdb-client -shapely==2.0.6 - # via geopandas -six==1.16.0 - # via - # asttokens - # kubernetes - # python-dateutil -smmap==5.0.1 - # via gitdb -sniffio==1.3.1 - # via anyio -soupsieve==2.6 - # via beautifulsoup4 -starlette==0.38.6 - # via fastapi -structlog==24.4.0 -typing-extensions==4.12.2 - # via - # fastapi - # flexcache - # flexparser - # pint - # pydantic - # pydantic-core - # reactivex -tzdata==2024.2 - # via pandas -urllib3==2.2.3 - # via - # botocore - # influxdb-client - # kubernetes - # requests -uvicorn==0.31.0 -websocket-client==1.8.0 - # via kubernetes -wrapt==1.16.0 - # via aiobotocore -xlsxwriter==3.2.0 -yarl==1.13.1 - # via aiohttp diff --git a/dagster/src/.telemetry/id.yaml b/dagster/src/.telemetry/id.yaml deleted file mode 100644 index 
9c4cb04..0000000 --- a/dagster/src/.telemetry/id.yaml +++ /dev/null @@ -1 +0,0 @@ -instance_id: 9a2d409d-a36a-492d-8f23-2f20c1f49bf4 diff --git a/dagster/src/app/__init__.py b/dagster/src/app/__init__.py deleted file mode 100644 index ef37bac..0000000 --- a/dagster/src/app/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from icecream import install - -install() diff --git a/dagster/src/app/vinyl/__init__.py b/dagster/src/app/vinyl/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/dagster/src/app/vinyl/assets.py b/dagster/src/app/vinyl/assets.py deleted file mode 100755 index b8051f7..0000000 --- a/dagster/src/app/vinyl/assets.py +++ /dev/null @@ -1,181 +0,0 @@ -from datetime import datetime -from glob import glob - -import duckdb -import polars as pl -import structlog -from duckdb.typing import DATE, VARCHAR - -from app.vinyl.plato.fetch import scrape_plato -from app.vinyl.sounds.fetch import fetch_deals -from app.vinyl.utils import parse_date -from dagster import ( - DailyPartitionsDefinition, - DimensionPartitionMapping, - Failure, - Field, - IdentityPartitionMapping, - MultiPartitionMapping, - MultiPartitionsDefinition, - OpExecutionContext, - StaticPartitionsDefinition, - TimeWindowPartitionMapping, - asset, -) - -SOURCES = ["plato", "sounds"] - -logger = structlog.get_logger() - -partitions_def = MultiPartitionsDefinition( - { - "date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1), - "source": StaticPartitionsDefinition(SOURCES), - } -) - -partition_mapping = MultiPartitionMapping( - { - "date": DimensionPartitionMapping( - dimension_name="date", - partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0), - ), - "source": DimensionPartitionMapping( - dimension_name="source", - partition_mapping=IdentityPartitionMapping(), - ), - } -) - - -@asset( - io_manager_key="polars_parquet_io_manager", - partitions_def=partitions_def, - metadata={ - "partition_by": ["date", "source"], - }, - config_schema={ - "import_dir": Field(str, default_value="/opt/dagster/home/storage/import") - }, -) -def deals(context): - ic() - ic(context.partition_key) - ic(context.op_config) - import_dir = context.op_config["import_dir"] - partition_key = context.partition_key.keys_by_dimension - date_str = partition_key["date"] - source = partition_key["source"] - logger.info("Materializing deals", date=date_str, source=source) - - date = datetime.strptime(partition_key["date"], "%Y-%m-%d") - days = (date - datetime.today()).days - ic(days) - if days > 0: - raise Failure(f"Cannot materialize for the future: {date.date()}") - if days < -1: - if source == "sounds": - pattern = f"{import_dir}/{date.date()}_*_sounds.csv" - logger.info("Looking for existing CSV files", pattern=pattern) - files = glob(pattern) - if len(files): - file = sorted(files)[-1] - logger.info("Using existing CSV file", file=file) - try: - df = pl.read_csv(file) - logger.info("Loaded CSV file", rows=len(df)) - return df.with_columns( - **{k: pl.lit(v) for k, v in partition_key.items()} - ) - except Exception as e: - logger.error("Failed to load CSV file!", error=e) - raise Failure(f"Cannot materialize for the past: {date.date()}") - - if source == "plato": - logger.info("Scraping Plato") - df = scrape_plato() - logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown()) - ic(df.columns) - return pl.from_pandas(df.assign(**partition_key)) - if source == "sounds": - logger.info("Scraping Sounds") - df = fetch_deals() - ic(df.columns) - logger.info("Scraped Sounds", rows=len(df), 
head=df.head().to_markdown()) - return pl.from_pandas(df.assign(**partition_key)) - - return pl.DataFrame( - [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] - ) - - -@asset(deps=[deals], io_manager_key="polars_parquet_io_manager") -def new_deals(context: OpExecutionContext) -> pl.DataFrame: - ic() - storage_dir = context.instance.storage_directory() - asset_key = "deals" - - with duckdb.connect() as con: - con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE) - return con.execute( - f""" - WITH tmp_plato AS ( - SELECT - source, - CAST(date AS DATE) AS date, - ean AS id, - _artist AS artist, - LOWER(title) AS title, - CAST(_date AS DATE) AS release, - CAST(_price AS FLOAT) AS price, - CONCAT('https://www.platomania.nl', url) AS url, - FROM read_parquet('{storage_dir}/{asset_key}/*/plato.parquet', union_by_name = true) - ), tmp_sounds AS ( - SELECT - source, - date, - id, - LOWER(TRIM(COALESCE(artist, SPLIT(name, '-')[1]))) AS artist, - LOWER(TRIM(COALESCE(title, ARRAY_TO_STRING(split(name, '-')[2:], '-')))) AS title, - PARSE_DATE(release) AS release, - CAST(price AS FLOAT) AS price, - CONCAT('https://www.sounds.nl/detail/', id) AS url - FROM read_parquet('{storage_dir}/{asset_key}/*/sounds.parquet', union_by_name = true) - ), tmp_both AS ( - SELECT * FROM tmp_plato UNION ALL SELECT * FROM tmp_sounds - ), tmp_rn AS ( - SELECT - *, - ROW_NUMBER() OVER(PARTITION BY source, id, artist, title, price ORDER BY date DESC) as rn - FROM tmp_both - ) - SELECT - source, - date, - id, - artist, - title, - release, - price, - url - FROM tmp_rn - WHERE rn = 1 - ORDER BY date ASC - """ - ).pl() - - -@asset( - io_manager_key="polars_parquet_io_manager", -) -def works(new_deals: pl.DataFrame) -> pl.DataFrame: - # Pandas - # columns = ["artist", "title"] - # return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates()) - - # Polars - # return new_deals[columns].unique(subset=columns) - - # DuckDB - with duckdb.connect() as con: - return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl() diff --git a/dagster/src/app/vinyl/jobs.py b/dagster/src/app/vinyl/jobs.py deleted file mode 100755 index b1d6bcd..0000000 --- a/dagster/src/app/vinyl/jobs.py +++ /dev/null @@ -1,58 +0,0 @@ -from dagster import ( - AssetKey, - AssetMaterialization, - OpExecutionContext, - define_asset_job, - job, - op, -) - -from .assets import deals, new_deals, works - -deals_job = define_asset_job( - "deals_job", selection=[deals], partitions_def=deals.partitions_def -) - - -@op -def check_partititions(context: OpExecutionContext): - # Replace with your asset/job name - asset_key = "deals" - - context.log_event( - AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds") - ) - - # Fetch the materializations for the asset key - materializations = context.instance.get_materialized_partitions( - asset_key=AssetKey(asset_key) - ) - context.log.info("Existing partitions", extra=dict(partitions=materializations)) - - import polars as pl - - storage_dir = context.instance.storage_directory() - ic(storage_dir) - for row in ( - pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet") - .select(["date", "source"]) - .unique() - .collect() - .iter_rows() - ): - partition = "|".join(row) - if partition not in materializations: - context.log.info(f"Missing partition: {partition}") - context.log_event( - AssetMaterialization(asset_key=asset_key, partition=partition) - ) - - -@job -def check_partititions_job(): - check_partititions() - - -musicbrainz_lookup_job = 
define_asset_job( - "musicbrainz_lookup_job", selection=[works, new_deals] -) diff --git a/dagster/src/app/vinyl/plato/__init__.py b/dagster/src/app/vinyl/plato/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/dagster/src/app/vinyl/plato/deals.py b/dagster/src/app/vinyl/plato/deals.py deleted file mode 100755 index 1f5c348..0000000 --- a/dagster/src/app/vinyl/plato/deals.py +++ /dev/null @@ -1,154 +0,0 @@ -import os - -import boto3 -import pandas as pd -from botocore.exceptions import NoCredentialsError, PartialCredentialsError -from dotenv import load_dotenv -from fetch import scrape_plato -from utils import get - - -def update_database(articles_df=None, database_file="/home/user/plato.parquet"): - if os.path.exists(database_file): - database_df = pd.read_parquet(database_file) - else: - database_df = None - - if articles_df is None: - new_df = None if database_df is None else database_df.head(0) - else: - if database_df is None: - articles_df.to_parquet(database_file) - return articles_df, articles_df - - compare = ["ean", "_price"] - check_df = pd.merge( - database_df[compare], articles_df[compare], how="right", indicator=True - ) - new_df = ( - check_df[check_df["_merge"] == "right_only"] - .drop(columns="_merge") - .merge(articles_df) - ) - database_df = ( - pd.concat([database_df, new_df]) - .sort_values("_date") - .groupby("ean") - .last() - .reset_index() - ) - database_df.to_parquet(database_file) - - return database_df, new_df - - -def send_email(lines): - # Define the email parameters - SENDER = "mail@veenboer.xyz" - RECIPIENT = "rik.veenboer@gmail.com" - SUBJECT = "Aanbieding op plato!" - - # The email body for recipients with non-HTML email clients - BODY_TEXT = "" - - # The HTML body of the email - tmp = "\n".join(lines) - BODY_HTML = f""" - - - {tmp} - - """ - - # The character encoding for the email - CHARSET = "UTF-8" - - # Try to send the email - try: - client = boto3.client( - "ses", region_name="eu-west-1" - ) # Change the region as needed - - # Provide the contents of the email - response = client.send_email( - Destination={ - "ToAddresses": [ - RECIPIENT, - ], - }, - Message={ - "Body": { - "Html": { - "Charset": CHARSET, - "Data": BODY_HTML, - }, - "Text": { - "Charset": CHARSET, - "Data": BODY_TEXT, - }, - }, - "Subject": { - "Charset": CHARSET, - "Data": SUBJECT, - }, - }, - Source=SENDER, - ) - # Display an error if something goes wrong. - except NoCredentialsError: - print("Credentials not available") - except PartialCredentialsError: - print("Incomplete credentials provided") - except Exception as e: - print(f"Error: {e}") - else: - print("Email sent! 
Message ID:"), - print(response["MessageId"]) - - -def main(dry=False): - load_dotenv("/opt/.env") - - local_ip = get("http://ifconfig.me", False).text - get_ip = get("http://ifconfig.me").text - print(f"Local IP = {local_ip}") - print(f"Request IP = {get_ip}") - assert local_ip != get_ip - - artists = open("/home/user/artists.txt").read().strip().splitlines() - print(f"Number of known artists = {len(artists)}") - - if dry: - articles_df = None - else: - articles_df = scrape_plato(get=get) - database_df, new_df = update_database(articles_df) - - if dry: - new_df = database_df.sample(20) - - print(f"Database size = {len(database_df)}") - print(f"New = {len(new_df)}") - - # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25') - new_df = new_df.query('_price <= 25 and ean != ""') - print(f"Interesting = {len(new_df)}") - - if new_df is not None and len(new_df): - message = [] - for _, row in new_df.head(10).iterrows(): - message.append( - f'
<a href="https://www.platomania.nl{row["url"]}">{row["_artist"]} - {row["title"]}</a> NEW {row["_price"]}<br/>
' - ) - message.append("") - send_email(message) - - -if __name__ == "__main__": - cwd = os.path.dirname(__file__) - main(dry=False) diff --git a/dagster/src/app/vinyl/plato/fetch.py b/dagster/src/app/vinyl/plato/fetch.py deleted file mode 100755 index f574572..0000000 --- a/dagster/src/app/vinyl/plato/fetch.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/root/.pyenv/versions/dev/bin/python - -import re -from datetime import datetime - -import pandas as pd - -from .scrape import get_soup, scrape_page, scrape_page_links - - -def scrape_plato(get=None): - ic() - url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1" - - ic(url) - soup = get_soup(url=url, get=get) - articles_info = scrape_page(soup) - ic(len(articles_info)) - - links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1])) - for link in links: - ic(link) - soup = get_soup(url=link, get=get) - tmp = scrape_page(soup) - ic(len(tmp)) - articles_info.extend(tmp) - - def clean(name): - tmp = " ".join(reversed(name.split(", "))) - tmp = tmp.lower() - tmp = re.sub(r"\s+\([^)]*\)", "", tmp) - return tmp - - articles_df = pd.DataFrame(articles_info).reindex( - columns=[ - "artist", - "title", - "url", - "label", - "release_date", - "origin", - "item_number", - "ean", - "delivery_info", - "price", - ] - ) - articles_df["_artist"] = articles_df["artist"].map(clean) - articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1])) - articles_df["_date"] = datetime.now() - - return articles_df diff --git a/dagster/src/app/vinyl/plato/scrape.py b/dagster/src/app/vinyl/plato/scrape.py deleted file mode 100755 index 231d5d1..0000000 --- a/dagster/src/app/vinyl/plato/scrape.py +++ /dev/null @@ -1,79 +0,0 @@ -import requests -from bs4 import BeautifulSoup - - -def get_soup(url, get=None): - # Send a GET request to the specified URL - if get is None: - get = requests.get - response = get(url) - - # Check if the request was successful - if response.status_code == 200: - # Parse the HTML content of the page - return BeautifulSoup(response.content, "html.parser") - else: - raise ValueError( - f"Failed to retrieve the page. Status code: {response.status_code}" - ) - - -def scrape_page_links(soup): - # Find all
<li> elements with class "page-item" - page_items = soup.find_all("li", class_="page-item") - - # Extract the href attribute of <a> tags within these
  • elements - links = [] - for item in page_items: - a_tag = item.find("a", class_="page-link") - if a_tag and "href" in a_tag.attrs: - links.append(a_tag["href"]) - - return links - - -def extract_article_info(article): - info = {} - - # Extract the artist name - artist_tag = article.find("h1", class_="product-card__artist") - info["artist"] = artist_tag.text.strip() if artist_tag else None - - # Extract the title and URL - title_tag = article.find("h2", class_="product-card__title") - info["title"] = title_tag.text.strip() if title_tag else None - url_tag = title_tag.find_parent("a") if title_tag else None - info["url"] = url_tag["href"] if url_tag else None - - # Extract additional details - details = article.find_all("div", class_="article-details__text") - for detail in details: - text = detail.text.strip() - if "Label:" in text: - info["label"] = text.replace("Label: ", "").strip() - elif "Releasedatum:" in text: - info["release_date"] = text.replace("Releasedatum: ", "").strip() - elif "Herkomst:" in text: - info["origin"] = text.replace("Herkomst: ", "").strip() - elif "Item-nr:" in text: - info["item_number"] = text.replace("Item-nr: ", "").strip() - elif "EAN:" in text: - info["ean"] = text.replace("EAN:", "").strip() - - # Extract delivery information - delivery_tag = article.find("div", class_="article-details__delivery-text") - info["delivery_info"] = delivery_tag.text.strip() if delivery_tag else None - - # Extract price - price_tag = article.find("div", class_="article__price") - info["price"] = price_tag.text.strip() if price_tag else None - - return info - - -def scrape_page(soup): - # Find all article blocks - article_blocks = soup.find_all("article", class_="article LP") - - # Extract information from each article block - return [extract_article_info(article) for article in article_blocks] diff --git a/dagster/src/app/vinyl/plato/utils.py b/dagster/src/app/vinyl/plato/utils.py deleted file mode 100755 index d2e2e54..0000000 --- a/dagster/src/app/vinyl/plato/utils.py +++ /dev/null @@ -1,10 +0,0 @@ -import requests - - -def get(url, proxy=True): - if proxy: - tmp = "socks5://localhost:1080" - kwargs = dict(proxies=dict(http=tmp, https=tmp)) - else: - kwargs = {} - return requests.get(url, **kwargs) diff --git a/dagster/src/app/vinyl/repo.py b/dagster/src/app/vinyl/repo.py deleted file mode 100755 index 5eb239b..0000000 --- a/dagster/src/app/vinyl/repo.py +++ /dev/null @@ -1,31 +0,0 @@ -from collections.abc import Sequence - -from dagster_duckdb import DuckDBIOManager -from dagster_duckdb.io_manager import DbTypeHandler -from dagster_duckdb_pandas import DuckDBPandasTypeHandler -from dagster_polars import PolarsParquetIOManager - -from dagster import Definitions - -from .assets import deals, new_deals, works -from .jobs import check_partititions_job, deals_job, musicbrainz_lookup_job -from .schedules import deals_schedule -from .sensors import musicbrainz_lookup_sensor - - -class PandasDuckDBIOManager(DuckDBIOManager): - @staticmethod - def type_handlers() -> Sequence[DbTypeHandler]: - return [DuckDBPandasTypeHandler()] - - -vinyl = Definitions( - assets=[deals, new_deals, works], - resources={ - "polars_parquet_io_manager": PolarsParquetIOManager(), - "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"), - }, - jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job], - schedules=[deals_schedule], - sensors=[musicbrainz_lookup_sensor], -) diff --git a/dagster/src/app/vinyl/schedules.py b/dagster/src/app/vinyl/schedules.py deleted file mode 100755 index 
048db31..0000000 --- a/dagster/src/app/vinyl/schedules.py +++ /dev/null @@ -1,10 +0,0 @@ -from dagster import DefaultScheduleStatus, build_schedule_from_partitioned_job - -from app.vinyl.repo import deals_job - -deals_schedule = build_schedule_from_partitioned_job( - job=deals_job, - hour_of_day=7, - # execution_timezone="Europe/Amsterdam", - default_status=DefaultScheduleStatus.RUNNING, -) diff --git a/dagster/src/app/vinyl/sensors.py b/dagster/src/app/vinyl/sensors.py deleted file mode 100755 index 7068c1f..0000000 --- a/dagster/src/app/vinyl/sensors.py +++ /dev/null @@ -1,21 +0,0 @@ -from app.vinyl.assets import deals -from app.vinyl.jobs import musicbrainz_lookup_job -from dagster import ( - DefaultSensorStatus, - EventLogEntry, - RunRequest, - SensorEvaluationContext, - asset_sensor, -) - - -@asset_sensor( - asset_key=deals.key, - job=musicbrainz_lookup_job, - default_status=DefaultSensorStatus.RUNNING, -) -def musicbrainz_lookup_sensor( - context: SensorEvaluationContext, asset_event: EventLogEntry -): - assert asset_event.dagster_event and asset_event.dagster_event.asset_key - yield RunRequest(run_key=context.cursor) diff --git a/dagster/src/app/vinyl/sounds/__init__.py b/dagster/src/app/vinyl/sounds/__init__.py deleted file mode 100755 index e69de29..0000000 diff --git a/dagster/src/app/vinyl/sounds/deals.py b/dagster/src/app/vinyl/sounds/deals.py deleted file mode 100755 index 21729af..0000000 --- a/dagster/src/app/vinyl/sounds/deals.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/python3 - -import glob -import os -from datetime import datetime - -import pandas as pd - - -def get_csvs(directory, n): - # List all files matching the pattern *_sounds.csv - suffix = "_sounds.csv" - files = glob.glob(os.path.join(directory, f"*{suffix}")) - - # Function to extract date from filename - def extract_date_from_filename(filename): - # Extract the date string - basename = os.path.basename(filename) - date_str = basename.split(suffix)[0] - try: - return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S") - except ValueError: - # The date string cannot be parsed - return None - - # Create a list of tuples (date, filename), ignoring files with unparsable dates - result = [(extract_date_from_filename(file), file) for file in files] - result = [item for item in result if item[0] is not None] - - # Sort the list by date in descending order (most recent first) - result.sort(key=lambda x: x[0], reverse=True) - - # Return the two most recent files - return [x[1] for x in result[:n]] - - -def analyze(df1, df2): - df1 = df1.drop_duplicates(subset="id") - df2 = df2.drop_duplicates(subset="id") - combined_df = pd.merge( - df1[["id", "price"]], df2, on="id", how="right", indicator=True - ) - combined_df["discount"] = combined_df.price_y - combined_df.price_x - combined_df.drop(columns=["price_x"], inplace=True) - combined_df.rename(columns={"price_y": "price"}, inplace=True) - - deals = combined_df.query("discount < 0").sort_values(by="discount")[ - ["id", "name", "price", "discount"] - ] - new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[ - ["id", "name", "price"] - ] - return deals, new - - -if __name__ == "__main__": - csvs = get_csvs(".", 100) - - for i in range(1, len(csvs)): - print(f"Comparing {csvs[i]} with {csvs[0]}") - df_previous = pd.read_csv(csvs[i], index_col=0) - df_latest = pd.read_csv(csvs[0], index_col=0) - deals, new = analyze(df_previous, df_latest) - - done = False - - if len(deals) > 0: - print() - print("New items:") - print(new) - print() - done = True - - if 
len(deals) > 0: - print(f"Discounted items:") - print(deals) - done = True - - if done: - break diff --git a/dagster/src/app/vinyl/sounds/fetch.py b/dagster/src/app/vinyl/sounds/fetch.py deleted file mode 100755 index 219d1cb..0000000 --- a/dagster/src/app/vinyl/sounds/fetch.py +++ /dev/null @@ -1,110 +0,0 @@ -#!/usr/bin/python3 - -import time -from datetime import datetime - -import pandas as pd -import requests -from bs4 import BeautifulSoup -from tqdm import tqdm - - -def get_page_count(html_content): - soup = BeautifulSoup(html_content, "html.parser") - - # Find all pagination links - page_links = soup.select("ul.pagination li a") - - # Extract the numbers from the hrefs and convert to integers - page_numbers = [ - int(link.get_text()) for link in page_links if link.get_text().isdigit() - ] - - return max(page_numbers) - - -def parse_page(html_content): - entries = [] - soup = BeautifulSoup(html_content, "html.parser") - for product in soup.find_all("div", {"class": "search-product"}): - item_id = product.find("a", rel=True)["rel"][0] - name = product.find("h5").text.strip() - artist_title = name.split("-") - artist = artist_title[0].strip() - title = artist_title[1].strip() - price = ( - product.find("span", class_="product-price") - .text.strip() - .replace("€", "") - .strip() - ) - - entry = { - "id": item_id, - "name": name, - "artist": artist, - "title": title, - "price": price, - } - if detail := product.find("h6", {"class": "hide-for-small"}): - entry["detail"] = detail.text - if supply := product.find("div", {"class": "product-voorraad"}): - entry["supply"] = supply.text - - for info in product.find_all("div", {"class": "product-info"}): - info = info.text.split(":") - if "Genre" in info[0]: - entry["genre"] = info[1].strip() - if "Releasedatum" in info[0]: - entry["release"] = info[1].strip() - entries.append(entry) - - return pd.DataFrame(entries).reindex( - columns=[ - "id", - "name", - "artist", - "title", - "price", - "supply", - "release", - "genre", - "detail", - ] - ) - - -def fetch_deals(): - # Get page count - page_count = get_page_count( - requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text - ) - time.sleep(1) - print(f"Number of pages: {page_count}") - - # Parse all pages - base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all" - dfs = [] - for i in tqdm(range(page_count)): - df = parse_page(requests.get(base_url.format(page_number=i)).text) - dfs.append(df) - time.sleep(2) - - # Combine dfs - return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"]) - - -if __name__ == "__main__": - df = fetch_deals() - print(f"Found {len(df)} deals") - - # Show current deals - print(df.sort_values(by="price").head(10)) - - # Write to file - now = datetime.now() - prefix = now.strftime("%Y-%m-%d_%H:%M:%S") - directory = "/home/bram/src/python" - filepath = f"{directory}/{prefix}_sounds.csv" - print(f"Writing data to {filepath}") - df.to_csv(filepath) diff --git a/dagster/src/app/vinyl/test.py b/dagster/src/app/vinyl/test.py deleted file mode 100755 index 1eb0d95..0000000 --- a/dagster/src/app/vinyl/test.py +++ /dev/null @@ -1,41 +0,0 @@ -import warnings -from datetime import datetime - -from dagster import materialize -from dagster_polars import PolarsParquetIOManager - -from app.vinyl.assets import deals -from app.vinyl.jobs import check_partititions_job - -warnings.filterwarnings("ignore", category=UserWarning) - -import logging - -logging.getLogger().setLevel(logging.INFO) - -resources = { - "polars_parquet_io_manager": 
PolarsParquetIOManager(base_dir="/opt/dagster/storage") -} - - -def test_deals(source="sounds", date: str = None): - if not date: - today = datetime.today().strftime("%Y-%m-%d") - date = today - - result = materialize( - [deals], - partition_key=f"{date}|{source}", - resources=resources, - run_config={ - "loggers": {"console": {"config": {"log_level": "ERROR"}}}, - "ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}}, - }, - ) - assert result.success - ic(result.asset_value) - - -if __name__ == "__main__": - # test_deals(source="plato") - check_partititions_job.execute_in_process() diff --git a/dagster/src/app/vinyl/utils.py b/dagster/src/app/vinyl/utils.py deleted file mode 100755 index 651148e..0000000 --- a/dagster/src/app/vinyl/utils.py +++ /dev/null @@ -1,29 +0,0 @@ -import datetime - - -def parse_date(dutch_date: str): - # Create a dictionary to map Dutch month names to English - dutch_to_english_months = { - "januari": "January", - "februari": "February", - "maart": "March", - "april": "April", - "mei": "May", - "juni": "June", - "juli": "July", - "augustus": "August", - "september": "September", - "oktober": "October", - "november": "November", - "december": "December", - } - - # Split the date and replace the Dutch month with its English equivalent - day, dutch_month, year = dutch_date.split() - english_month = dutch_to_english_months[dutch_month] - - # Rebuild the date string in English format - english_date = f"{day} {english_month} {year}" - - # Parse the date using strptime - return datetime.datetime.strptime(english_date, "%d %B %Y").date() diff --git a/dagster/src/repo.py b/dagster/src/repo.py deleted file mode 100644 index de2d58d..0000000 --- a/dagster/src/repo.py +++ /dev/null @@ -1 +0,0 @@ -from app.vinyl.repo import vinyl # noqa diff --git a/dagster/workspace.yaml b/dagster/workspace.yaml deleted file mode 100644 index 47124fd..0000000 --- a/dagster/workspace.yaml +++ /dev/null @@ -1,4 +0,0 @@ -load_from: - - grpc_server: - host: user_code - port: 4000