This commit is contained in:
2025-08-16 13:48:34 +02:00
parent 2b4e34ec2f
commit a0a0bbd110
6 changed files with 58 additions and 321 deletions

View File

@@ -1,154 +0,0 @@
import os
import boto3
import pandas as pd
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv
from fetch import scrape_plato
from utils import get
def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
if os.path.exists(database_file):
database_df = pd.read_parquet(database_file)
else:
database_df = None
if articles_df is None:
new_df = None if database_df is None else database_df.head(0)
else:
if database_df is None:
articles_df.to_parquet(database_file)
return articles_df, articles_df
compare = ["ean", "_price"]
check_df = pd.merge(
database_df[compare], articles_df[compare], how="right", indicator=True
)
new_df = (
check_df[check_df["_merge"] == "right_only"]
.drop(columns="_merge")
.merge(articles_df)
)
database_df = (
pd.concat([database_df, new_df])
.sort_values("_date")
.groupby("ean")
.last()
.reset_index()
)
database_df.to_parquet(database_file)
return database_df, new_df
def send_email(lines):
# Define the email parameters
SENDER = "mail@veenboer.xyz"
RECIPIENT = "rik.veenboer@gmail.com"
SUBJECT = "Aanbieding op plato!"
# The email body for recipients with non-HTML email clients
BODY_TEXT = ""
# The HTML body of the email
tmp = "\n".join(lines)
BODY_HTML = f"""<html>
<head></head>
<body>
{tmp}
</html>
"""
# The character encoding for the email
CHARSET = "UTF-8"
# Try to send the email
try:
client = boto3.client(
"ses", region_name="eu-west-1"
) # Change the region as needed
# Provide the contents of the email
response = client.send_email(
Destination={
"ToAddresses": [
RECIPIENT,
],
},
Message={
"Body": {
"Html": {
"Charset": CHARSET,
"Data": BODY_HTML,
},
"Text": {
"Charset": CHARSET,
"Data": BODY_TEXT,
},
},
"Subject": {
"Charset": CHARSET,
"Data": SUBJECT,
},
},
Source=SENDER,
)
# Display an error if something goes wrong.
except NoCredentialsError:
print("Credentials not available")
except PartialCredentialsError:
print("Incomplete credentials provided")
except Exception as e:
print(f"Error: {e}")
else:
print("Email sent! Message ID:"),
print(response["MessageId"])
def main(dry=False):
load_dotenv("/opt/.env")
local_ip = get("http://ifconfig.me", False).text
get_ip = get("http://ifconfig.me").text
print(f"Local IP = {local_ip}")
print(f"Request IP = {get_ip}")
assert local_ip != get_ip
artists = open("/home/user/artists.txt").read().strip().splitlines()
print(f"Number of known artists = {len(artists)}")
if dry:
articles_df = None
else:
articles_df = scrape_plato(get=get)
database_df, new_df = update_database(articles_df)
if dry:
new_df = database_df.sample(20)
print(f"Database size = {len(database_df)}")
print(f"New = {len(new_df)}")
# new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
new_df = new_df.query('_price <= 25 and ean != ""')
print(f"Interesting = {len(new_df)}")
if new_df is not None and len(new_df):
message = []
for _, row in new_df.head(10).iterrows():
message.append(
f'<a href="https://www.platomania.nl{row.url}"><h1>NEW</h1></a>'
)
message.append("<ul>")
message.append(f"<li>[artist] {row.artist}</li>")
message.append(f"<li>[title] {row.title}</li>")
message.append(f"<li>[price] {row.price}</li>")
message.append(f"<li>[release] {row.release_date}</li>")
message.append("</ul>")
send_email(message)
if __name__ == "__main__":
cwd = os.path.dirname(__file__)
main(dry=False)

View File

@@ -1,52 +0,0 @@
#!/root/.pyenv/versions/dev/bin/python
import re
from datetime import datetime
import pandas as pd
from .scrape import get_soup, scrape_page, scrape_page_links
def scrape_plato(get=None):
ic()
url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
ic(url)
soup = get_soup(url=url, get=get)
articles_info = scrape_page(soup)
ic(len(articles_info))
links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
for link in links:
ic(link)
soup = get_soup(url=link, get=get)
tmp = scrape_page(soup)
ic(len(tmp))
articles_info.extend(tmp)
def clean(name):
tmp = " ".join(reversed(name.split(", ")))
tmp = tmp.lower()
tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
return tmp
articles_df = pd.DataFrame(articles_info).reindex(
columns=[
"artist",
"title",
"url",
"label",
"release_date",
"origin",
"item_number",
"ean",
"delivery_info",
"price",
]
)
articles_df["_artist"] = articles_df["artist"].map(clean)
articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
articles_df["_date"] = datetime.now()
return articles_df

60
apps/vinyl/src/plato/scrape.py Normal file → Executable file
View File

@@ -1,21 +1,61 @@
import re
from datetime import datetime
import pandas as pd
import requests
from bs4 import BeautifulSoup
def scrape_plato(get=None):
ic()
url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
ic(url)
soup = get_soup(url=url, get=get)
articles_info = scrape_page(soup)
ic(len(articles_info))
links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
for link in links:
ic(link)
soup = get_soup(url=link, get=get)
tmp = scrape_page(soup)
ic(len(tmp))
articles_info.extend(tmp)
def clean(name):
tmp = " ".join(reversed(name.split(", ")))
tmp = tmp.lower()
tmp = re.sub(r"\s+\([^)]*\)", "", tmp)
return tmp
articles_df = pd.DataFrame(articles_info).reindex(
columns=[
"artist",
"title",
"url",
"label",
"release_date",
"origin",
"item_number",
"ean",
"delivery_info",
"price",
]
)
articles_df["_artist"] = articles_df["artist"].map(clean)
articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
articles_df["_date"] = datetime.now()
return articles_df
def get_soup(url, get=None):
# Send a GET request to the specified URL
if get is None:
get = requests.get
response = get(url)
# Check if the request was successful
if response.status_code == 200:
# Parse the HTML content of the page
return BeautifulSoup(response.content, "html.parser")
else:
raise ValueError(
f"Failed to retrieve the page. Status code: {response.status_code}"
)
response.raise_for_status()
return BeautifulSoup(response.content, "html.parser")
def scrape_page_links(soup):