initial commit

2024-10-14 09:58:24 +02:00
commit 0196f8bd27
35 changed files with 2005 additions and 0 deletions

src/__init__.py (new file, empty)

src/app/__init__.py (new file, 3 lines)

@@ -0,0 +1,3 @@
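# Note: icecream's install() registers ic() as a builtin, which is why the other
# modules in this commit call ic() without importing it explicitly.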
from icecream import install
install()

@@ -0,0 +1,97 @@
import polars as pl
from dagster import (
AssetIn,
DailyPartitionsDefinition,
DimensionPartitionMapping,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)
partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20")
partitions_def_multi = MultiPartitionsDefinition(
{
"date": DailyPartitionsDefinition(start_date="2024-09-20"),
"source": StaticPartitionsDefinition(["plato", "sounds"]),
}
)
@asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=partitions_def_single,
metadata={
"partition_by": ["date"],
},
)
def asset_single_1(context):
ic()
ic(context.partition_key)
return pl.DataFrame(
[{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
)
@asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=partitions_def_multi,
metadata={
"partition_by": ["date", "source"],
},
)
def asset_multi_1(context):
ic()
ic(context.partition_key)
return pl.DataFrame(
[{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
)
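# asset_single_2 reads a two-day window of its upstream asset: the previous and the
# current daily partition (start_offset=-1, end_offset=0).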
@asset(
partitions_def=partitions_def_single,
ins={
"asset_single_1": AssetIn(
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0)
)
},
)
def asset_single_2(context, asset_single_1):
ic()
ic(context.partition_key)
ic(asset_single_1.keys())
partition_key = context.asset_partition_key_for_output()
return f"Processed data for {partition_key}"
partition_mapping = MultiPartitionMapping(
{
"date": DimensionPartitionMapping(
dimension_name="date",
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
),
"source": DimensionPartitionMapping(
dimension_name="source",
partition_mapping=IdentityPartitionMapping(),
),
}
)
@asset(
partitions_def=partitions_def_multi,
ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
)
def asset_multi_2(context, asset_multi_1):
ic()
ic(context.partition_key)
ic(context.partition_key.keys_by_dimension)
ic(asset_multi_1)
partition_key = context.asset_partition_key_for_output()
ic(partition_key)
return f"Processed data for {partition_key}"


@@ -0,0 +1,108 @@
from datetime import datetime, timedelta
from typing import Optional
from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_mapping import (
MultiPartitionMapping,
UpstreamPartitionsResult,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._serdes import whitelist_for_serdes
# @whitelist_for_serdes
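# Custom mapping that ignores the downstream partition key entirely and always maps
# to the two most recent upstream partitions.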
class LatestTwoPartitionsMapping(PartitionMapping):
def get_upstream_mapped_partitions_result_for_partitions(
self,
downstream_partitions_subset: Optional[PartitionsSubset],
downstream_partitions_def: Optional[PartitionsDefinition],
upstream_partitions_def: PartitionsDefinition,
current_time: Optional[datetime] = None,
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> UpstreamPartitionsResult:
ic()
# Get upstream partitions from the subset
all_partitions = upstream_partitions_def.get_partition_keys()
ic(all_partitions)
if len(all_partitions) < 2:
raise ValueError("Not enough partitions to proceed.")
# Select the last two partitions
partition_keys = [all_partitions[-2], all_partitions[-1]]
return UpstreamPartitionsResult(
upstream_partitions_def.subset_with_partition_keys(partition_keys), []
)
def get_downstream_partitions_for_partitions(
self,
upstream_partitions_subset: PartitionsSubset,
downstream_partitions_def,
upstream_partitions_def,
) -> PartitionsSubset:
ic()
# Get the downstream partition that corresponds to the latest upstream partition
downstream_partition_key = upstream_partitions_subset.get_partition_keys()[-1]
return downstream_partitions_def.subset_with_partition_keys(
[downstream_partition_key]
)
@property
def description(self):
return "Maps to the latest two upstream partitions."
@whitelist_for_serdes
class X(MultiPartitionMapping):
def get_upstream_partitions_for_partition_range(
self,
downstream_partition_range,
upstream_partitions_def,
downstream_partitions_def,
) -> UpstreamPartitionsResult:
ic()
# Extract downstream partition range keys
downstream_keys = downstream_partition_range.get_partition_keys()
# Initialize a list to hold the upstream partition keys
upstream_keys = []
# Iterate over each downstream partition key
for downstream_key in downstream_keys:
# Parse the MultiPartitionKey
downstream_mpk = MultiPartitionKey.from_str(downstream_key)
for i in [1, 2]:
# Shift the daily partition by one day
shifted_date = datetime.strptime(
downstream_mpk.keys_by_dimension["date"], "%Y-%m-%d"
) - timedelta(days=i)
# Recreate the MultiPartitionKey with the shifted daily partition
upstream_mpk = MultiPartitionKey(
{
"source": downstream_mpk.keys_by_dimension["source"],
"date": shifted_date.strftime("%Y-%m-%d"),
}
)
# Add the upstream partition key
upstream_keys.append(upstream_mpk.to_string())
return UpstreamPartitionsResult(
upstream_partitions_def.subset_with_partition_keys(upstream_keys), []
)
def get_downstream_partitions_for_partition_range(
self,
upstream_partition_range,
downstream_partitions_def,
upstream_partitions_def,
) -> PartitionsSubset:
# This method would map upstream partitions back to downstream, but for simplicity, let's assume it's symmetric.
return self.get_upstream_partitions_for_partition_range(
upstream_partition_range, upstream_partitions_def, downstream_partitions_def
)


@@ -0,0 +1,13 @@
from dagster import Definitions, define_asset_job
from dagster_polars import PolarsParquetIOManager
from .assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2
# Define a job that includes both assets
daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2])
vinyl = Definitions(
assets=[asset_single_1, asset_multi_1, asset_single_2, asset_multi_2],
resources={"polars_parquet_io_manager": PolarsParquetIOManager()},
jobs=[daily_job],
)


@@ -0,0 +1,55 @@
from dagster import materialize
from dagster_polars import PolarsParquetIOManager
from app.vinyl.assets import (
asset_multi_1,
asset_multi_2,
asset_single_1,
asset_single_2,
)
resources = {
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir="/opt/dagster/storage"
)
}
def test_single():
result = materialize(
[asset_single_1, asset_single_2],
partition_key="2024-10-02",
resources=resources,
)
assert result.success
ic(result.asset_value)
def test_multi():
# result = materialize([
# asset_multi_1
# ], partition_key="2024-10-01|plato", resources=resources
# )
# assert result.success
# ic(result.asset_value)
#
#
# result = materialize([
# asset_multi_1
# ], partition_key="2024-10-02|plato", resources=resources
# )
# assert result.success
# ic(result.asset_value)
result = materialize(
[asset_multi_1, asset_multi_2],
partition_key="2024-10-02|plato",
resources=resources,
)
assert result.success
ic(result.asset_value)
if __name__ == "__main__":
# test_single()
test_multi()

src/app/sync.sh (new file, 7 lines)

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
rsync -av /opt/dagster/src/app/vinyl/ \
/Volumes/dagster/src/app/vinyl/ \
--include='*.py' \
--exclude='__pycache__/' \
    --progress \
--delete $*

src/app/test.py (new file, 34 lines)

@@ -0,0 +1,34 @@
import time
from dagster import AssetMaterialization, Output, config_mapping, job, op
@op(config_schema={"config_param": str})
def hello(context):
time.sleep(1)
print("halllo")
return Output(123, metadata={"aa": context.op_config["config_param"]})
@op
def goodbye(context, x: int):
time.sleep(2)
print("doooei", x)
context.log_event(
AssetMaterialization(
asset_key="my_asset",
metadata={"my_meta": 444},
description="A very useful value!",
)
)
return 2
@config_mapping(config_schema={"simplified_param": str})
def simplified_config(val):
return {"ops": {"hello": {"config": {"config_param": val["simplified_param"]}}}}
@job
def my_job():
goodbye(hello())

src/app/vinyl/assets.py (new file, 115 lines)

@@ -0,0 +1,115 @@
from datetime import datetime
from glob import glob
import polars as pl
import structlog
from dagster import (
AssetIn,
DailyPartitionsDefinition,
DimensionPartitionMapping,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
    asset,
    Failure,
    Field,
)
from app.vinyl.plato.check_plato import scrape_plato
from app.vinyl.sounds.fetch import fetch_deals
SOURCES = ["plato", "sounds"]
logger = structlog.get_logger()
partitions_def = MultiPartitionsDefinition(
{
"date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
"source": StaticPartitionsDefinition(SOURCES),
}
)
partition_mapping = MultiPartitionMapping(
{
"date": DimensionPartitionMapping(
dimension_name="date",
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
),
"source": DimensionPartitionMapping(
dimension_name="source",
partition_mapping=IdentityPartitionMapping(),
),
}
)
@asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=partitions_def,
metadata={
"partition_by": ["date", "source"],
},
config_schema={"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")},
)
def deals(context):
ic()
ic(context.partition_key)
ic(context.op_config)
import_dir = context.op_config["import_dir"]
partition_key = context.partition_key.keys_by_dimension
date_str = partition_key["date"]
source = partition_key["source"]
logger.info("Materializing deals", date=partition_key["date"], source=source)
date = datetime.strptime(partition_key["date"], "%Y-%m-%d")
days = (date - datetime.today()).days
ic(days)
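    # Backfill rules below: refuse partitions in the future; for partitions more than
    # one day old, only the "sounds" source can be rebuilt, and only from a previously
    # exported CSV in import_dir.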
if days > 0:
raise Failure(f"Cannot materialize for the future: {date.date()}")
if days < -1:
if source == "sounds":
pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
logger.info("Looking for existing CSV files", pattern=pattern)
files = glob(pattern)
if len(files):
file = sorted(files)[-1]
logger.info("Using existing CSV file", file=file)
try:
df = pl.read_csv(file)[["id", "name", "price"]]
logger.info("Loaded CSV file", rows=len(df))
return df.with_columns(**{k: pl.lit(v) for k, v in partition_key.items()})
except Exception as e:
logger.error("Failed to load CSV file!", error=e)
raise Failure(f"Cannot materialize for the past: {date.date()}")
if source == "plato":
logger.info("Scraping Plato")
df = scrape_plato()
logger.info("Scraped Plato", rows=len(df), head=df.head().to_markdown())
ic(df.columns)
return pl.from_pandas(df.assign(**partition_key))
if source == "sounds":
logger.info("Scraping Sounds")
df = fetch_deals()
ic(df.columns)
df = df[["id", "name", "price"]]
logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown())
return pl.from_pandas(df.assign(**partition_key))
return pl.DataFrame(
[{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
)
@asset(
partitions_def=partitions_def,
ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
)
def new_deals(context, asset_multi_1):
ic()
ic(context.partition_key)
ic(context.partition_key.keys_by_dimension)
ic(asset_multi_1)
partition_key = context.asset_partition_key_for_output()
ic(partition_key)
return f"Processed data for {partition_key}"

src/app/vinyl/jobs.py (new file, 39 lines)

@@ -0,0 +1,39 @@
from dagster import (
    AssetKey,
    AssetMaterialization,
    OpExecutionContext,
    define_asset_job,
    job,
    op,
)
from .assets import deals
deals_job = define_asset_job("deals_job", selection=[deals], partitions_def=deals.partitions_def)
@op
def check_partititions(context: OpExecutionContext):
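    # Register materializations for partitions that exist on disk as parquet files
    # but are missing from Dagster's event log.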
# Replace with your asset/job name
asset_key = "deals"
context.log_event(
AssetMaterialization(asset_key=asset_key, partition="2024-09-30|sounds")
)
# Fetch the materializations for the asset key
materializations = context.instance.get_materialized_partitions(
asset_key=AssetKey(asset_key)
)
context.log.info("Existing partitions", extra=dict(partitions=materializations))
import polars as pl
storage_dir = context.instance.storage_directory()
ic(storage_dir)
    existing = (
        pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet")
        .select(["date", "source"])
        .unique()
        .collect()
    )
    for row in existing.iter_rows():
partition = '|'.join(row)
if partition not in materializations:
context.log.info(f"Missing partition: {partition}")
context.log_event(
AssetMaterialization(asset_key=asset_key, partition=partition)
)
@job
def check_partititions_job():
check_partititions()


@@ -0,0 +1,191 @@
#!/root/.pyenv/versions/dev/bin/python
import os
import re
from datetime import datetime
import boto3
import pandas as pd
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv
import requests

from .scrape import get_soup, scrape_page, scrape_page_links
def scrape_plato(get=None):
ic()
url = 'https://www.platomania.nl/vinyl-aanbiedingen?page=1'
ic(url)
soup = get_soup(url=url, get=get)
articles_info = scrape_page(soup)
ic(len(articles_info))
links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split('=')[-1]))
for link in links:
ic(link)
soup = get_soup(url=link, get=get)
tmp = scrape_page(soup)
ic(len(tmp))
articles_info.extend(tmp)
# break
def clean(name):
tmp = ' '.join(reversed(name.split(', ')))
tmp = tmp.lower()
tmp = re.sub(r'\s+\([^\)]*\)', '', tmp)
return tmp
articles_df = pd.DataFrame(articles_info)
articles_df['_artist'] = articles_df['artist'].map(clean)
articles_df['_price'] = articles_df['price'].map(lambda x: float(x.split(' ')[-1]))
articles_df['_date'] = datetime.now()
return articles_df
def update_database(articles_df=None, database_file='/home/user/plato.parquet'):
if os.path.exists(database_file):
database_df = pd.read_parquet(database_file)
else:
database_df = None
if articles_df is None:
new_df = None if database_df is None else database_df.head(0)
else:
if database_df is None:
articles_df.to_parquet(database_file)
return articles_df, articles_df
compare = ['ean', '_price']
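    # Right-join the stored rows onto the freshly scraped rows on (ean, _price):
    # rows marked 'right_only' are new articles or price changes.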
check_df = pd.merge(
database_df[compare],
articles_df[compare],
how='right',
indicator=True
)
new_df = check_df[check_df['_merge'] == 'right_only'].drop(columns='_merge').merge(articles_df)
database_df = pd.concat([
database_df,
new_df
]).sort_values('_date').groupby('ean').last().reset_index()
database_df.to_parquet(database_file)
return database_df, new_df
def send_email(lines):
# Define the email parameters
SENDER = "mail@veenboer.xyz"
RECIPIENT = "rik.veenboer@gmail.com"
SUBJECT = "Aanbieding op plato!"
# The email body for recipients with non-HTML email clients
BODY_TEXT = ""
# The HTML body of the email
tmp = '\n'.join(lines)
BODY_HTML = f"""<html>
<head></head>
<body>
{tmp}
</html>
"""
# The character encoding for the email
CHARSET = "UTF-8"
# Try to send the email
try:
client = boto3.client('ses', region_name='eu-west-1') # Change the region as needed
# Provide the contents of the email
response = client.send_email(
Destination={
'ToAddresses': [
RECIPIENT,
],
},
Message={
'Body': {
'Html': {
'Charset': CHARSET,
'Data': BODY_HTML,
},
'Text': {
'Charset': CHARSET,
'Data': BODY_TEXT,
},
},
'Subject': {
'Charset': CHARSET,
'Data': SUBJECT,
},
},
Source=SENDER,
)
# Display an error if something goes wrong.
except NoCredentialsError:
print("Credentials not available")
except PartialCredentialsError:
print("Incomplete credentials provided")
except Exception as e:
print(f"Error: {e}")
else:
print("Email sent! Message ID:"),
print(response['MessageId'])
def get(url, proxy=True):
if proxy:
tmp = 'socks5://localhost:1080'
kwargs = dict(proxies=dict(http=tmp, https=tmp))
else:
kwargs = {}
return requests.get(url, **kwargs)
def main(dry=False):
load_dotenv('/opt/.env')
local_ip = get('http://ifconfig.me', False).text
get_ip = get('http://ifconfig.me').text
print(f'Local IP = {local_ip}')
print(f'Request IP = {get_ip}')
assert local_ip != get_ip
artists = open('/home/user/artists.txt').read().strip().splitlines()
print(f'Number of known artists = {len(artists)}')
if dry:
articles_df = None
else:
articles_df = scrape_plato(get=get)
database_df, new_df = update_database(articles_df)
if dry:
new_df = database_df.sample(20)
print(f'Database size = {len(database_df)}')
print(f'New = {len(new_df)}')
# new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
new_df = new_df.query('_price <= 25 and ean != ""')
print(f'Interesting = {len(new_df)}')
if new_df is not None and len(new_df):
message = []
for _, row in new_df.head(10).iterrows():
message.append(f'<a href="https://www.platomania.nl{row.url}"><h1>NEW</h1></a>')
message.append('<ul>')
message.append(f'<li>[artist] {row.artist}</li>')
message.append(f'<li>[title] {row.title}</li>')
message.append(f'<li>[price] {row.price}</li>')
message.append(f'<li>[release] {row.release_date}</li>')
message.append('</ul>')
send_email(message)
if __name__ == '__main__':
cwd = os.path.dirname(__file__)
main(dry=False)


@@ -0,0 +1,77 @@
import requests
from bs4 import BeautifulSoup
def get_soup(url, get = None):
# Send a GET request to the specified URL
if get is None:
get = requests.get
response = get(url)
# Check if the request was successful
if response.status_code == 200:
# Parse the HTML content of the page
return BeautifulSoup(response.content, 'html.parser')
else:
raise ValueError(f"Failed to retrieve the page. Status code: {response.status_code}")
def scrape_page_links(soup):
# Find all <li> elements with class "page-item"
page_items = soup.find_all('li', class_='page-item')
# Extract the href attribute of <a> tags within these <li> elements
links = []
for item in page_items:
a_tag = item.find('a', class_='page-link')
if a_tag and 'href' in a_tag.attrs:
links.append(a_tag['href'])
return links
def extract_article_info(article):
info = {}
# Extract the artist name
artist_tag = article.find('h1', class_='product-card__artist')
info['artist'] = artist_tag.text.strip() if artist_tag else None
# Extract the title and URL
title_tag = article.find('h2', class_='product-card__title')
info['title'] = title_tag.text.strip() if title_tag else None
url_tag = title_tag.find_parent('a') if title_tag else None
info['url'] = url_tag['href'] if url_tag else None
# Extract additional details
details = article.find_all('div', class_='article-details__text')
for detail in details:
text = detail.text.strip()
if 'Label:' in text:
info['label'] = text.replace('Label: ', '').strip()
elif 'Releasedatum:' in text:
info['release_date'] = text.replace('Releasedatum: ', '').strip()
elif 'Herkomst:' in text:
info['origin'] = text.replace('Herkomst: ', '').strip()
elif 'Item-nr:' in text:
info['item_number'] = text.replace('Item-nr: ', '').strip()
elif 'EAN:' in text:
info['ean'] = text.replace('EAN:', '').strip()
# Extract delivery information
delivery_tag = article.find('div', class_='article-details__delivery-text')
info['delivery_info'] = delivery_tag.text.strip() if delivery_tag else None
# Extract price
price_tag = article.find('div', class_='article__price')
info['price'] = price_tag.text.strip() if price_tag else None
return info
def scrape_page(soup):
# Find all article blocks
article_blocks = soup.find_all('article', class_='article LP')
# Extract information from each article block
return [extract_article_info(article) for article in article_blocks]

src/app/vinyl/repo.py (new file, 13 lines)

@@ -0,0 +1,13 @@
from dagster import Definitions
from dagster_polars import PolarsParquetIOManager
from .assets import deals
from .jobs import deals_job, check_partititions_job
from .schedules import deals_schedule
vinyl = Definitions(
assets=[deals],
resources={"polars_parquet_io_manager": PolarsParquetIOManager()},
jobs=[deals_job, check_partititions_job],
schedules=[deals_schedule]
)


@@ -0,0 +1,10 @@
from dagster import DefaultScheduleStatus, build_schedule_from_partitioned_job
from app.vinyl.jobs import deals_job  # import from jobs to avoid a circular import with repo.py
deals_schedule = build_schedule_from_partitioned_job(
job=deals_job,
hour_of_day=7,
# execution_timezone="Europe/Amsterdam",
default_status=DefaultScheduleStatus.RUNNING
)


@@ -0,0 +1,80 @@
#!/usr/bin/python3
import glob
import os
from datetime import datetime
import pandas as pd
def get_csvs(directory, n):
# List all files matching the pattern *_sounds.csv
suffix = "_sounds.csv"
files = glob.glob(os.path.join(directory, f"*{suffix}"))
# Function to extract date from filename
def extract_date_from_filename(filename):
# Extract the date string
basename = os.path.basename(filename)
date_str = basename.split(suffix)[0]
try:
return datetime.strptime(date_str, "%Y-%m-%d_%H:%M:%S")
except ValueError:
# The date string cannot be parsed
return None
# Create a list of tuples (date, filename), ignoring files with unparsable dates
result = [(extract_date_from_filename(file), file) for file in files]
result = [item for item in result if item[0] is not None]
# Sort the list by date in descending order (most recent first)
result.sort(key=lambda x: x[0], reverse=True)
    # Return the n most recent files
return [x[1] for x in result[:n]]
def analyze(df1, df2):
df1 = df1.drop_duplicates(subset="id")
df2 = df2.drop_duplicates(subset="id")
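    # Right-join the older snapshot onto the newer one on id: 'right_only' rows are
    # newly listed items, and a negative discount means the price dropped.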
combined_df = pd.merge(
df1[["id", "price"]], df2, on="id", how="right", indicator=True
)
combined_df["discount"] = combined_df.price_y - combined_df.price_x
combined_df.drop(columns=["price_x"], inplace=True)
combined_df.rename(columns={"price_y": "price"}, inplace=True)
deals = combined_df.query("discount < 0").sort_values(by="discount")[
["id", "name", "price", "discount"]
]
new = combined_df.query("_merge == 'right_only'").sort_values(by="price")[
["id", "name", "price"]
]
return deals, new
if __name__ == "__main__":
csvs = get_csvs(".", 100)
for i in range(1, len(csvs)):
print(f"Comparing {csvs[i]} with {csvs[0]}")
df_previous = pd.read_csv(csvs[i], index_col=0)
df_latest = pd.read_csv(csvs[0], index_col=0)
deals, new = analyze(df_previous, df_latest)
done = False
        if len(new) > 0:
print()
print("New items:")
print(new)
print()
done = True
if len(deals) > 0:
print(f"Discounted items:")
print(deals)
done = True
if done:
break


@@ -0,0 +1,84 @@
#!/usr/bin/python3
import time
from datetime import datetime
import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
def get_page_count(html_content):
soup = BeautifulSoup(html_content, "html.parser")
# Find all pagination links
page_links = soup.select("ul.pagination li a")
# Extract the numbers from the hrefs and convert to integers
page_numbers = [
int(link.get_text()) for link in page_links if link.get_text().isdigit()
]
return max(page_numbers)
def parse_page(html_content):
soup = BeautifulSoup(html_content, "html.parser")
# Extract the name (artist - album) from the h5 tag
names = list(map(lambda x: x.get_text(strip=True), soup.find_all("h5")))
# Remove 'Telefoon', 'E-mail', 'Facebook'
names = list(filter(lambda x: " -" in x, names))
# Extract the numerical id from the a tag
ids = list(map(lambda x: x["rel"][0], soup.find_all("a", rel=True)))
# Extract the price
prices = list(
map(
lambda x: float(x.get_text(strip=True).split()[1]),
soup.find_all("span", class_="product-price"),
)
)
df = pd.DataFrame({"id": ids, "name": names, "price": prices})
return df
def fetch_deals():
# Get page count
page_count = get_page_count(
requests.get("https://www.sounds.nl/uitverkoop/1/lp/all/art").text
)
time.sleep(1)
print(f"Number of pages: {page_count}")
# Parse all pages
base_url = "https://www.sounds.nl/uitverkoop/{page_number}/lp/all"
dfs = []
    for i in tqdm(range(1, page_count + 1)):  # pages appear to be 1-indexed, as in the first request
df = parse_page(requests.get(base_url.format(page_number=i)).text)
dfs.append(df)
time.sleep(2)
# Combine dfs
return pd.concat(dfs) if dfs else pd.DataFrame(columns=["id", "name", "price"])
if __name__ == "__main__":
df = fetch_deals()
print(f"Found {len(df)} deals")
# Show current deals
print(df.sort_values(by="price").head(10))
# Write to file
now = datetime.now()
prefix = now.strftime("%Y-%m-%d_%H:%M:%S")
directory = "/home/bram/src/python"
filepath = f"{directory}/{prefix}_sounds.csv"
print(f"Writing data to {filepath}")
df.to_csv(filepath)

src/app/vinyl/test.py (new file, 48 lines)

@@ -0,0 +1,48 @@
import warnings
from datetime import datetime
from dagster import materialize
from dagster_polars import PolarsParquetIOManager
from app.vinyl.assets import (
deals
)
from app.vinyl.jobs import check_partititions_job
warnings.filterwarnings("ignore", category=UserWarning)
import logging
logging.getLogger().setLevel(logging.INFO)
resources = {
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir="/opt/dagster/storage"
)
}
def test_deals(
source="sounds",
date: str = None
):
if not date:
today = datetime.today().strftime("%Y-%m-%d")
date = today
result = materialize(
[deals],
partition_key=f"{date}|{source}",
resources=resources,
run_config={"loggers": {"console": {"config": {"log_level": "ERROR"}}},
"ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}}
}
)
assert result.success
ic(result.asset_value)
if __name__ == "__main__":
# test_deals(source="plato")
check_partititions_job.execute_in_process()