diff --git a/apps/vinyl/email.html b/apps/vinyl/email.html new file mode 100644 index 0000000..89c5263 --- /dev/null +++ b/apps/vinyl/email.html @@ -0,0 +1,21 @@ +
+

🎶 New Deals

+ + {% for source, rows in deals.items() %} +

{{ source|capitalize }}

+ + {% for row in rows %} +
+ +

🆕 {{ row.artist|title }} - {{ row.title|title }}

+
+ +
+ {% endfor %} + {% endfor %} +
diff --git a/apps/vinyl/requirements.txt b/apps/vinyl/requirements.txt index f7c9dd8..53702fb 100644 --- a/apps/vinyl/requirements.txt +++ b/apps/vinyl/requirements.txt @@ -147,7 +147,9 @@ idna==3.10 # requests # yarl jinja2==3.1.6 - # via dagster + # via + # dev (pyproject.toml) + # dagster jmespath==1.0.1 # via # boto3 diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 6be3dc5..5f5cd9a 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -6,8 +6,9 @@ from types import SimpleNamespace import polars as pl import structlog from dagster_polars.patito import patito_model_to_dagster_type +from jinja2 import Environment, FileSystemLoader from models import Deal -from partitions import multi_partitions_def +from partitions import daily_partitions_def, multi_partitions_def from plato.fetch import scrape_plato from plato.parse import parse as parse_plato from shared.utils import get_partition_keys, load_partitions, parse_partition_keys @@ -163,7 +164,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: automation_condition=dg.AutomationCondition.eager(), ) def new_deals( - context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None] + context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame] ) -> Iterator[dg.Output[Deal.DataFrame]]: """Fetch new deals from all sources.""" ic() @@ -203,50 +204,47 @@ def new_deals( @dg.asset( io_manager_key="polars_parquet_io_manager", - partitions_def=multi_partitions_def, - ins={"df": dg.AssetIn(key=new_deals.key)}, + partitions_def=daily_partitions_def, + metadata={ + "partition_by": ["date", "source", "release"], + }, + ins={"partitions": dg.AssetIn(key=new_deals.key)}, output_required=False, automation_condition=dg.AutomationCondition.eager(), ) def good_deals( - context: dg.AssetExecutionContext, email_service: EmailService, df: pl.LazyFrame + context: dg.AssetExecutionContext, + email_service: EmailService, + partitions: dict[str, pl.LazyFrame], ) -> Iterator[dg.Output[Deal.DataFrame]]: - filtered_df = df.filter(pl.col("price") <= 25).collect() - num_rows = filtered_df.height - if not num_rows: - context.log.info("No good deals found!") + parsed_partition_keys = parse_partition_keys(context, "partitions") + ic(parsed_partition_keys) + + df = pl.concat(partitions.values(), how="vertical_relaxed").collect() + + counts = dict(df.group_by("source").len().iter_rows()) + logger.info(f"Processing new deals ({df.height}x).", counts=counts) + + filtered_df = df.filter(pl.col("price") <= 25) + if filtered_df.is_empty(): + logger.info("No good deals found!") return - context.log.info(f"Good deals found ({num_rows}x)!") + logger.info(f"Good deals found ({filtered_df.height}x)!", counts=counts) yield dg.Output(Deal.DataFrame(filtered_df)) - lines = [] - lines.append( - """ -
-

🎶 New Music Releases

- """ - ) + # Prepare data for email + deals: dict[str, list[SimpleNamespace]] = {} + for source in filtered_df.select("source").unique().to_series(): + group_df = filtered_df.filter(pl.col("source") == source) + deals[source] = [ + SimpleNamespace(**row) for row in group_df.head(10).iter_rows(named=True) + ] - # Each item - for data in filtered_df.head(10).iter_rows(named=True): - row = SimpleNamespace(**data) - lines.append( - f""" -
- -

🆕 {row.artist} - {row.title}

-
- -
- """ - ) + # Render HTML from Jinja template + env = Environment(loader=FileSystemLoader("..")) + template = env.get_template("email.html") + html_content = template.render(deals=deals) - # Email footer - lines.append("
") - email_service.send_email("\n".join(lines)) + # Send the email + email_service.send_email(html_content) diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index d89f323..d601ca9 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import assets from dagster_polars import PolarsParquetIOManager @@ -9,7 +10,7 @@ from utils.email import EmailService import dagster as dg -APP = os.environ["APP"] +APP = os.environ.get("APP", Path(__file__).parent.parent.name) install() diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py index 8065ea7..3e9119d 100644 --- a/apps/vinyl/src/test.py +++ b/apps/vinyl/src/test.py @@ -68,7 +68,7 @@ if __name__ == "__main__": dg.materialize( assets=definitions.assets, selection=[good_deals.key], - partition_key=f"{today_str()}|{source}", + partition_key=f"{today_str()}", resources=resources, ) case _: diff --git a/pyproject.toml b/pyproject.toml index 0c1fb1e..61c55ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,9 @@ dagster = [ "dagster-duckdb-pandas", "dagit" ] -vinyl = [] +vinyl = [ + "Jinja2" +] stocks = [ "selenium" ]