From a0a0bbd110c0a0cef002a5be75a159e1f43399be Mon Sep 17 00:00:00 2001
From: Rik Veenboer <rik.veenboer@gmail.com>
Date: Sat, 16 Aug 2025 13:48:34 +0200
Subject: [PATCH] refactor

---
 apps/vinyl/src/assets.py                      |   8 +-
 apps/vinyl/src/plato/deals.py                 | 154 ------------------
 apps/vinyl/src/plato/fetch.py                 |  52 ------
 apps/vinyl/src/plato/scrape.py                |  60 +++++--
 apps/vinyl/src/sounds/deals.py                |  80 ---------
 apps/vinyl/src/sounds/{fetch.py => scrape.py} |  25 +--
 6 files changed, 58 insertions(+), 321 deletions(-)
 delete mode 100755 apps/vinyl/src/plato/deals.py
 delete mode 100755 apps/vinyl/src/plato/fetch.py
 mode change 100644 => 100755 apps/vinyl/src/plato/scrape.py
 delete mode 100644 apps/vinyl/src/sounds/deals.py
 rename apps/vinyl/src/sounds/{fetch.py => scrape.py} (81%)

diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index 9c0bc0d..4754f8f 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -12,11 +12,11 @@ from dagster_polars.patito import patito_model_to_dagster_type
 from jinja2 import Environment, FileSystemLoader
 from models import Deal
 from partitions import daily_partitions_def, multi_partitions_def
-from plato.fetch import scrape_plato
 from plato.parse import parse as parse_plato
+from plato.scrape import scrape_plato
 from shared.utils import get_partition_keys, parse_partition_keys
-from sounds.fetch import fetch_deals
 from sounds.parse import parse as parse_sounds
+from sounds.scrape import scrape_sounds
 from utils.email import EmailService
 
 import dagster as dg
@@ -71,9 +71,9 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
         return pl.from_pandas(df.assign(**partition_key))
     if source == "sounds":
         logger.info("Scraping Sounds")
-        df = fetch_deals()
-        ic(df.columns)
+        df = scrape_sounds()
         logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
+        ic(df.columns)
         return pl.from_pandas(df.assign(**partition_key))
 
     return pl.DataFrame(
diff --git a/apps/vinyl/src/plato/deals.py b/apps/vinyl/src/plato/deals.py
deleted file mode 100755
index 1f5c348..0000000
--- a/apps/vinyl/src/plato/deals.py
+++ /dev/null
@@ -1,154 +0,0 @@
-import os
-
-import boto3
-import pandas as pd
-from botocore.exceptions import NoCredentialsError, PartialCredentialsError
-from dotenv import load_dotenv
-from fetch import scrape_plato
-from utils import get
-
-
-def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
-    if os.path.exists(database_file):
-        database_df = pd.read_parquet(database_file)
-    else:
-        database_df = None
-
-    if articles_df is None:
-        new_df = None if database_df is None else database_df.head(0)
-    else:
-        if database_df is None:
-            articles_df.to_parquet(database_file)
-            return articles_df, articles_df
-
-        compare = ["ean", "_price"]
-        check_df = pd.merge(
-            database_df[compare], articles_df[compare], how="right", indicator=True
-        )
-        new_df = (
-            check_df[check_df["_merge"] == "right_only"]
-            .drop(columns="_merge")
-            .merge(articles_df)
-        )
-        database_df = (
-            pd.concat([database_df, new_df])
-            .sort_values("_date")
-            .groupby("ean")
-            .last()
-            .reset_index()
-        )
-        database_df.to_parquet(database_file)
-
-    return database_df, new_df
-
-
-def send_email(lines):
-    # Define the email parameters
-    SENDER = "mail@veenboer.xyz"
-    RECIPIENT = "rik.veenboer@gmail.com"
-    SUBJECT = "Aanbieding op plato!"
-
-    # The email body for recipients with non-HTML email clients
-    BODY_TEXT = ""
-
-    # The HTML body of the email
-    tmp = "\n".join(lines)
-    BODY_HTML = f"""<html>
-    <head></head>
-    <body>
-    {tmp}
-    </body>
-    </html>"""
-
-    # The character encoding for the email
-    CHARSET = "UTF-8"
-
-    # Try to send the email
-    try:
-        client = boto3.client(
-            "ses", region_name="eu-west-1"
-        )  # Change the region as needed
-
-        # Provide the contents of the email
-        response = client.send_email(
-            Destination={
-                "ToAddresses": [
-                    RECIPIENT,
-                ],
-            },
-            Message={
-                "Body": {
-                    "Html": {
-                        "Charset": CHARSET,
-                        "Data": BODY_HTML,
-                    },
-                    "Text": {
-                        "Charset": CHARSET,
-                        "Data": BODY_TEXT,
-                    },
-                },
-                "Subject": {
-                    "Charset": CHARSET,
-                    "Data": SUBJECT,
-                },
-            },
-            Source=SENDER,
-        )
-    # Display an error if something goes wrong.
-    except NoCredentialsError:
-        print("Credentials not available")
-    except PartialCredentialsError:
-        print("Incomplete credentials provided")
-    except Exception as e:
-        print(f"Error: {e}")
-    else:
-        print("Email sent! Message ID:"),
-        print(response["MessageId"])
-
-
-def main(dry=False):
-    load_dotenv("/opt/.env")
-
-    local_ip = get("http://ifconfig.me", False).text
-    get_ip = get("http://ifconfig.me").text
-    print(f"Local IP = {local_ip}")
-    print(f"Request IP = {get_ip}")
-    assert local_ip != get_ip
-
-    artists = open("/home/user/artists.txt").read().strip().splitlines()
-    print(f"Number of known artists = {len(artists)}")
-
-    if dry:
-        articles_df = None
-    else:
-        articles_df = scrape_plato(get=get)
-    database_df, new_df = update_database(articles_df)
-
-    if dry:
-        new_df = database_df.sample(20)
-
-    print(f"Database size = {len(database_df)}")
-    print(f"New = {len(new_df)}")
-
-    # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
-    new_df = new_df.query('_price <= 25 and ean != ""')
-    print(f"Interesting = {len(new_df)}")
-
-    if new_df is not None and len(new_df):
-        message = []
-        for _, row in new_df.head(10).iterrows():
-            message.append(
-                f'<a href="{row.url}">{row.artist} - {row.title}</a> NEW {row.price}<br>'
-            )
-        message.append("")
-        send_email(message)
-
-
-if __name__ == "__main__":
-    cwd = os.path.dirname(__file__)
-    main(dry=False)
diff --git a/apps/vinyl/src/plato/fetch.py b/apps/vinyl/src/plato/fetch.py
deleted file mode 100755
index f574572..0000000
--- a/apps/vinyl/src/plato/fetch.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/root/.pyenv/versions/dev/bin/python
-
-import re
-from datetime import datetime
-
-import pandas as pd
-
-from .scrape import get_soup, scrape_page, scrape_page_links
-
-
-def scrape_plato(get=None):
-    ic()
-    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
-
-    ic(url)
-    soup = get_soup(url=url, get=get)
-    articles_info = scrape_page(soup)
-    ic(len(articles_info))
-
-    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
-    for link in links:
-        ic(link)
-        soup = get_soup(url=link, get=get)
-        tmp = scrape_page(soup)
-        ic(len(tmp))
-        articles_info.extend(tmp)
-
-    def clean(name):
-        tmp = " ".join(reversed(name.split(", ")))
-        tmp = tmp.lower()
-        tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
-        return tmp
-
-    articles_df = pd.DataFrame(articles_info).reindex(
-        columns=[
-            "artist",
-            "title",
-            "url",
-            "label",
-            "release_date",
-            "origin",
-            "item_number",
-            "ean",
-            "delivery_info",
-            "price",
-        ]
-    )
-    articles_df["_artist"] = articles_df["artist"].map(clean)
-    articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
-    articles_df["_date"] = datetime.now()
-
-    return articles_df
diff --git a/apps/vinyl/src/plato/scrape.py b/apps/vinyl/src/plato/scrape.py
old mode 100644
new mode 100755
index 231d5d1..d05c2cb
--- a/apps/vinyl/src/plato/scrape.py
+++ b/apps/vinyl/src/plato/scrape.py
@@ -1,21 +1,61 @@
+import re
+from datetime import datetime
+
+import pandas as pd
 import requests
 from bs4 import BeautifulSoup
 
 
+def scrape_plato(get=None):
+    ic()
+    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
+
+    ic(url)
+    soup = get_soup(url=url, get=get)
+    articles_info = scrape_page(soup)
+    ic(len(articles_info))
+
+    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
+    for link in links:
+        ic(link)
+        soup = get_soup(url=link, get=get)
+        tmp = scrape_page(soup)
+        ic(len(tmp))
+        articles_info.extend(tmp)
+
+    def clean(name):
+        tmp = " ".join(reversed(name.split(", ")))
+        tmp = tmp.lower()
+        tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
+        return tmp
+
+    articles_df = pd.DataFrame(articles_info).reindex(
+        columns=[
+            "artist",
+            "title",
+            "url",
+            "label",
+            "release_date",
+            "origin",
+            "item_number",
+            "ean",
+            "delivery_info",
+            "price",
+        ]
+    )
+    articles_df["_artist"] = articles_df["artist"].map(clean)
+    articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
+    articles_df["_date"] = datetime.now()
+
+    return articles_df
+
+
 def get_soup(url, get=None):
-    # Send a GET request to the specified URL
     if get is None:
         get = requests.get
     response = get(url)
-
-    # Check if the request was successful
-    if response.status_code == 200:
-        # Parse the HTML content of the page
-        return BeautifulSoup(response.content, "html.parser")
-    else:
-        raise ValueError(
-            f"Failed to retrieve the page. Status code: {response.status_code}"
-        )
+    response.raise_for_status()
+    return BeautifulSoup(response.content, "html.parser")
 
 
 def scrape_page_links(soup):
diff --git a/apps/vinyl/src/sounds/deals.py b/apps/vinyl/src/sounds/deals.py
deleted file mode 100644
index 0fd9b03..0000000
--- a/apps/vinyl/src/sounds/deals.py
+++ /dev/null
@@ -1,80 +0,0 @@
-#!/usr/bin/python3
-
-import glob
-import os
-from datetime import datetime
-
-import pandas as pd
-
-
-def get_csvs(directory, n):
-    # List all files matching the pattern *_sounds.csv
-    suffix = "_sounds.csv"
-    files = glob.glob(os.path.join(directory, f"*{suffix}"))
-
-    # Function to extract date from filename
-    def extract_date_from_filename(filename):
-        # Extract the date string
-        basename = os.path.basename(filename)
-        date_str = basename.split(suffix)[0]
-        try:
-            return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S")
-        except ValueError:
-            # The date string cannot be parsed
-            return None
-
-    # Create a list of tuples (date, filename), ignoring files with unparsable dates
-    result = [(extract_date_from_filename(file), file) for file in files]
-    result = [item for item in result if item[0] is not None]
-
-    # Sort the list by date in descending order (most recent first)
-    result.sort(key=lambda x: x[0], reverse=True)
-
-    # Return the two most recent files
-    return [x[1] for x in result[:n]]
-
-
-def analyze(df1, df2):
-    df1 = df1.drop_duplicates(subset="id")
-    df2 = df2.drop_duplicates(subset="id")
-    combined_df = pd.merge(
-        df1[["id", "price"]], df2, on="id", how="right", indicator=True
-    )
-    combined_df["discount"] = combined_df.price_y - combined_df.price_x
-    combined_df.drop(columns=["price_x"], inplace=True)
-    combined_df.rename(columns={"price_y": "price"}, inplace=True)
-
-    deals = combined_df.query("discount < 0").sort_values(by="discount")[
-        ["id", "name", "price", "discount"]
-    ]
-    new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[
-        ["id", "name", "price"]
-    ]
-    return deals, new
-
-
-if __name__ == "__main__":
-    csvs = get_csvs(".", 100)
-
-    for i in range(1, len(csvs)):
-        print(f"Comparing {csvs[i]} with {csvs[0]}")
-        df_previous = pd.read_csv(csvs[i], index_col=0)
-        df_latest = pd.read_csv(csvs[0], index_col=0)
-        deals, new = analyze(df_previous, df_latest)
-
-        done = False
-
-        if len(deals) > 0:
-            print()
-            print("New items:")
-            print(new)
-            print()
-            done = True
-
-        if len(deals) > 0:
-            print("Discounted items:")
-            print(deals)
-            done = True
-
-        if done:
-            break
diff --git a/apps/vinyl/src/sounds/fetch.py b/apps/vinyl/src/sounds/scrape.py
similarity index 81%
rename from apps/vinyl/src/sounds/fetch.py
rename to apps/vinyl/src/sounds/scrape.py
index 219d1cb..fd98333 100644
--- a/apps/vinyl/src/sounds/fetch.py
+++ b/apps/vinyl/src/sounds/scrape.py
@@ -1,7 +1,4 @@
-#!/usr/bin/python3
-
 import time
-from datetime import datetime
 
 import pandas as pd
 import requests
@@ -74,7 +71,7 @@ def parse_page(html_content):
     )
 
 
-def fetch_deals():
+def scrape_sounds():
     # Get page count
     page_count = get_page_count(
         requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text
@@ -86,25 +83,11 @@
     base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all"
     dfs = []
     for i in tqdm(range(page_count)):
-        df = parse_page(requests.get(base_url.format(page_number=i)).text)
+        response = requests.get(base_url.format(page_number=i))
+        response.raise_for_status()
+        df = parse_page(response.text)
         dfs.append(df)
         time.sleep(2)
 
     # Combine dfs
     return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"])
"name", "price"]) - - -if __name__ == "__main__": - df = fetch_deals() - print(f"Found {len(df)} deals") - - # Show current deals - print(df.sort_values(by="price").head(10)) - - # Write to file - now = datetime.now() - prefix = now.strftime("%Y-%m-%d_%H:%M:%S") - directory = "/home/bram/src/python" - filepath = f"{directory}/{prefix}_sounds.csv" - print(f"Writing data to {filepath}") - df.to_csv(filepath)