diff --git a/src/app/partitions/assets.py b/src/app/partitions/assets.py
index 7a1ecd5..3e69cd7 100644
--- a/src/app/partitions/assets.py
+++ b/src/app/partitions/assets.py
@@ -1,15 +1,9 @@
 import polars as pl
-from dagster import (
-    AssetIn,
-    DailyPartitionsDefinition,
-    DimensionPartitionMapping,
-    IdentityPartitionMapping,
-    MultiPartitionMapping,
-    MultiPartitionsDefinition,
-    StaticPartitionsDefinition,
-    TimeWindowPartitionMapping,
-    asset,
-)
+from dagster import (AssetIn, DailyPartitionsDefinition,
+                     DimensionPartitionMapping, IdentityPartitionMapping,
+                     MultiPartitionMapping, MultiPartitionsDefinition,
+                     StaticPartitionsDefinition, TimeWindowPartitionMapping,
+                     asset)
 
 
 partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20")
diff --git a/src/app/partitions/mapping.py b/src/app/partitions/mapping.py
index d155c01..128d436 100644
--- a/src/app/partitions/mapping.py
+++ b/src/app/partitions/mapping.py
@@ -4,9 +4,7 @@ from typing import Optional
 from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition
 from dagster._core.definitions.partition import PartitionsSubset
 from dagster._core.definitions.partition_mapping import (
-    MultiPartitionMapping,
-    UpstreamPartitionsResult,
-)
+    MultiPartitionMapping, UpstreamPartitionsResult)
 from dagster._core.instance import DynamicPartitionsStore
 from dagster._serdes import whitelist_for_serdes
 
diff --git a/src/app/partitions/repo.py b/src/app/partitions/repo.py
index ca10abd..f23f0eb 100644
--- a/src/app/partitions/repo.py
+++ b/src/app/partitions/repo.py
@@ -1,7 +1,8 @@
 from dagster import Definitions, define_asset_job
 from dagster_polars import PolarsParquetIOManager
 
-from .assets import asset_multi_1, asset_multi_2, asset_single_1, asset_single_2
+from .assets import (asset_multi_1, asset_multi_2, asset_single_1,
+                     asset_single_2)
 
 # Define a job that includes both assets
 daily_job = define_asset_job("daily_job", selection=[asset_multi_1, asset_multi_2])
diff --git a/src/app/partitions/test.py b/src/app/partitions/test.py
index 7b2dfee..70cbec9 100644
--- a/src/app/partitions/test.py
+++ b/src/app/partitions/test.py
@@ -1,17 +1,11 @@
 from dagster import materialize
 from dagster_polars import PolarsParquetIOManager
 
-from app.vinyl.assets import (
-    asset_multi_1,
-    asset_multi_2,
-    asset_single_1,
-    asset_single_2,
-)
+from app.vinyl.assets import (asset_multi_1, asset_multi_2, asset_single_1,
+                              asset_single_2)
 
 resources = {
-    "polars_parquet_io_manager": PolarsParquetIOManager(
-        base_dir="/opt/dagster/storage"
-    )
+    "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/opt/dagster/storage")
 }
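Note: the four partitions/* hunks above only re-wrap imports and call arguments; behavior is unchanged. As a reminder of how the imported pieces fit together, here is a minimal, hypothetical sketch of a single- and a multi-dimension partitioning setup. Only partitions_def_single and the "date"/"source" dimension names come from the code in this diff; the asset body and the "plato"/"sounds" values are illustrative.

import polars as pl
from dagster import (DailyPartitionsDefinition, MultiPartitionsDefinition,
                     StaticPartitionsDefinition, asset)

partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20")

# Hypothetical two-dimensional partition space: one key per (date, source)
# pair, which Dagster renders as e.g. "2024-09-20|plato".
partitions_def_multi = MultiPartitionsDefinition(
    {
        "date": DailyPartitionsDefinition(start_date="2024-09-20"),
        "source": StaticPartitionsDefinition(["plato", "sounds"]),  # illustrative values
    }
)


@asset(partitions_def=partitions_def_multi, io_manager_key="polars_parquet_io_manager")
def asset_multi_1(context) -> pl.DataFrame:
    # Both dimension keys for the current run are available on the context.
    return pl.DataFrame({"partition": [str(context.partition_key)]})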
"partition_by": ["date", "source"], }, - config_schema={"import_dir": Field(str, default_value="/opt/dagster/home/storage/import")}, + config_schema={ + "import_dir": Field(str, default_value="/opt/dagster/home/storage/import") + }, ) def deals(context): ic() @@ -74,9 +71,11 @@ def deals(context): file = sorted(files)[-1] logger.info("Using existing CSV file", file=file) try: - df = pl.read_csv(file)[["id", "name", "price"]] + df = pl.read_csv(file) logger.info("Loaded CSV file", rows=len(df)) - return df.with_columns(**{k: pl.lit(v) for k, v in partition_key.items()}) + return df.with_columns( + **{k: pl.lit(v) for k, v in partition_key.items()} + ) except Exception as e: logger.error("Failed to load CSV file!", error=e) raise Failure(f"Cannot materialize for the past: {date.date()}") @@ -91,7 +90,6 @@ def deals(context): logger.info("Scraping Sounds") df = fetch_deals() ic(df.columns) - df = df[["id", "name", "price"]] logger.info("Scraped Sounds", rows=len(df), head=df.head().to_markdown()) return pl.from_pandas(df.assign(**partition_key)) diff --git a/src/app/vinyl/jobs.py b/src/app/vinyl/jobs.py index 03ef1fb..26bb867 100644 --- a/src/app/vinyl/jobs.py +++ b/src/app/vinyl/jobs.py @@ -1,9 +1,11 @@ -from dagster import job, OpExecutionContext, op, \ - AssetMaterialization, AssetKey, define_asset_job +from dagster import (AssetKey, AssetMaterialization, OpExecutionContext, + define_asset_job, job, op) from .assets import deals -deals_job = define_asset_job("deals_job", selection=[deals], partitions_def=deals.partitions_def) +deals_job = define_asset_job( + "deals_job", selection=[deals], partitions_def=deals.partitions_def +) @op @@ -22,11 +24,17 @@ def check_partititions(context: OpExecutionContext): context.log.info("Existing partitions", extra=dict(partitions=materializations)) import polars as pl + storage_dir = context.instance.storage_directory() ic(storage_dir) - for row in pl.scan_parquet(f'{storage_dir}/{asset_key}/*/*.parquet').select( - ['date', 'source']).unique().collect().iter_rows(): - partition = '|'.join(row) + for row in ( + pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet") + .select(["date", "source"]) + .unique() + .collect() + .iter_rows() + ): + partition = "|".join(row) if partition not in materializations: context.log.info(f"Missing partition: {partition}") context.log_event( diff --git a/src/app/vinyl/plato/check_plato.py b/src/app/vinyl/plato/check_plato.py index 5ba8812..f4ddaad 100755 --- a/src/app/vinyl/plato/check_plato.py +++ b/src/app/vinyl/plato/check_plato.py @@ -14,14 +14,14 @@ from .scrape import * def scrape_plato(get=None): ic() - url = 'https://www.platomania.nl/vinyl-aanbiedingen?page=1' + url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1" ic(url) soup = get_soup(url=url, get=get) articles_info = scrape_page(soup) ic(len(articles_info)) - links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split('=')[-1])) + links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1])) for link in links: ic(link) soup = get_soup(url=link, get=get) @@ -31,20 +31,20 @@ def scrape_plato(get=None): # break def clean(name): - tmp = ' '.join(reversed(name.split(', '))) + tmp = " ".join(reversed(name.split(", "))) tmp = tmp.lower() - tmp = re.sub(r'\s+\([^\)]*\)', '', tmp) + tmp = re.sub(r"\s+\([^\)]*\)", "", tmp) return tmp articles_df = pd.DataFrame(articles_info) - articles_df['_artist'] = articles_df['artist'].map(clean) - articles_df['_price'] = articles_df['price'].map(lambda x: float(x.split(' ')[-1])) - 
diff --git a/src/app/vinyl/jobs.py b/src/app/vinyl/jobs.py
index 03ef1fb..26bb867 100644
--- a/src/app/vinyl/jobs.py
+++ b/src/app/vinyl/jobs.py
@@ -1,9 +1,11 @@
-from dagster import job, OpExecutionContext, op, \
-    AssetMaterialization, AssetKey, define_asset_job
+from dagster import (AssetKey, AssetMaterialization, OpExecutionContext,
+                     define_asset_job, job, op)
 
 from .assets import deals
 
-deals_job = define_asset_job("deals_job", selection=[deals], partitions_def=deals.partitions_def)
+deals_job = define_asset_job(
+    "deals_job", selection=[deals], partitions_def=deals.partitions_def
+)
 
 
 @op
@@ -22,11 +24,17 @@ def check_partititions(context: OpExecutionContext):
     context.log.info("Existing partitions", extra=dict(partitions=materializations))
 
     import polars as pl
+
     storage_dir = context.instance.storage_directory()
     ic(storage_dir)
-    for row in pl.scan_parquet(f'{storage_dir}/{asset_key}/*/*.parquet').select(
-            ['date', 'source']).unique().collect().iter_rows():
-        partition = '|'.join(row)
+    for row in (
+        pl.scan_parquet(f"{storage_dir}/{asset_key}/*/*.parquet")
+        .select(["date", "source"])
+        .unique()
+        .collect()
+        .iter_rows()
+    ):
+        partition = "|".join(row)
         if partition not in materializations:
             context.log.info(f"Missing partition: {partition}")
             context.log_event(
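In jobs.py, check_partititions now builds its partition list from a reformatted lazy-scan chain; the logic itself is unchanged. A standalone sketch of the same pattern with throwaway files (the real code globs f"{storage_dir}/{asset_key}/*/*.parquet" and compares the result against the recorded materializations):

import polars as pl

# Two tiny parquet files standing in for materialized partitions on disk.
pl.DataFrame({"date": ["2024-09-20"], "source": ["plato"]}).write_parquet("part1.parquet")
pl.DataFrame({"date": ["2024-09-21"], "source": ["sounds"]}).write_parquet("part2.parquet")

# Scan lazily, keep only the partition columns, deduplicate, and build
# "date|source" strings in the same shape as Dagster's multi-partition keys.
found = {
    "|".join(row)
    for row in (
        pl.scan_parquet("part*.parquet")
        .select(["date", "source"])
        .unique()
        .collect()
        .iter_rows()
    )
}
print(sorted(found))  # ['2024-09-20|plato', '2024-09-21|sounds']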
diff --git a/src/app/vinyl/plato/check_plato.py b/src/app/vinyl/plato/check_plato.py
index 5ba8812..f4ddaad 100755
--- a/src/app/vinyl/plato/check_plato.py
+++ b/src/app/vinyl/plato/check_plato.py
@@ -14,14 +14,14 @@ from .scrape import *
 
 def scrape_plato(get=None):
     ic()
-    url = 'https://www.platomania.nl/vinyl-aanbiedingen?page=1'
+    url = "https://www.platomania.nl/vinyl-aanbiedingen?page=1"
     ic(url)
 
     soup = get_soup(url=url, get=get)
     articles_info = scrape_page(soup)
     ic(len(articles_info))
 
-    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split('=')[-1]))
+    links = sorted(set(scrape_page_links(soup)), key=lambda x: int(x.split("=")[-1]))
     for link in links:
         ic(link)
         soup = get_soup(url=link, get=get)
@@ -31,20 +31,20 @@ def scrape_plato(get=None):
         # break
 
     def clean(name):
-        tmp = ' '.join(reversed(name.split(', ')))
+        tmp = " ".join(reversed(name.split(", ")))
         tmp = tmp.lower()
-        tmp = re.sub(r'\s+\([^\)]*\)', '', tmp)
+        tmp = re.sub(r"\s+\([^\)]*\)", "", tmp)
         return tmp
 
     articles_df = pd.DataFrame(articles_info)
-    articles_df['_artist'] = articles_df['artist'].map(clean)
-    articles_df['_price'] = articles_df['price'].map(lambda x: float(x.split(' ')[-1]))
-    articles_df['_date'] = datetime.now()
+    articles_df["_artist"] = articles_df["artist"].map(clean)
+    articles_df["_price"] = articles_df["price"].map(lambda x: float(x.split(" ")[-1]))
+    articles_df["_date"] = datetime.now()
 
     return articles_df
 
 
-def update_database(articles_df=None, database_file='/home/user/plato.parquet'):
+def update_database(articles_df=None, database_file="/home/user/plato.parquet"):
     if os.path.exists(database_file):
         database_df = pd.read_parquet(database_file)
     else:
@@ -57,18 +57,22 @@ def update_database(articles_df=None, database_file='/home/user/plato.parquet'):
         articles_df.to_parquet(database_file)
         return articles_df, articles_df
 
-    compare = ['ean', '_price']
+    compare = ["ean", "_price"]
     check_df = pd.merge(
-        database_df[compare],
-        articles_df[compare],
-        how='right',
-        indicator=True
+        database_df[compare], articles_df[compare], how="right", indicator=True
+    )
+    new_df = (
+        check_df[check_df["_merge"] == "right_only"]
+        .drop(columns="_merge")
+        .merge(articles_df)
+    )
+    database_df = (
+        pd.concat([database_df, new_df])
+        .sort_values("_date")
+        .groupby("ean")
+        .last()
+        .reset_index()
     )
-    new_df = check_df[check_df['_merge'] == 'right_only'].drop(columns='_merge').merge(articles_df)
-    database_df = pd.concat([
-        database_df,
-        new_df
-    ]).sort_values('_date').groupby('ean').last().reset_index()
 
     database_df.to_parquet(database_file)
     return database_df, new_df
@@ -84,7 +88,7 @@ def send_email(lines):
     BODY_TEXT = ""
 
     # The HTML body of the email
-    tmp = '\n'.join(lines)
+    tmp = "\n".join(lines)
     BODY_HTML = f"""
@@ -97,29 +101,31 @@ def send_email(lines):
 
     # Try to send the email
     try:
-        client = boto3.client('ses', region_name='eu-west-1')  # Change the region as needed
+        client = boto3.client(
+            "ses", region_name="eu-west-1"
+        )  # Change the region as needed
 
         # Provide the contents of the email
         response = client.send_email(
             Destination={
-                'ToAddresses': [
+                "ToAddresses": [
                     RECIPIENT,
                 ],
             },
             Message={
-                'Body': {
-                    'Html': {
-                        'Charset': CHARSET,
-                        'Data': BODY_HTML,
+                "Body": {
+                    "Html": {
+                        "Charset": CHARSET,
+                        "Data": BODY_HTML,
                     },
-                    'Text': {
-                        'Charset': CHARSET,
-                        'Data': BODY_TEXT,
+                    "Text": {
+                        "Charset": CHARSET,
+                        "Data": BODY_TEXT,
                     },
                 },
-                'Subject': {
-                    'Charset': CHARSET,
-                    'Data': SUBJECT,
+                "Subject": {
+                    "Charset": CHARSET,
+                    "Data": SUBJECT,
                 },
             },
             Source=SENDER,
@@ -133,12 +139,12 @@ def send_email(lines):
         print(f"Error: {e}")
     else:
         print("Email sent! Message ID:"),
-        print(response['MessageId'])
+        print(response["MessageId"])
 
 
 def get(url, proxy=True):
     if proxy:
-        tmp = 'socks5://localhost:1080'
+        tmp = "socks5://localhost:1080"
         kwargs = dict(proxies=dict(http=tmp, https=tmp))
     else:
         kwargs = {}
@@ -146,16 +152,16 @@ def main(dry=False):
-    load_dotenv('/opt/.env')
+    load_dotenv("/opt/.env")
 
-    local_ip = get('http://ifconfig.me', False).text
-    get_ip = get('http://ifconfig.me').text
-    print(f'Local IP = {local_ip}')
-    print(f'Request IP = {get_ip}')
+    local_ip = get("http://ifconfig.me", False).text
+    get_ip = get("http://ifconfig.me").text
+    print(f"Local IP = {local_ip}")
+    print(f"Request IP = {get_ip}")
     assert local_ip != get_ip
 
-    artists = open('/home/user/artists.txt').read().strip().splitlines()
-    print(f'Number of known artists = {len(artists)}')
+    artists = open("/home/user/artists.txt").read().strip().splitlines()
+    print(f"Number of known artists = {len(artists)}")
 
     if dry:
         articles_df = None
@@ -166,26 +172,28 @@ def main(dry=False):
     if dry:
         new_df = database_df.sample(20)
 
-    print(f'Database size = {len(database_df)}')
-    print(f'New = {len(new_df)}')
+    print(f"Database size = {len(database_df)}")
+    print(f"New = {len(new_df)}")
 
     # new_df = new_df[new_df['_artist'].isin(artists)].query('_price <= 25')
     new_df = new_df.query('_price <= 25 and ean != ""')
-    print(f'Interesting = {len(new_df)}')
+    print(f"Interesting = {len(new_df)}")
 
     if new_df is not None and len(new_df):
        message = []
         for _, row in new_df.head(10).iterrows():
-            message.append(f'