refactor plato logic

This commit is contained in:
2024-10-14 13:10:24 +02:00
parent 4b69bef47f
commit 6f2c4d1249
11 changed files with 130 additions and 97 deletions

View File

@@ -68,3 +68,6 @@ authors = ["Rik Veenboer <rik.veenboer@gmail.com>"]
[tool.poetry.dependencies] [tool.poetry.dependencies]
seven = "^1.0.0" seven = "^1.0.0"
[tool.ruff]
builtins = ["ic"]

View File

@@ -1,9 +1,16 @@
import polars as pl import polars as pl
from dagster import (AssetIn, DailyPartitionsDefinition,
DimensionPartitionMapping, IdentityPartitionMapping, from dagster import (
MultiPartitionMapping, MultiPartitionsDefinition, AssetIn,
StaticPartitionsDefinition, TimeWindowPartitionMapping, DailyPartitionsDefinition,
asset) DimensionPartitionMapping,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)
partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20") partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20")

View File

@@ -1,10 +1,12 @@
from datetime import datetime from datetime import datetime, timedelta
from typing import Optional from typing import Optional
from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition
from dagster._core.definitions.partition import PartitionsSubset from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_mapping import ( from dagster._core.definitions.partition_mapping import (
MultiPartitionMapping, UpstreamPartitionsResult) MultiPartitionMapping,
UpstreamPartitionsResult,
)
from dagster._core.instance import DynamicPartitionsStore from dagster._core.instance import DynamicPartitionsStore
from dagster._serdes import whitelist_for_serdes from dagster._serdes import whitelist_for_serdes

View File

@@ -1,8 +1,8 @@
from dagster import Definitions, define_asset_job
from dagster_polars import PolarsParquetIOManager from dagster_polars import PolarsParquetIOManager
from .assets import (asset_multi_1, asset_multi_2, asset_single_1, from dagster import Definitions, define_asset_job
asset_single_2)
from .assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2
# Define a job that includes both assets # Define a job that includes both assets
daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2]) daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2])

View File

@@ -1,8 +1,12 @@
from dagster import materialize
from dagster_polars import PolarsParquetIOManager from dagster_polars import PolarsParquetIOManager
from app.vinyl.assets import (asset_multi_1, asset_multi_2, asset_single_1, from app.vinyl.assets import (
asset_single_2) asset_multi_1,
asset_multi_2,
asset_single_1,
asset_single_2,
)
from dagster import materialize
resources = { resources = {
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage") "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage")

View File

@@ -3,14 +3,22 @@ from glob import glob
import polars as pl import polars as pl
import structlog import structlog
from dagster import (AssetIn, DailyPartitionsDefinition,
DimensionPartitionMapping, Failure, Field,
IdentityPartitionMapping, MultiPartitionMapping,
MultiPartitionsDefinition, StaticPartitionsDefinition,
TimeWindowPartitionMapping, asset)
from app.vinyl.plato.check_plato import scrape_plato from app.vinyl.plato.fetch import scrape_plato
from app.vinyl.sounds.fetch import fetch_deals from app.vinyl.sounds.fetch import fetch_deals
from dagster import (
AssetIn,
DailyPartitionsDefinition,
DimensionPartitionMapping,
Failure,
Field,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)
SOURCES = ["plato", "sounds"] SOURCES = ["plato", "sounds"]
@@ -55,7 +63,7 @@ def deals(context):
partition_key = context.partition_key.keys_by_dimension partition_key = context.partition_key.keys_by_dimension
date_str = partition_key["date"] date_str = partition_key["date"]
source = partition_key["source"] source = partition_key["source"]
logger.info("Materializing deals", date=partition_key["date"], source=source) logger.info("Materializing deals", date=date_str, source=source)
date = datetime.strptime(partition_key["date"], "%Y-%m-%d") date = datetime.strptime(partition_key["date"], "%Y-%m-%d")
days = (date - datetime.today()).days days = (date - datetime.today()).days

View File

@@ -1,5 +1,11 @@
from dagster import (AssetKey, AssetMaterialization, OpExecutionContext, from dagster import (
define_asset_job, job, op) AssetKey,
AssetMaterialization,
OpExecutionContext,
define_asset_job,
job,
op,
)
from .assets import deals from .assets import deals

View File

@@ -1,62 +1,11 @@
#!/root/.pyenv/versions/dev/bin/python
import os import os
import re
from datetime import datetime
import boto3 import boto3
import pandas as pd import pandas as pd
from botocore.exceptions import NoCredentialsError, PartialCredentialsError from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv from dotenv import load_dotenv
from fetch import scrape_plato
from .scrape import * from utils import get
# Whitespace followed by a parenthesised fragment, e.g. " (2)" in "artist (2)".
_PARENTHESISED = re.compile(r"\s+\([^)]*\)")

# Columns the resulting frame always exposes, in this order.
_ARTICLE_COLUMNS = [
    "artist",
    "title",
    "url",
    "label",
    "release_date",
    "origin",
    "item_number",
    "ean",
    "delivery_info",
    "price",
]


def _clean_artist(name):
    """Return a normalised form of an artist name.

    The ", "-separated parts are put in reverse order, the result is
    lower-cased, and every whitespace-prefixed parenthesised fragment is
    removed.
    """
    flipped = " ".join(reversed(name.split(", ")))
    return _PARENTHESISED.sub("", flipped.lower())


def scrape_plato(get=None):
    """Scrape the platomania.nl vinyl offers listing into a DataFrame.

    The first listing page is fetched and parsed, then every page link found
    on it (de-duplicated, ordered by the integer after the last "=" in the
    link) is fetched and parsed as well.

    Args:
        get: Forwarded unchanged to ``get_soup``; its meaning is defined there.

    Returns:
        A ``pandas.DataFrame`` with the columns in ``_ARTICLE_COLUMNS`` plus
        ``_artist`` (normalised artist), ``_price`` (float parsed from the
        last space-separated token of ``price``) and ``_date`` (time of the
        scrape). Missing ``artist``/``price`` values stay missing in the
        derived columns.
    """
    # NOTE(review): `ic` is not imported here; presumably it is installed as a
    # builtin elsewhere (icecream) -- confirm before running standalone.
    ic()
    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
    ic(url)
    soup = get_soup(url=url, get=get)
    articles_info = scrape_page(soup)
    ic(len(articles_info))

    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
    for link in links:
        ic(link)
        soup = get_soup(url=link, get=get)
        page_articles = scrape_page(soup)
        ic(len(page_articles))
        articles_info.extend(page_articles)

    # A flat list of labels is required: a nested list ([[...]]) is not
    # interpreted as the plain column names looked up below.
    articles_df = pd.DataFrame(articles_info).reindex(columns=_ARTICLE_COLUMNS)

    # reindex() fills fields that were not scraped with NaN; skip those values
    # instead of failing on str methods.
    articles_df["_artist"] = articles_df["artist"].map(
        _clean_artist, na_action="ignore"
    )
    articles_df["_price"] = articles_df["price"].map(
        lambda x: float(x.split(" ")[-1]), na_action="ignore"
    )
    articles_df["_date"] = datetime.now()

    return articles_df
def update_database(articles_df=None, database_file="/home/user/plato.parquet"): def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
@@ -157,15 +106,6 @@ def send_email(lines):
print(response["MessageId"]) print(response["MessageId"])
def get(url, proxy=True, *, proxy_url="socks5://localhost:1080", timeout=None):
    """Fetch ``url`` with ``requests.get``, optionally through a proxy.

    Args:
        url: Address to request.
        proxy: When truthy, send both http and https traffic through
            ``proxy_url``; otherwise no proxy settings are passed.
        proxy_url: Proxy address used when ``proxy`` is truthy. The default is
            the previously hard-coded local SOCKS5 endpoint.
        timeout: Forwarded to ``requests.get``. ``None`` (the default) keeps
            the previous behaviour of not setting a timeout.

    Returns:
        Whatever ``requests.get`` returns for the request.
    """
    # NOTE(review): socks5:// proxies need requests' SOCKS extra (PySocks)
    # installed -- confirm it is part of the environment.
    proxies = {"http": proxy_url, "https": proxy_url} if proxy else None
    return requests.get(url, proxies=proxies, timeout=timeout)
def main(dry=False): def main(dry=False):
load_dotenv("/opt/.env") load_dotenv("/opt/.env")

52
src/app/vinyl/plato/fetch.py Executable file
View File

@@ -0,0 +1,52 @@
#!/root/.pyenv/versions/dev/bin/python
import re
from datetime import datetime
import pandas as pd
from .scrape import get_soup, scrape_page, scrape_page_links
# Whitespace followed by a parenthesised fragment, e.g. " (2)" in "artist (2)".
_PARENTHESISED = re.compile(r"\s+\([^)]*\)")

# Columns the resulting frame always exposes, in this order.
_ARTICLE_COLUMNS = [
    "artist",
    "title",
    "url",
    "label",
    "release_date",
    "origin",
    "item_number",
    "ean",
    "delivery_info",
    "price",
]


def _clean_artist(name):
    """Return a normalised form of an artist name.

    The ", "-separated parts are put in reverse order, the result is
    lower-cased, and every whitespace-prefixed parenthesised fragment is
    removed.
    """
    flipped = " ".join(reversed(name.split(", ")))
    return _PARENTHESISED.sub("", flipped.lower())


def scrape_plato(get=None):
    """Scrape the platomania.nl vinyl offers listing into a DataFrame.

    The first listing page is fetched and parsed, then every page link found
    on it (de-duplicated, ordered by the integer after the last "=" in the
    link) is fetched and parsed as well.

    Args:
        get: Forwarded unchanged to ``get_soup``; its meaning is defined there.

    Returns:
        A ``pandas.DataFrame`` with the columns in ``_ARTICLE_COLUMNS`` plus
        ``_artist`` (normalised artist), ``_price`` (float parsed from the
        last space-separated token of ``price``) and ``_date`` (time of the
        scrape). Missing ``artist``/``price`` values stay missing in the
        derived columns.
    """
    # NOTE(review): `ic` is not imported here; presumably it is installed as a
    # builtin elsewhere (icecream) -- confirm before running standalone.
    ic()
    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
    ic(url)
    soup = get_soup(url=url, get=get)
    articles_info = scrape_page(soup)
    ic(len(articles_info))

    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
    for link in links:
        ic(link)
        soup = get_soup(url=link, get=get)
        page_articles = scrape_page(soup)
        ic(len(page_articles))
        articles_info.extend(page_articles)

    articles_df = pd.DataFrame(articles_info).reindex(columns=_ARTICLE_COLUMNS)

    # reindex() fills fields that were not scraped with NaN; skip those values
    # instead of failing on str methods.
    articles_df["_artist"] = articles_df["artist"].map(
        _clean_artist, na_action="ignore"
    )
    articles_df["_price"] = articles_df["price"].map(
        lambda x: float(x.split(" ")[-1]), na_action="ignore"
    )
    articles_df["_date"] = datetime.now()

    return articles_df

View File

@@ -0,0 +1,10 @@
import requests
def get(url, proxy=True, *, proxy_url="socks5://localhost:1080", timeout=None):
    """Fetch ``url`` with ``requests.get``, optionally through a proxy.

    Args:
        url: Address to request.
        proxy: When truthy, send both http and https traffic through
            ``proxy_url``; otherwise no proxy settings are passed.
        proxy_url: Proxy address used when ``proxy`` is truthy. The default is
            the previously hard-coded local SOCKS5 endpoint.
        timeout: Forwarded to ``requests.get``. ``None`` (the default) keeps
            the previous behaviour of not setting a timeout.

    Returns:
        Whatever ``requests.get`` returns for the request.
    """
    # NOTE(review): socks5:// proxies need requests' SOCKS extra (PySocks)
    # installed -- confirm it is part of the environment.
    proxies = {"http": proxy_url, "https": proxy_url} if proxy else None
    return requests.get(url, proxies=proxies, timeout=timeout)

1
src/repo.py Normal file
View File

@@ -0,0 +1 @@
from app.vinyl.repo import vinyl # noqa