diff --git a/pyproject.toml b/pyproject.toml index f400be6..c26df4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,22 +6,22 @@ authors = [ ] version = "0.1.0" dependencies = [ - "fastapi", - "gitpython", + "fastapi", + "gitpython", "kubernetes", - "matplotlib", + "matplotlib", "seaborn", - "openpyxl", + "openpyxl", "xlsxwriter", - "pandas", - "pyarrow", - "pydantic[email]", - "pydantic-settings", - "pyyaml", - "requests", - "s3fs[boto3]", - "structlog", - "uvicorn", + "pandas", + "pyarrow", + "pydantic[email]", + "pydantic-settings", + "pyyaml", + "requests", + "s3fs[boto3]", + "structlog", + "uvicorn", "duckdb", "geopandas", "lxml", @@ -68,3 +68,6 @@ authors = ["Rik Veenboer "] [tool.poetry.dependencies] seven = "^1.0.0" + +[tool.ruff] +builtins = ["ic"] diff --git a/src/app/partitions/assets.py b/src/app/partitions/assets.py index 3e69cd7..41374d4 100644 --- a/src/app/partitions/assets.py +++ b/src/app/partitions/assets.py @@ -1,9 +1,16 @@ import polars as pl -from dagster import (AssetIn, DailyPartitionsDefinition, - DimensionPartitionMapping, IdentityPartitionMapping, - MultiPartitionMapping, MultiPartitionsDefinition, - StaticPartitionsDefinition, TimeWindowPartitionMapping, - asset) + +from dagster import ( + AssetIn, + DailyPartitionsDefinition, + DimensionPartitionMapping, + IdentityPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, + StaticPartitionsDefinition, + TimeWindowPartitionMapping, + asset, +) partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20") diff --git a/src/app/partitions/mapping.py b/src/app/partitions/mapping.py index 128d436..ea4f21b 100644 --- a/src/app/partitions/mapping.py +++ b/src/app/partitions/mapping.py @@ -1,10 +1,12 @@ -from datetime import datetime +from datetime import datetime, timedelta from typing import Optional from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition from dagster._core.definitions.partition import PartitionsSubset from dagster._core.definitions.partition_mapping import ( - MultiPartitionMapping, UpstreamPartitionsResult) + MultiPartitionMapping, + UpstreamPartitionsResult, +) from dagster._core.instance import DynamicPartitionsStore from dagster._serdes import whitelist_for_serdes diff --git a/src/app/partitions/repo.py b/src/app/partitions/repo.py index f23f0eb..c94e3c4 100644 --- a/src/app/partitions/repo.py +++ b/src/app/partitions/repo.py @@ -1,8 +1,8 @@ -from dagster import Definitions, define_asset_job from dagster_polars import PolarsParquetIOManager -from .assets import (asset_multi_1, asset_multi_2, asset_single_1, - asset_single_2) +from dagster import Definitions, define_asset_job + +from .assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2 # Define a job that includes both assets daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2]) diff --git a/src/app/partitions/test.py b/src/app/partitions/test.py index 70cbec9..e9ea00a 100644 --- a/src/app/partitions/test.py +++ b/src/app/partitions/test.py @@ -1,8 +1,12 @@ -from dagster import materialize from dagster_polars import PolarsParquetIOManager -from app.vinyl.assets import (asset_multi_1, asset_multi_2, asset_single_1, - asset_single_2) +from app.vinyl.assets import ( + asset_multi_1, + asset_multi_2, + asset_single_1, + asset_single_2, +) +from dagster import materialize resources = { "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage") diff --git a/src/app/vinyl/assets.py b/src/app/vinyl/assets.py index cf53bca..c468dda 100644 --- a/src/app/vinyl/assets.py +++ b/src/app/vinyl/assets.py @@ -3,14 +3,22 @@ from glob import glob import polars as pl import structlog -from dagster import (AssetIn, DailyPartitionsDefinition, - DimensionPartitionMapping, Failure, Field, - IdentityPartitionMapping, MultiPartitionMapping, - MultiPartitionsDefinition, StaticPartitionsDefinition, - TimeWindowPartitionMapping, asset) -from app.vinyl.plato.check_plato import scrape_plato +from app.vinyl.plato.fetch import scrape_plato from app.vinyl.sounds.fetch import fetch_deals +from dagster import ( + AssetIn, + DailyPartitionsDefinition, + DimensionPartitionMapping, + Failure, + Field, + IdentityPartitionMapping, + MultiPartitionMapping, + MultiPartitionsDefinition, + StaticPartitionsDefinition, + TimeWindowPartitionMapping, + asset, +) SOURCES = ["plato", "sounds"] @@ -55,7 +63,7 @@ def deals(context): partition_key = context.partition_key.keys_by_dimension date_str = partition_key["date"] source = partition_key["source"] - logger.info("Materializing deals", date=partition_key["date"], source=source) + logger.info("Materializing deals", date=date_str, source=source) date = datetime.strptime(partition_key["date"], "%Y-%m-%d") days = (date - datetime.today()).days diff --git a/src/app/vinyl/jobs.py b/src/app/vinyl/jobs.py index 26bb867..a5ed910 100644 --- a/src/app/vinyl/jobs.py +++ b/src/app/vinyl/jobs.py @@ -1,5 +1,11 @@ -from dagster import (AssetKey, AssetMaterialization, OpExecutionContext, - define_asset_job, job, op) +from dagster import ( + AssetKey, + AssetMaterialization, + OpExecutionContext, + define_asset_job, + job, + op, +) from .assets import deals diff --git a/src/app/vinyl/plato/check_plato.py b/src/app/vinyl/plato/deals.py similarity index 73% rename from src/app/vinyl/plato/check_plato.py rename to src/app/vinyl/plato/deals.py index 61668ad..1f5c348 100755 --- a/src/app/vinyl/plato/check_plato.py +++ b/src/app/vinyl/plato/deals.py @@ -1,62 +1,11 @@ -#!/root/.pyenv/versions/dev/bin/python - import os -import re -from datetime import datetime import boto3 import pandas as pd from botocore.exceptions import NoCredentialsError, PartialCredentialsError from dotenv import load_dotenv - -from .scrape import * - - -def scrape_plato(get=None): - ic() - url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1" - - ic(url) - soup = get_soup(url=url, get=get) - articles_info = scrape_page(soup) - ic(len(articles_info)) - - links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1])) - for link in links: - ic(link) - soup = get_soup(url=link, get=get) - tmp = scrape_page(soup) - ic(len(tmp)) - articles_info.extend(tmp) - # break - - def clean(name): - tmp = " ".join(reversed(name.split(", "))) - tmp = tmp.lower() - tmp = re.sub(r"\s+\([^\)]*\)", "", tmp) - return tmp - - articles_df = pd.DataFrame(articles_info).reindex( - columns=[ - [ - "artist", - "title", - "url", - "label", - "release_date", - "origin", - "item_number", - "ean", - "delivery_info", - "price", - ] - ] - ) - articles_df["_artist"] = articles_df["artist"].map(clean) - articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1])) - articles_df["_date"] = datetime.now() - - return articles_df +from fetch import scrape_plato +from utils import get def update_database(articles_df=None, database_file="/home/user/plato.parquet"): @@ -157,15 +106,6 @@ def send_email(lines): print(response["MessageId"]) -def get(url, proxy=True): - if proxy: - tmp = "socks5://localhost:1080" - kwargs = dict(proxies=dict(http=tmp, https=tmp)) - else: - kwargs = {} - return requests.get(url, **kwargs) - - def main(dry=False): load_dotenv("/opt/.env") diff --git a/src/app/vinyl/plato/fetch.py b/src/app/vinyl/plato/fetch.py new file mode 100755 index 0000000..f574572 --- /dev/null +++ b/src/app/vinyl/plato/fetch.py @@ -0,0 +1,52 @@ +#!/root/.pyenv/versions/dev/bin/python + +import re +from datetime import datetime + +import pandas as pd + +from .scrape import get_soup, scrape_page, scrape_page_links + + +def scrape_plato(get=None): + ic() + url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1" + + ic(url) + soup = get_soup(url=url, get=get) + articles_info = scrape_page(soup) + ic(len(articles_info)) + + links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1])) + for link in links: + ic(link) + soup = get_soup(url=link, get=get) + tmp = scrape_page(soup) + ic(len(tmp)) + articles_info.extend(tmp) + + def clean(name): + tmp = " ".join(reversed(name.split(", "))) + tmp = tmp.lower() + tmp = re.sub(r"\s+\([^)]*\)", "", tmp) + return tmp + + articles_df = pd.DataFrame(articles_info).reindex( + columns=[ + "artist", + "title", + "url", + "label", + "release_date", + "origin", + "item_number", + "ean", + "delivery_info", + "price", + ] + ) + articles_df["_artist"] = articles_df["artist"].map(clean) + articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1])) + articles_df["_date"] = datetime.now() + + return articles_df diff --git a/src/app/vinyl/plato/utils.py b/src/app/vinyl/plato/utils.py new file mode 100644 index 0000000..d2e2e54 --- /dev/null +++ b/src/app/vinyl/plato/utils.py @@ -0,0 +1,10 @@ +import requests + + +def get(url, proxy=True): + if proxy: + tmp = "socks5://localhost:1080" + kwargs = dict(proxies=dict(http=tmp, https=tmp)) + else: + kwargs = {} + return requests.get(url, **kwargs) diff --git a/src/repo.py b/src/repo.py new file mode 100644 index 0000000..de2d58d --- /dev/null +++ b/src/repo.py @@ -0,0 +1 @@ +from app.vinyl.repo import vinyl # noqa