diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index f4758b0..7461cf7 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -84,14 +84,16 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame: io_manager_key="polars_parquet_io_manager", partitions_def=deals.partitions_def, ins={"df": dg.AssetIn(key=deals.key)}, - automation_condition=dg.AutomationCondition.on_missing().without( - dg.AutomationCondition.in_latest_time_window() - ), + automation_condition=dg.AutomationCondition.eager(), + output_required=False, ) def cleaned_deals( - context: dg.AssetExecutionContext, df: pl.LazyFrame -) -> Deal.DataFrame: + context: dg.AssetExecutionContext, df: pl.LazyFrame | None +) -> Iterator[dg.Output[Deal.DataFrame]]: """Clean and parse deals from the raw source tables.""" + if df is None: + return + ic() partition_keys = get_partition_keys(context) ic(partition_keys) @@ -104,27 +106,27 @@ def cleaned_deals( parsed_df = parse_sounds(df) case _: context.log.warning(f"Unknown source: {source}!") - return Deal.DataFrame() + return ic(parsed_df.collect_schema()) # Deduplicate and sort the DataFrame columns = ["source", "id", "artist", "title", "price"] - return Deal.DataFrame( - parsed_df.sort("date", descending=True) - .unique(subset=columns, keep="first") - .sort("date", descending=False) - .select(*columns, "date", "release", "url") - .collect() + yield dg.Output( + Deal.DataFrame( + parsed_df.sort("date", descending=True) + .unique(subset=columns, keep="first") + .sort("date", descending=False) + .select(*columns, "date", "release", "url") + .collect() + ) ) @asset( deps=[cleaned_deals], io_manager_key="polars_parquet_io_manager", - automation_condition=dg.AutomationCondition.on_missing().without( - dg.AutomationCondition.in_latest_time_window() - ), + automation_condition=dg.AutomationCondition.eager(), ) def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: """Aggregate works from cleaned deals.""" @@ -167,15 +169,21 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: automation_condition=dg.AutomationCondition.eager(), ) def new_deals( - context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame] + context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None] ) -> Iterator[dg.Output[Deal.DataFrame]]: """Fetch new deals from all sources.""" ic() - partition_keys = get_partition_keys(context) - parsed_partition_keys = parse_partition_keys(context, "partitions") + ic(partitions.keys()) + if not (partitions := {k: v for k, v in partitions.items() if v is not None}): + return + ic(partitions.keys()) + partition_keys = get_partition_keys(context) ic(partition_keys) + parsed_partition_keys = parse_partition_keys(context, "partitions") + ic(parsed_partition_keys) + if len(partition_keys := sorted(partitions.keys())) < 2: context.log.warning("Not enough partitions to fetch new deals!") @@ -245,7 +253,7 @@ def good_deals( ] # Render HTML from Jinja template - env = Environment(loader=FileSystemLoader("..")) + env = Environment(loader=FileSystemLoader(f"/apps/{APP}")) template = env.get_template("email.html") html_content = template.render(deals=deals) diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index 1c1f71c..c0fa1f9 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -1,19 +1,16 @@ -import os - import assets from config import APP -from dagster_polars import PolarsParquetIOManager from icecream import install from jobs import check_partitions_job, deals_job +from resources import MyIOManager from schedules import deals_schedule +from shared.config import STORAGE_DIR from utils.email import EmailService import dagster as dg install() -storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" - definitions = dg.Definitions( assets=[ asset.with_attributes( @@ -23,7 +20,7 @@ definitions = dg.Definitions( for asset in dg.load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir), + "polars_parquet_io_manager": MyIOManager(base_dir=STORAGE_DIR), "email_service": EmailService( smtp_server=dg.EnvVar("SMTP_SERVER"), smtp_port=dg.EnvVar.int("SMTP_PORT"), diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py index b90c44a..5ac0d99 100644 --- a/apps/vinyl/src/jobs.py +++ b/apps/vinyl/src/jobs.py @@ -1,7 +1,7 @@ from functools import partial +import assets import polars as pl -from assets import deals from config import APP import dagster as dg @@ -11,36 +11,50 @@ job = partial(dg.job, **kwargs) define_asset_job = partial(dg.define_asset_job, **kwargs) deals_job = define_asset_job( - "deals_job", selection=[deals.key], partitions_def=deals.partitions_def + "deals_job", + selection=[assets.deals.key], + partitions_def=assets.deals.partitions_def, ) @dg.op(required_resource_keys={"polars_parquet_io_manager"}) def check_partitions(context: dg.OpExecutionContext) -> None: - asset_key = deals.key + for asset in [assets.deals, assets.cleaned_deals]: + asset_key = asset.key - # Fetch the materialized partitions for the asset key - materialized_partitions = context.instance.get_materialized_partitions(asset_key) - ic(materialized_partitions) - - storage_dir = context.resources.polars_parquet_io_manager.base_dir - asset_path = "/".join(asset_key.path) - ic(storage_dir, asset_key, asset_path) - for row in ( - pl.scan_parquet( - f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore" + # Fetch the materialized partitions for the asset key + materialized_partitions = context.instance.get_materialized_partitions( + asset_key ) - .select(["date", "source"]) - .unique() - .collect() - .iter_rows() - ): - partition = "|".join(row) - if partition not in materialized_partitions: - context.log.info(f"Missing partition: {partition}") - context.log_event( - dg.AssetMaterialization(asset_key=asset_key, partition=partition) + ic(materialized_partitions) + + storage_dir = context.resources.polars_parquet_io_manager.base_dir + asset_path = "/".join(asset_key.path) + ic(storage_dir, asset_key, asset_path) + + partitions = [] + for row in ( + pl.scan_parquet( + f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore" ) + .select(["date", "source"]) # asset.partitions_def.names ? + .unique() + .with_columns(pl.exclude(pl.String).cast(str)) + .collect() + .iter_rows() + ): + partition = "|".join(row) + if partition not in materialized_partitions: + context.log.info(f"[{asset_key}] Adding partition: {partition}") + context.log_event( + dg.AssetMaterialization(asset_key=asset_key, partition=partition) + ) + partitions.append(partition) + + missing = set(materialized_partitions) - set(partitions) + ic(missing) + for partition in missing: + context.log.info(f"[{asset_key}] Should remove partition: {partition}") @job diff --git a/apps/vinyl/src/resources.py b/apps/vinyl/src/resources.py new file mode 100644 index 0000000..729956f --- /dev/null +++ b/apps/vinyl/src/resources.py @@ -0,0 +1,26 @@ +from typing import Any, Union + +import polars as pl +from dagster_polars import PolarsParquetIOManager +from upath import UPath + +from dagster import InputContext + + +class MyIOManager(PolarsParquetIOManager): + extension: str = ".parquet" # not sure why this does not inherit from super class + + def load_from_path(self, context: InputContext, path: "UPath") -> Union[ + pl.DataFrame, + pl.LazyFrame, + tuple[pl.DataFrame, dict[str, Any]], + tuple[pl.LazyFrame, dict[str, Any]], + None, + ]: + # print(path) + # print(self.extension) + # print(self.__class__.__mro__) + try: + return super().load_from_path(context, path) + except FileNotFoundError: + return None diff --git a/apps/vinyl/src/schedules.py b/apps/vinyl/src/schedules.py index 928a2f1..11fff29 100644 --- a/apps/vinyl/src/schedules.py +++ b/apps/vinyl/src/schedules.py @@ -4,6 +4,6 @@ import dagster as dg deals_schedule = dg.build_schedule_from_partitioned_job( job=deals_job, - hour_of_day=7, + hour_of_day=9, default_status=dg.DefaultScheduleStatus.RUNNING, ) diff --git a/apps/vinyl/src/utils/email.py b/apps/vinyl/src/utils/email.py index 4ea7c99..8fd818f 100644 --- a/apps/vinyl/src/utils/email.py +++ b/apps/vinyl/src/utils/email.py @@ -26,7 +26,7 @@ class EmailService(dg.ConfigurableResource): sender_email: str receiver_email: str - def send_email(self, body: str, subject="Aanbieding op plato (new)!") -> None: + def send_email(self, body: str, subject="Vinyl aanbiedingen!") -> None: msg = MIMEMultipart() msg["Subject"] = subject msg["From"] = self.sender_email