handle missing partitions

2025-07-30 21:53:10 +02:00
parent 02db619c6d
commit 1d7df06dcf
6 changed files with 95 additions and 50 deletions

assets.py

@@ -84,14 +84,16 @@ def deals(context: dg.AssetExecutionContext) -> pl.DataFrame:
     io_manager_key="polars_parquet_io_manager",
     partitions_def=deals.partitions_def,
     ins={"df": dg.AssetIn(key=deals.key)},
-    automation_condition=dg.AutomationCondition.on_missing().without(
-        dg.AutomationCondition.in_latest_time_window()
-    ),
+    automation_condition=dg.AutomationCondition.eager(),
+    output_required=False,
 )
 def cleaned_deals(
-    context: dg.AssetExecutionContext, df: pl.LazyFrame
-) -> Deal.DataFrame:
+    context: dg.AssetExecutionContext, df: pl.LazyFrame | None
+) -> Iterator[dg.Output[Deal.DataFrame]]:
     """Clean and parse deals from the raw source tables."""
+    if df is None:
+        return
     ic()
     partition_keys = get_partition_keys(context)
     ic(partition_keys)
@@ -104,27 +106,27 @@ def cleaned_deals(
             parsed_df = parse_sounds(df)
         case _:
             context.log.warning(f"Unknown source: {source}!")
-            return Deal.DataFrame()
+            return
     ic(parsed_df.collect_schema())
     # Deduplicate and sort the DataFrame
     columns = ["source", "id", "artist", "title", "price"]
-    return Deal.DataFrame(
-        parsed_df.sort("date", descending=True)
-        .unique(subset=columns, keep="first")
-        .sort("date", descending=False)
-        .select(*columns, "date", "release", "url")
-        .collect()
+    yield dg.Output(
+        Deal.DataFrame(
+            parsed_df.sort("date", descending=True)
+            .unique(subset=columns, keep="first")
+            .sort("date", descending=False)
+            .select(*columns, "date", "release", "url")
+            .collect()
+        )
     )


 @asset(
     deps=[cleaned_deals],
     io_manager_key="polars_parquet_io_manager",
-    automation_condition=dg.AutomationCondition.on_missing().without(
-        dg.AutomationCondition.in_latest_time_window()
-    ),
+    automation_condition=dg.AutomationCondition.eager(),
 )
 def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
     """Aggregate works from cleaned deals."""
@@ -167,15 +169,21 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
     automation_condition=dg.AutomationCondition.eager(),
 )
 def new_deals(
-    context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame]
+    context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
 ) -> Iterator[dg.Output[Deal.DataFrame]]:
     """Fetch new deals from all sources."""
     ic()
-    partition_keys = get_partition_keys(context)
-    parsed_partition_keys = parse_partition_keys(context, "partitions")
+    ic(partitions.keys())
+    if not (partitions := {k: v for k, v in partitions.items() if v is not None}):
+        return
+    ic(partitions.keys())
+    partition_keys = get_partition_keys(context)
     ic(partition_keys)
+    parsed_partition_keys = parse_partition_keys(context, "partitions")
+    ic(parsed_partition_keys)
     if len(partition_keys := sorted(partitions.keys())) < 2:
         context.log.warning("Not enough partitions to fetch new deals!")
@@ -245,7 +253,7 @@ def good_deals(
     ]
     # Render HTML from Jinja template
-    env = Environment(loader=FileSystemLoader(".."))
+    env = Environment(loader=FileSystemLoader(f"/apps/{APP}"))
     template = env.get_template("email.html")
     html_content = template.render(deals=deals)
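
Note: replacing the plain return with output_required=False plus a generator is Dagster's conditional-output pattern. When the asset body returns without yielding a dg.Output, no materialization is recorded for that partition, so AutomationCondition.eager() downstreams are simply not triggered. A minimal sketch of the pattern, with an illustrative asset name that is not from this repo:

import dagster as dg


@dg.asset(output_required=False)
def maybe_rows():
    rows = []  # imagine rows scraped from a source that may have nothing new
    if not rows:
        # Returning without yielding records no materialization, so
        # eager downstream assets skip this partition entirely.
        return
    yield dg.Output(rows)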

definitions.py

@@ -1,19 +1,16 @@
-import os
-
 import assets
 from config import APP
-from dagster_polars import PolarsParquetIOManager
 from icecream import install
 from jobs import check_partitions_job, deals_job
+from resources import MyIOManager
 from schedules import deals_schedule
+from shared.config import STORAGE_DIR
 from utils.email import EmailService

 import dagster as dg

 install()

-storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
-
 definitions = dg.Definitions(
     assets=[
         asset.with_attributes(
@@ -23,7 +20,7 @@ definitions = dg.Definitions(
         for asset in dg.load_assets_from_modules([assets])
     ],
     resources={
-        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir),
+        "polars_parquet_io_manager": MyIOManager(base_dir=STORAGE_DIR),
         "email_service": EmailService(
             smtp_server=dg.EnvVar("SMTP_SERVER"),
             smtp_port=dg.EnvVar.int("SMTP_PORT"),

jobs.py

@@ -1,7 +1,7 @@
 from functools import partial

+import assets
 import polars as pl
-from assets import deals
 from config import APP

 import dagster as dg
@@ -11,36 +11,50 @@ job = partial(dg.job, **kwargs)
 define_asset_job = partial(dg.define_asset_job, **kwargs)

 deals_job = define_asset_job(
-    "deals_job", selection=[deals.key], partitions_def=deals.partitions_def
+    "deals_job",
+    selection=[assets.deals.key],
+    partitions_def=assets.deals.partitions_def,
 )


 @dg.op(required_resource_keys={"polars_parquet_io_manager"})
 def check_partitions(context: dg.OpExecutionContext) -> None:
-    asset_key = deals.key
-    # Fetch the materialized partitions for the asset key
-    materialized_partitions = context.instance.get_materialized_partitions(asset_key)
-    ic(materialized_partitions)
-    storage_dir = context.resources.polars_parquet_io_manager.base_dir
-    asset_path = "/".join(asset_key.path)
-    ic(storage_dir, asset_key, asset_path)
-    for row in (
-        pl.scan_parquet(
-            f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore"
-        )
-        .select(["date", "source"])
-        .unique()
-        .collect()
-        .iter_rows()
-    ):
-        partition = "|".join(row)
-        if partition not in materialized_partitions:
-            context.log.info(f"Missing partition: {partition}")
-            context.log_event(
-                dg.AssetMaterialization(asset_key=asset_key, partition=partition)
-            )
+    for asset in [assets.deals, assets.cleaned_deals]:
+        asset_key = asset.key
+        # Fetch the materialized partitions for the asset key
+        materialized_partitions = context.instance.get_materialized_partitions(
+            asset_key
+        )
+        ic(materialized_partitions)
+
+        storage_dir = context.resources.polars_parquet_io_manager.base_dir
+        asset_path = "/".join(asset_key.path)
+        ic(storage_dir, asset_key, asset_path)
+
+        partitions = []
+        for row in (
+            pl.scan_parquet(
+                f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore"
+            )
+            .select(["date", "source"])  # asset.partitions_def.names ?
+            .unique()
+            .with_columns(pl.exclude(pl.String).cast(str))
+            .collect()
+            .iter_rows()
+        ):
+            partition = "|".join(row)
+            if partition not in materialized_partitions:
+                context.log.info(f"[{asset_key}] Adding partition: {partition}")
+                context.log_event(
+                    dg.AssetMaterialization(asset_key=asset_key, partition=partition)
+                )
+                partitions.append(partition)
+
+        missing = set(materialized_partitions) - set(partitions)
+        ic(missing)
+        for partition in missing:
+            context.log.info(f"[{asset_key}] Should remove partition: {partition}")


 @job
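
Note: the "|".join(row) round-trip works because Dagster serializes a MultiPartitionKey as its per-dimension keys joined by "|" (here date before source, matching the .select(["date", "source"]) column order). A sketch of the kind of partitions definition this implies; the start date and the source list are assumptions, not taken from this repo:

import dagster as dg

# A concrete key such as "2025-07-30|sounds" is what
# check_partitions rebuilds with "|".join((date, source)).
partitions_def = dg.MultiPartitionsDefinition(
    {
        "date": dg.DailyPartitionsDefinition(start_date="2025-01-01"),
        "source": dg.StaticPartitionsDefinition(["plato", "sounds"]),
    }
)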

resources.py

@@ -0,0 +1,26 @@
+from typing import Any, Union
+
+import polars as pl
+from dagster_polars import PolarsParquetIOManager
+from upath import UPath
+
+from dagster import InputContext
+
+
+class MyIOManager(PolarsParquetIOManager):
+    extension: str = ".parquet"  # not sure why this does not inherit from super class
+
+    def load_from_path(self, context: InputContext, path: "UPath") -> Union[
+        pl.DataFrame,
+        pl.LazyFrame,
+        tuple[pl.DataFrame, dict[str, Any]],
+        tuple[pl.LazyFrame, dict[str, Any]],
+        None,
+    ]:
+        # print(path)
+        # print(self.extension)
+        # print(self.__class__.__mro__)
+        try:
+            return super().load_from_path(context, path)
+        except FileNotFoundError:
+            return None
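
Note: this subclass makes a missing upstream parquet file load as None instead of raising FileNotFoundError, which is what lets cleaned_deals and new_deals annotate their inputs as pl.LazyFrame | None and bail out early. A minimal sketch of a consuming asset under that contract; the asset names and base_dir are illustrative, not from this repo:

from collections.abc import Iterator

import dagster as dg
import polars as pl
from resources import MyIOManager


@dg.asset(io_manager_key="polars_parquet_io_manager")
def upstream() -> pl.DataFrame:
    # Illustrative producer; in this project the producer is the deals asset.
    return pl.DataFrame({"id": [1, 2], "price": [9.99, 14.99]})


@dg.asset(io_manager_key="polars_parquet_io_manager", output_required=False)
def downstream(upstream: pl.LazyFrame | None) -> Iterator[dg.Output[pl.DataFrame]]:
    # With MyIOManager, a partition of `upstream` that was never written
    # arrives as None rather than crashing inside the IO manager.
    if upstream is None:
        return
    yield dg.Output(upstream.collect())


defs = dg.Definitions(
    assets=[upstream, downstream],
    resources={"polars_parquet_io_manager": MyIOManager(base_dir="/tmp/storage")},
)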

schedules.py

@@ -4,6 +4,6 @@ import dagster as dg

 deals_schedule = dg.build_schedule_from_partitioned_job(
     job=deals_job,
-    hour_of_day=7,
+    hour_of_day=9,
     default_status=dg.DefaultScheduleStatus.RUNNING,
 )

utils/email.py

@@ -26,7 +26,7 @@ class EmailService(dg.ConfigurableResource):
     sender_email: str
     receiver_email: str

-    def send_email(self, body: str, subject="Aanbieding op plato (new)!") -> None:
+    def send_email(self, body: str, subject="Vinyl aanbiedingen!") -> None:
         msg = MIMEMultipart()
         msg["Subject"] = subject
         msg["From"] = self.sender_email