From ebef914be6f6a9e3922aae456f35fe90e746f803 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 14 Oct 2024 12:49:32 +0200 Subject: [PATCH] linting --- src/app/partitions/assets.py | 16 ++--- src/app/partitions/mapping.py | 4 +- src/app/partitions/repo.py | 3 +- src/app/partitions/test.py | 12 +--- src/app/vinyl/assets.py | 26 ++++--- src/app/vinyl/jobs.py | 20 ++++-- src/app/vinyl/plato/check_plato.py | 110 ++++++++++++++++------------- src/app/vinyl/plato/scrape.py | 62 ++++++++-------- src/app/vinyl/repo.py | 4 +- src/app/vinyl/schedules.py | 2 +- src/app/vinyl/test.py | 21 ++---- 11 files changed, 138 insertions(+), 142 deletions(-) diff --git a/src/app/partitions/assets.py b/src/app/partitions/assets.py index 7a1ecd5..3e69cd7 100644 --- a/src/app/partitions/assets.py +++ b/src/app/partitions/assets.py @@ -1,15 +1,9 @@ import polars as pl -from dagster import ( - AssetIn, - DailyPartitionsDefinition, - DimensionPartitionMapping, - IdentityPartitionMapping, - MultiPartitionMapping, - MultiPartitionsDefinition, - StaticPartitionsDefinition, - TimeWindowPartitionMapping, - asset, -) +from dagster import (AssetIn, DailyPartitionsDefinition, + DimensionPartitionMapping, IdentityPartitionMapping, + MultiPartitionMapping, MultiPartitionsDefinition, + StaticPartitionsDefinition, TimeWindowPartitionMapping, + asset) partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20") diff --git a/src/app/partitions/mapping.py b/src/app/partitions/mapping.py index d155c01..128d436 100644 --- a/src/app/partitions/mapping.py +++ b/src/app/partitions/mapping.py @@ -4,9 +4,7 @@ from typing import Optional from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition from dagster._core.definitions.partition import PartitionsSubset from dagster._core.definitions.partition_mapping import ( - MultiPartitionMapping, - UpstreamPartitionsResult, -) + MultiPartitionMapping, UpstreamPartitionsResult) from dagster._core.instance import DynamicPartitionsStore from dagster._serdes import whitelist_for_serdes diff --git a/src/app/partitions/repo.py b/src/app/partitions/repo.py index ca10abd..f23f0eb 100644 --- a/src/app/partitions/repo.py +++ b/src/app/partitions/repo.py @@ -1,7 +1,8 @@ from dagster import Definitions, define_asset_job from dagster_polars import PolarsParquetIOManager -from .assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2 +from .assets import (asset_multi_1, asset_multi_2, asset_single_1, + asset_single_2) # Define a job that includes both assets daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2]) diff --git a/src/app/partitions/test.py b/src/app/partitions/test.py index 7b2dfee..70cbec9 100644 --- a/src/app/partitions/test.py +++ b/src/app/partitions/test.py @@ -1,17 +1,11 @@ from dagster import materialize from dagster_polars import PolarsParquetIOManager -from app.vinyl.assets import ( - asset_multi_1, - asset_multi_2, - asset_single_1, - asset_single_2, -) +from app.vinyl.assets import (asset_multi_1, asset_multi_2, asset_single_1, + asset_single_2) resources = { - "polars_parquet_io_manager": PolarsParquetIOManager( - base_dir="/opt/dagster/storage" - ) + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage") } diff --git a/src/app/vinyl/assets.py b/src/app/vinyl/assets.py index eb22df0..cf53bca 100644 --- a/src/app/vinyl/assets.py +++ b/src/app/vinyl/assets.py @@ -3,16 +3,11 @@ from glob import glob import polars as pl import structlog -from dagster import ( - AssetIn, - DailyPartitionsDefinition, - DimensionPartitionMapping, - IdentityPartitionMapping, - MultiPartitionMapping, - MultiPartitionsDefinition, - StaticPartitionsDefinition, - TimeWindowPartitionMapping, - asset, Failure, Field, ) +from dagster import (AssetIn, DailyPartitionsDefinition, + DimensionPartitionMapping, Failure, Field, + IdentityPartitionMapping, MultiPartitionMapping, + MultiPartitionsDefinition, StaticPartitionsDefinition, + TimeWindowPartitionMapping, asset) from app.vinyl.plato.check_plato import scrape_plato from app.vinyl.sounds.fetch import fetch_deals @@ -48,7 +43,9 @@ partition_mapping = MultiPartitionMapping( metadata={ "partition_by": ["date", "source"], }, - config_schema={"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")}, + config_schema={ + "import_dir": Field(str, default_value="/opt/dagster/home/storage/import") + }, ) def deals(context): ic() @@ -74,9 +71,11 @@ def deals(context): file = sorted(files)[-1] logger.info("Using existing CSV file", file=file) try: - df = pl.read_csv(file)[["id", "name", "price"]] + df = pl.read_csv(file) logger.info("Loaded CSV file", rows=len(df)) - return df.with_columns(**{k: pl.lit(v) for k, v in partition_key.items()}) + return df.with_columns( + **{k: pl.lit(v) for k, v in partition_key.items()} + ) except Exception as e: logger.error("Failed to load CSV file!", error=e) raise Failure(f"Cannot materialize for the past: {date.date()}") @@ -91,7 +90,6 @@ def deals(context): logger.info("Scraping Sounds") df = fetch_deals() ic(df.columns) - df = df[["id", "name", "price"]] logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown()) return pl.from_pandas(df.assign(**partition_key)) diff --git a/src/app/vinyl/jobs.py b/src/app/vinyl/jobs.py index 03ef1fb..26bb867 100644 --- a/src/app/vinyl/jobs.py +++ b/src/app/vinyl/jobs.py @@ -1,9 +1,11 @@ -from dagster import job, OpExecutionContext, op, \ - AssetMaterialization, AssetKey, define_asset_job +from dagster import (AssetKey, AssetMaterialization, OpExecutionContext, + define_asset_job, job, op) from .assets import deals -deals_job = define_asset_job("deals_job", selection=[deals], partitions_def=deals.partitions_def) +deals_job = define_asset_job( + "deals_job", selection=[deals], partitions_def=deals.partitions_def +) @op @@ -22,11 +24,17 @@ def check_partititions(context: OpExecutionContext): context.log.info("Existing partitions", extra=dict(partitions=materializations)) import polars as pl + storage_dir = context.instance.storage_directory() ic(storage_dir) - for row in pl.scan_parquet(f'{storage_dir}/{asset_key}/*/*.parquet').select( - ['date', 'source']).unique().collect().iter_rows(): - partition = '|'.join(row) + for row in ( + pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet") + .select(["date", "source"]) + .unique() + .collect() + .iter_rows() + ): + partition = "|".join(row) if partition not in materializations: context.log.info(f"Missing partition: {partition}") context.log_event( diff --git a/src/app/vinyl/plato/check_plato.py b/src/app/vinyl/plato/check_plato.py index 5ba8812..f4ddaad 100755 --- a/src/app/vinyl/plato/check_plato.py +++ b/src/app/vinyl/plato/check_plato.py @@ -14,14 +14,14 @@ from .scrape import * def scrape_plato(get=None): ic() - url = 'https://www.platomania.nl/vinyl-aanbiedingen?page=1' + url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1" ic(url) soup = get_soup(url=url, get=get) articles_info = scrape_page(soup) ic(len(articles_info)) - links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split('=')[-1])) + links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1])) for link in links: ic(link) soup = get_soup(url=link, get=get) @@ -31,20 +31,20 @@ def scrape_plato(get=None): # break def clean(name): - tmp = ' '.join(reversed(name.split(', '))) + tmp = " ".join(reversed(name.split(", "))) tmp = tmp.lower() - tmp = re.sub(r'\s+\([^\)]*\)', '', tmp) + tmp = re.sub(r"\s+\([^\)]*\)", "", tmp) return tmp articles_df = pd.DataFrame(articles_info) - articles_df['_artist'] = articles_df['artist'].map(clean) - articles_df['_price'] = articles_df['price'].map(lambda x: float(x.split(' ')[-1])) - articles_df['_date'] = datetime.now() + articles_df["_artist"] = articles_df["artist"].map(clean) + articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1])) + articles_df["_date"] = datetime.now() return articles_df -def update_database(articles_df=None, database_file='/home/user/plato.parquet'): +def update_database(articles_df=None, database_file="/home/user/plato.parquet"): if os.path.exists(database_file): database_df = pd.read_parquet(database_file) else: @@ -57,18 +57,22 @@ def update_database(articles_df=None, database_file='/home/user/plato.parquet'): articles_df.to_parquet(database_file) return articles_df, articles_df - compare = ['ean', '_price'] + compare = ["ean", "_price"] check_df = pd.merge( - database_df[compare], - articles_df[compare], - how='right', - indicator=True + database_df[compare], articles_df[compare], how="right", indicator=True + ) + new_df = ( + check_df[check_df["_merge"] == "right_only"] + .drop(columns="_merge") + .merge(articles_df) + ) + database_df = ( + pd.concat([database_df, new_df]) + .sort_values("_date") + .groupby("ean") + .last() + .reset_index() ) - new_df = check_df[check_df['_merge'] == 'right_only'].drop(columns='_merge').merge(articles_df) - database_df = pd.concat([ - database_df, - new_df - ]).sort_values('_date').groupby('ean').last().reset_index() database_df.to_parquet(database_file) return database_df, new_df @@ -84,7 +88,7 @@ def send_email(lines): BODY_TEXT = "" # The HTML body of the email - tmp = '\n'.join(lines) + tmp = "\n".join(lines) BODY_HTML = f""" @@ -97,29 +101,31 @@ def send_email(lines): # Try to send the email try: - client = boto3.client('ses', region_name='eu-west-1') # Change the region as needed + client = boto3.client( + "ses", region_name="eu-west-1" + ) # Change the region as needed # Provide the contents of the email response = client.send_email( Destination={ - 'ToAddresses': [ + "ToAddresses": [ RECIPIENT, ], }, Message={ - 'Body': { - 'Html': { - 'Charset': CHARSET, - 'Data': BODY_HTML, + "Body": { + "Html": { + "Charset": CHARSET, + "Data": BODY_HTML, }, - 'Text': { - 'Charset': CHARSET, - 'Data': BODY_TEXT, + "Text": { + "Charset": CHARSET, + "Data": BODY_TEXT, }, }, - 'Subject': { - 'Charset': CHARSET, - 'Data': SUBJECT, + "Subject": { + "Charset": CHARSET, + "Data": SUBJECT, }, }, Source=SENDER, @@ -133,12 +139,12 @@ def send_email(lines): print(f"Error: {e}") else: print("Email sent! Message ID:"), - print(response['MessageId']) + print(response["MessageId"]) def get(url, proxy=True): if proxy: - tmp = 'socks5://localhost:1080' + tmp = "socks5://localhost:1080" kwargs = dict(proxies=dict(http=tmp, https=tmp)) else: kwargs = {} @@ -146,16 +152,16 @@ def get(url, proxy=True): def main(dry=False): - load_dotenv('/opt/.env') + load_dotenv("/opt/.env") - local_ip = get('http://ifconfig.me', False).text - get_ip = get('http://ifconfig.me').text - print(f'Local IP = {local_ip}') - print(f'Request IP = {get_ip}') + local_ip = get("http://ifconfig.me", False).text + get_ip = get("http://ifconfig.me").text + print(f"Local IP = {local_ip}") + print(f"Request IP = {get_ip}") assert local_ip != get_ip - artists = open('/home/user/artists.txt').read().strip().splitlines() - print(f'Number of known artists = {len(artists)}') + artists = open("/home/user/artists.txt").read().strip().splitlines() + print(f"Number of known artists = {len(artists)}") if dry: articles_df = None @@ -166,26 +172,28 @@ def main(dry=False): if dry: new_df = database_df.sample(20) - print(f'Database size = {len(database_df)}') - print(f'New = {len(new_df)}') + print(f"Database size = {len(database_df)}") + print(f"New = {len(new_df)}") # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25') new_df = new_df.query('_price <= 25 and ean != ""') - print(f'Interesting = {len(new_df)}') + print(f"Interesting = {len(new_df)}") if new_df is not None and len(new_df): message = [] for _, row in new_df.head(10).iterrows(): - message.append(f'

NEW

') - message.append('') + message.append( + f'

NEW

' + ) + message.append("") send_email(message) -if __name__ == '__main__': +if __name__ == "__main__": cwd = os.path.dirname(__file__) main(dry=False) diff --git a/src/app/vinyl/plato/scrape.py b/src/app/vinyl/plato/scrape.py index 33ce325..231d5d1 100644 --- a/src/app/vinyl/plato/scrape.py +++ b/src/app/vinyl/plato/scrape.py @@ -2,8 +2,7 @@ import requests from bs4 import BeautifulSoup - -def get_soup(url, get = None): +def get_soup(url, get=None): # Send a GET request to the specified URL if get is None: get = requests.get @@ -12,21 +11,23 @@ def get_soup(url, get = None): # Check if the request was successful if response.status_code == 200: # Parse the HTML content of the page - return BeautifulSoup(response.content, 'html.parser') + return BeautifulSoup(response.content, "html.parser") else: - raise ValueError(f"Failed to retrieve the page. Status code: {response.status_code}") + raise ValueError( + f"Failed to retrieve the page. Status code: {response.status_code}" + ) def scrape_page_links(soup): # Find all
  • elements with class "page-item" - page_items = soup.find_all('li', class_='page-item') + page_items = soup.find_all("li", class_="page-item") # Extract the href attribute of tags within these
  • elements links = [] for item in page_items: - a_tag = item.find('a', class_='page-link') - if a_tag and 'href' in a_tag.attrs: - links.append(a_tag['href']) + a_tag = item.find("a", class_="page-link") + if a_tag and "href" in a_tag.attrs: + links.append(a_tag["href"]) return links @@ -35,43 +36,44 @@ def extract_article_info(article): info = {} # Extract the artist name - artist_tag = article.find('h1', class_='product-card__artist') - info['artist'] = artist_tag.text.strip() if artist_tag else None + artist_tag = article.find("h1", class_="product-card__artist") + info["artist"] = artist_tag.text.strip() if artist_tag else None # Extract the title and URL - title_tag = article.find('h2', class_='product-card__title') - info['title'] = title_tag.text.strip() if title_tag else None - url_tag = title_tag.find_parent('a') if title_tag else None - info['url'] = url_tag['href'] if url_tag else None + title_tag = article.find("h2", class_="product-card__title") + info["title"] = title_tag.text.strip() if title_tag else None + url_tag = title_tag.find_parent("a") if title_tag else None + info["url"] = url_tag["href"] if url_tag else None # Extract additional details - details = article.find_all('div', class_='article-details__text') + details = article.find_all("div", class_="article-details__text") for detail in details: text = detail.text.strip() - if 'Label:' in text: - info['label'] = text.replace('Label: ', '').strip() - elif 'Releasedatum:' in text: - info['release_date'] = text.replace('Releasedatum: ', '').strip() - elif 'Herkomst:' in text: - info['origin'] = text.replace('Herkomst: ', '').strip() - elif 'Item-nr:' in text: - info['item_number'] = text.replace('Item-nr: ', '').strip() - elif 'EAN:' in text: - info['ean'] = text.replace('EAN:', '').strip() + if "Label:" in text: + info["label"] = text.replace("Label: ", "").strip() + elif "Releasedatum:" in text: + info["release_date"] = text.replace("Releasedatum: ", "").strip() + elif "Herkomst:" in text: + info["origin"] = text.replace("Herkomst: ", "").strip() + elif "Item-nr:" in text: + info["item_number"] = text.replace("Item-nr: ", "").strip() + elif "EAN:" in text: + info["ean"] = text.replace("EAN:", "").strip() # Extract delivery information - delivery_tag = article.find('div', class_='article-details__delivery-text') - info['delivery_info'] = delivery_tag.text.strip() if delivery_tag else None + delivery_tag = article.find("div", class_="article-details__delivery-text") + info["delivery_info"] = delivery_tag.text.strip() if delivery_tag else None # Extract price - price_tag = article.find('div', class_='article__price') - info['price'] = price_tag.text.strip() if price_tag else None + price_tag = article.find("div", class_="article__price") + info["price"] = price_tag.text.strip() if price_tag else None return info + def scrape_page(soup): # Find all article blocks - article_blocks = soup.find_all('article', class_='article LP') + article_blocks = soup.find_all("article", class_="article LP") # Extract information from each article block return [extract_article_info(article) for article in article_blocks] diff --git a/src/app/vinyl/repo.py b/src/app/vinyl/repo.py index 959238d..346fd4d 100644 --- a/src/app/vinyl/repo.py +++ b/src/app/vinyl/repo.py @@ -2,12 +2,12 @@ from dagster import Definitions from dagster_polars import PolarsParquetIOManager from .assets import deals -from .jobs import deals_job, check_partititions_job +from .jobs import check_partititions_job, deals_job from .schedules import deals_schedule vinyl = Definitions( assets=[deals], resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, jobs=[deals_job, check_partititions_job], - schedules=[deals_schedule] + schedules=[deals_schedule], ) diff --git a/src/app/vinyl/schedules.py b/src/app/vinyl/schedules.py index d3f4bf1..048db31 100644 --- a/src/app/vinyl/schedules.py +++ b/src/app/vinyl/schedules.py @@ -6,5 +6,5 @@ deals_schedule = build_schedule_from_partitioned_job( job=deals_job, hour_of_day=7, # execution_timezone="Europe/Amsterdam", - default_status=DefaultScheduleStatus.RUNNING + default_status=DefaultScheduleStatus.RUNNING, ) diff --git a/src/app/vinyl/test.py b/src/app/vinyl/test.py index 50d8bfb..1eb0d95 100644 --- a/src/app/vinyl/test.py +++ b/src/app/vinyl/test.py @@ -4,9 +4,7 @@ from datetime import datetime from dagster import materialize from dagster_polars import PolarsParquetIOManager -from app.vinyl.assets import ( - deals -) +from app.vinyl.assets import deals from app.vinyl.jobs import check_partititions_job warnings.filterwarnings("ignore", category=UserWarning) @@ -16,16 +14,11 @@ import logging logging.getLogger().setLevel(logging.INFO) resources = { - "polars_parquet_io_manager": PolarsParquetIOManager( - base_dir="/opt/dagster/storage" - ) + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage") } -def test_deals( - source="sounds", - date: str = None -): +def test_deals(source="sounds", date: str = None): if not date: today = datetime.today().strftime("%Y-%m-%d") date = today @@ -34,10 +27,10 @@ def test_deals( [deals], partition_key=f"{date}|{source}", resources=resources, - run_config={"loggers": {"console": {"config": {"log_level": "ERROR"}}}, - "ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}} - - } + run_config={ + "loggers": {"console": {"config": {"log_level": "ERROR"}}}, + "ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}}, + }, ) assert result.success ic(result.asset_value)