diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py
index 9c0bc0d..4754f8f 100644
--- a/apps/vinyl/src/assets.py
+++ b/apps/vinyl/src/assets.py
@@ -12,11 +12,11 @@ from dagster_polars.patito import patito_model_to_dagster_type
 from jinja2 import Environment, FileSystemLoader
 from models import Deal
 from partitions import daily_partitions_def, multi_partitions_def
-from plato.fetch import scrape_plato
 from plato.parse import parse as parse_plato
+from plato.scrape import scrape_plato
 from shared.utils import get_partition_keys, parse_partition_keys
-from sounds.fetch import fetch_deals
 from sounds.parse import parse as parse_sounds
+from sounds.scrape import scrape_sounds
 from utils.email import EmailService
 
 import dagster as dg
@@ -71,9 +71,9 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
         return pl.from_pandas(df.assign(**partition_key))
     if source == "sounds":
         logger.info("Scraping Sounds")
-        df = fetch_deals()
-        ic(df.columns)
+        df = scrape_sounds()
         logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
+        ic(df.columns)
         return pl.from_pandas(df.assign(**partition_key))
     return pl.DataFrame(
diff --git a/apps/vinyl/src/plato/deals.py b/apps/vinyl/src/plato/deals.py
deleted file mode 100755
index 1f5c348..0000000
--- a/apps/vinyl/src/plato/deals.py
+++ /dev/null
@@ -1,154 +0,0 @@
-import os
-
-import boto3
-import pandas as pd
-from botocore.exceptions import NoCredentialsError, PartialCredentialsError
-from dotenv import load_dotenv
-from fetch import scrape_plato
-from utils import get
-
-
-def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
-    if os.path.exists(database_file):
-        database_df = pd.read_parquet(database_file)
-    else:
-        database_df = None
-
-    if articles_df is None:
-        new_df = None if database_df is None else database_df.head(0)
-    else:
-        if database_df is None:
-            articles_df.to_parquet(database_file)
-            return articles_df, articles_df
-
-        compare = ["ean", "_price"]
-        check_df = pd.merge(
-            database_df[compare], articles_df[compare], how="right", indicator=True
-        )
-        new_df = (
-            check_df[check_df["_merge"] == "right_only"]
-            .drop(columns="_merge")
-            .merge(articles_df)
-        )
-        database_df = (
-            pd.concat([database_df, new_df])
-            .sort_values("_date")
-            .groupby("ean")
-            .last()
-            .reset_index()
-        )
-        database_df.to_parquet(database_file)
-
-    return database_df, new_df
-
-
-def send_email(lines):
-    # Define the email parameters
-    SENDER = "mail@veenboer.xyz"
-    RECIPIENT = "rik.veenboer@gmail.com"
-    SUBJECT = "Aanbieding op plato!"
-
-    # The email body for recipients with non-HTML email clients
-    BODY_TEXT = ""
-
-    # The HTML body of the email
-    tmp = "\n".join(lines)
-    BODY_HTML = f"""
-    <html>
-    <body>
-    {tmp}
-    </body>
-    </html>
-    """
-
-    # The character encoding for the email
-    CHARSET = "UTF-8"
-
-    # Try to send the email
-    try:
-        client = boto3.client(
-            "ses", region_name="eu-west-1"
-        )  # Change the region as needed
-
-        # Provide the contents of the email
-        response = client.send_email(
-            Destination={
-                "ToAddresses": [
-                    RECIPIENT,
-                ],
-            },
-            Message={
-                "Body": {
-                    "Html": {
-                        "Charset": CHARSET,
-                        "Data": BODY_HTML,
-                    },
-                    "Text": {
-                        "Charset": CHARSET,
-                        "Data": BODY_TEXT,
-                    },
-                },
-                "Subject": {
-                    "Charset": CHARSET,
-                    "Data": SUBJECT,
-                },
-            },
-            Source=SENDER,
-        )
-    # Display an error if something goes wrong.
-    except NoCredentialsError:
-        print("Credentials not available")
-    except PartialCredentialsError:
-        print("Incomplete credentials provided")
-    except Exception as e:
-        print(f"Error: {e}")
-    else:
-        print("Email sent! Message ID:")
-        print(response["MessageId"])
-
-
-def main(dry=False):
-    load_dotenv("/opt/.env")
-
-    local_ip = get("http://ifconfig.me", False).text
-    get_ip = get("http://ifconfig.me").text
-    print(f"Local IP = {local_ip}")
-    print(f"Request IP = {get_ip}")
-    assert local_ip != get_ip
-
-    artists = open("/home/user/artists.txt").read().strip().splitlines()
-    print(f"Number of known artists = {len(artists)}")
-
-    if dry:
-        articles_df = None
-    else:
-        articles_df = scrape_plato(get=get)
-    database_df, new_df = update_database(articles_df)
-
-    if dry:
-        new_df = database_df.sample(20)
-
-    print(f"Database size = {len(database_df)}")
-    print(f"New = {len(new_df)}")
-
-    # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
-    new_df = new_df.query('_price <= 25 and ean != ""')
-    print(f"Interesting = {len(new_df)}")
-
-    if new_df is not None and len(new_df):
-        message = []
-        for _, row in new_df.head(10).iterrows():
-            message.append(
-                f'NEW<br>'
-            )
-            message.append("")
-            message.append(f"- [artist] {row.artist}<br>")
-            message.append(f"- [title] {row.title}<br>")
-            message.append(f"- [price] {row.price}<br>")
-            message.append(f"- [release] {row.release_date}<br>")
-            message.append("<br>")
-        send_email(message)
-
-
-if __name__ == "__main__":
-    cwd = os.path.dirname(__file__)
-    main(dry=False)
diff --git a/apps/vinyl/src/plato/fetch.py b/apps/vinyl/src/plato/fetch.py
deleted file mode 100755
index f574572..0000000
--- a/apps/vinyl/src/plato/fetch.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/root/.pyenv/versions/dev/bin/python
-
-import re
-from datetime import datetime
-
-import pandas as pd
-
-from .scrape import get_soup, scrape_page, scrape_page_links
-
-
-def scrape_plato(get=None):
-    ic()
-    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
-
-    ic(url)
-    soup = get_soup(url=url, get=get)
-    articles_info = scrape_page(soup)
-    ic(len(articles_info))
-
-    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
-    for link in links:
-        ic(link)
-        soup = get_soup(url=link, get=get)
-        tmp = scrape_page(soup)
-        ic(len(tmp))
-        articles_info.extend(tmp)
-
-    def clean(name):
-        tmp = " ".join(reversed(name.split(", ")))
-        tmp = tmp.lower()
-        tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
-        return tmp
-
-    articles_df = pd.DataFrame(articles_info).reindex(
-        columns=[
-            "artist",
-            "title",
-            "url",
-            "label",
-            "release_date",
-            "origin",
-            "item_number",
-            "ean",
-            "delivery_info",
-            "price",
-        ]
-    )
-    articles_df["_artist"] = articles_df["artist"].map(clean)
-    articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
-    articles_df["_date"] = datetime.now()
-
-    return articles_df
diff --git a/apps/vinyl/src/plato/scrape.py b/apps/vinyl/src/plato/scrape.py
old mode 100644
new mode 100755
index 231d5d1..d05c2cb
--- a/apps/vinyl/src/plato/scrape.py
+++ b/apps/vinyl/src/plato/scrape.py
@@ -1,21 +1,61 @@
+import re
+from datetime import datetime
+
+import pandas as pd
+from icecream import ic
 import requests
 from bs4 import BeautifulSoup
 
 
+def scrape_plato(get=None):
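+    """Scrape every platomania.nl vinyl deals page into a single DataFrame."""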
+    ic()
+    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
+
+    ic(url)
+    soup = get_soup(url=url, get=get)
+    articles_info = scrape_page(soup)
+    ic(len(articles_info))
+
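+    # Collect links to the remaining result pages and visit them in
+    # ascending page order, accumulating articles from each page.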
+    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
+    for link in links:
+        ic(link)
+        soup = get_soup(url=link, get=get)
+        tmp = scrape_page(soup)
+        ic(len(tmp))
+        articles_info.extend(tmp)
+
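+    # Normalize artist names: "Last, First" becomes "first last",
+    # lowercased, with any trailing parenthesized qualifier stripped.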
+    def clean(name):
+        tmp = " ".join(reversed(name.split(", ")))
+        tmp = tmp.lower()
+        tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
+        return tmp
+
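+    # Reindex to a fixed column order so every scrape produces the same
+    # schema; fields missing from a listing come through as NaN.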
+    articles_df = pd.DataFrame(articles_info).reindex(
+        columns=[
+            "artist",
+            "title",
+            "url",
+            "label",
+            "release_date",
+            "origin",
+            "item_number",
+            "ean",
+            "delivery_info",
+            "price",
+        ]
+    )
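+    # Derived helper columns: cleaned artist name, numeric price taken from
+    # the last token of the price string, and the scrape timestamp.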
+ articles_df["_artist"] = articles_df["artist"].map(clean)
+ articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
+ articles_df["_date"] = datetime.now()
+
+ return articles_df
+
+
 def get_soup(url, get=None):
-    # Send a GET request to the specified URL
     if get is None:
         get = requests.get
     response = get(url)
-
-    # Check if the request was successful
-    if response.status_code == 200:
-        # Parse the HTML content of the page
-        return BeautifulSoup(response.content, "html.parser")
-    else:
-        raise ValueError(
-            f"Failed to retrieve the page. Status code: {response.status_code}"
-        )
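+    # raise_for_status() raises requests.HTTPError on 4xx/5xx responses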
+    response.raise_for_status()
+    return BeautifulSoup(response.content, "html.parser")
 
 
 def scrape_page_links(soup):
diff --git a/apps/vinyl/src/sounds/deals.py b/apps/vinyl/src/sounds/deals.py
deleted file mode 100644
index 0fd9b03..0000000
--- a/apps/vinyl/src/sounds/deals.py
+++ /dev/null
@@ -1,80 +0,0 @@
-#!/usr/bin/python3
-
-import glob
-import os
-from datetime import datetime
-
-import pandas as pd
-
-
-def get_csvs(directory, n):
-    # List all files matching the pattern *_sounds.csv
-    suffix = "_sounds.csv"
-    files = glob.glob(os.path.join(directory, f"*{suffix}"))
-
-    # Function to extract date from filename
-    def extract_date_from_filename(filename):
-        # Extract the date string
-        basename = os.path.basename(filename)
-        date_str = basename.split(suffix)[0]
-        try:
-            return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S")
-        except ValueError:
-            # The date string cannot be parsed
-            return None
-
-    # Create a list of tuples (date, filename), ignoring files with unparsable dates
-    result = [(extract_date_from_filename(file), file) for file in files]
-    result = [item for item in result if item[0] is not None]
-
-    # Sort the list by date in descending order (most recent first)
-    result.sort(key=lambda x: x[0], reverse=True)
-
-    # Return the n most recent files
-    return [x[1] for x in result[:n]]
-
-
-def analyze(df1, df2):
-    df1 = df1.drop_duplicates(subset="id")
-    df2 = df2.drop_duplicates(subset="id")
-    combined_df = pd.merge(
-        df1[["id", "price"]], df2, on="id", how="right", indicator=True
-    )
-    combined_df["discount"] = combined_df.price_y - combined_df.price_x
-    combined_df.drop(columns=["price_x"], inplace=True)
-    combined_df.rename(columns={"price_y": "price"}, inplace=True)
-
-    deals = combined_df.query("discount < 0").sort_values(by="discount")[
-        ["id", "name", "price", "discount"]
-    ]
-    new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[
-        ["id", "name", "price"]
-    ]
-    return deals, new
-
-
-if __name__ == "__main__":
-    csvs = get_csvs(".", 100)
-
-    for i in range(1, len(csvs)):
-        print(f"Comparing {csvs[i]} with {csvs[0]}")
-        df_previous = pd.read_csv(csvs[i], index_col=0)
-        df_latest = pd.read_csv(csvs[0], index_col=0)
-        deals, new = analyze(df_previous, df_latest)
-
-        done = False
-
-        if len(new) > 0:
-            print()
-            print("New items:")
-            print(new)
-            print()
-            done = True
-
-        if len(deals) > 0:
-            print("Discounted items:")
-            print(deals)
-            done = True
-
-        if done:
-            break
diff --git a/apps/vinyl/src/sounds/fetch.py b/apps/vinyl/src/sounds/scrape.py
similarity index 81%
rename from apps/vinyl/src/sounds/fetch.py
rename to apps/vinyl/src/sounds/scrape.py
index 219d1cb..fd98333 100644
--- a/apps/vinyl/src/sounds/fetch.py
+++ b/apps/vinyl/src/sounds/scrape.py
@@ -1,7 +1,4 @@
-#!/usr/bin/python3
-
 import time
-from datetime import datetime
 
 import pandas as pd
 import requests
@@ -74,7 +71,7 @@ def parse_page(html_content):
     )
 
 
-def fetch_deals():
+def scrape_sounds():
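+    """Scrape every sale LP page from sounds.nl into a single DataFrame."""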
     # Get page count
     page_count = get_page_count(
         requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text
@@ -86,25 +83,11 @@ def fetch_deals():
     base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all"
     dfs = []
     for i in tqdm(range(page_count)):
-        df = parse_page(requests.get(base_url.format(page_number=i)).text)
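+        # Fail fast on HTTP errors rather than parsing an error page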
+        response = requests.get(base_url.format(page_number=i))
+        response.raise_for_status()
+        df = parse_page(response.text)
         dfs.append(df)
         time.sleep(2)
 
     # Combine dfs
     return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"])
-
-
-if __name__ == "__main__":
-    df = fetch_deals()
-    print(f"Found {len(df)} deals")
-
-    # Show current deals
-    print(df.sort_values(by="price").head(10))
-
-    # Write to file
-    now = datetime.now()
-    prefix = now.strftime("%Y-%m-%d_%H:%M:%S")
-    directory = "/home/bram/src/python"
-    filepath = f"{directory}/{prefix}_sounds.csv"
-    print(f"Writing data to {filepath}")
-    df.to_csv(filepath)