From 9d739242a83af7e8ac398668073e78191614e1ab Mon Sep 17 00:00:00 2001 From: Stijnvandenbroek Date: Thu, 5 Mar 2026 18:20:27 +0000 Subject: [PATCH] feat: add automation --- data_platform/definitions.py | 15 +++--- data_platform/helpers/__init__.py | 81 +++++++---------------------- data_platform/helpers/automation.py | 43 +++++++++++++++ data_platform/helpers/formatting.py | 57 ++++++++++++++++++++ data_platform/helpers/sql.py | 11 ++++ 5 files changed, 138 insertions(+), 69 deletions(-) create mode 100644 data_platform/helpers/automation.py create mode 100644 data_platform/helpers/formatting.py create mode 100644 data_platform/helpers/sql.py diff --git a/data_platform/definitions.py b/data_platform/definitions.py index d3e604a..de2e51d 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -7,6 +7,7 @@ from data_platform.assets.ingestion.funda import ( funda_price_history, funda_search_results, ) +from data_platform.helpers import apply_automation from data_platform.jobs import ( elementary_refresh_job, funda_ingestion_job, @@ -20,12 +21,14 @@ from data_platform.schedules import ( ) defs = Definitions( - assets=[ - dbt_project_assets, - funda_search_results, - funda_listing_details, - funda_price_history, - ], + assets=apply_automation( + [ + dbt_project_assets, + funda_search_results, + funda_listing_details, + funda_price_history, + ] + ), jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job], schedules=[ funda_ingestion_schedule, diff --git a/data_platform/helpers/__init__.py b/data_platform/helpers/__init__.py index b631d03..b2786d0 100644 --- a/data_platform/helpers/__init__.py +++ b/data_platform/helpers/__init__.py @@ -1,66 +1,21 @@ """Shared helper utilities.""" -import json -from pathlib import Path +from data_platform.helpers.automation import apply_automation +from data_platform.helpers.formatting import ( + format_area, + format_euro, + md_preview_table, + safe, + safe_int, +) +from data_platform.helpers.sql import render_sql -from jinja2 import Environment, FileSystemLoader - - -def render_sql(sql_dir: Path, path: str, **kwargs: object) -> str: - """Load and render a Jinja2 SQL template relative to sql_dir.""" - env = Environment(loader=FileSystemLoader(str(sql_dir)), autoescape=False) - return env.get_template(path).render(**kwargs) - - -def safe(val): - """Convert non-serialisable values for JSONB storage.""" - if isinstance(val, list | dict | tuple): - return json.dumps(val, default=str) - return val - - -def safe_int(val): - """Try to cast to int, return None on failure.""" - if val is None: - return None - try: - return int(val) - except (ValueError, TypeError): - try: - return int(float(val)) - except (ValueError, TypeError): - return None - - -def md_preview_table( - rows: list[dict], - columns: list[tuple[str, str]], - formatters: dict[str, callable] | None = None, -) -> str: - """Build a markdown table from a list of row dicts.""" - formatters = formatters or {} - headers = [label for _, label in columns] - lines = [ - "| " + " | ".join(headers) + " |", - "| " + " | ".join("---" for _ in headers) + " |", - ] - for r in rows: - cells = [] - for key, _ in columns: - val = r.get(key) - if key in formatters: - cells.append(formatters[key](val)) - else: - cells.append(str(val) if val is not None else "–") - lines.append("| " + " | ".join(cells) + " |") - return "\n".join(lines) - - -def format_euro(val) -> str: - """Format an integer as €-prefixed, or '–'.""" - return f"€{val:,}" if val else "–" - - -def format_area(val) -> str: - """Format an integer as m², or '–'.""" - return f"{val} m²" if val else "–" +__all__ = [ + "apply_automation", + "format_area", + "format_euro", + "md_preview_table", + "render_sql", + "safe", + "safe_int", +] diff --git a/data_platform/helpers/automation.py b/data_platform/helpers/automation.py new file mode 100644 index 0000000..5edc843 --- /dev/null +++ b/data_platform/helpers/automation.py @@ -0,0 +1,43 @@ +"""Global automation condition for all assets. + +Applies eager auto-materialization to every asset except those tagged "manual". +Also prevents duplicate runs by skipping assets that have any dependencies +currently in progress. +""" + +from collections.abc import Iterable + +from dagster import AssetsDefinition, AutomationCondition + +# Eager: materialize when any dependency updates, but skip when any upstream +# dependency is still in progress to avoid wasted / duplicate runs. +AUTOMATION_CONDITION = ( + AutomationCondition.eager() & ~AutomationCondition.any_deps_in_progress() +) + + +def apply_automation( + assets: Iterable[AssetsDefinition], +) -> list[AssetsDefinition]: + """Return a new list with the global automation condition applied. + + Assets (or individual specs inside multi-asset groups) tagged ``"manual"`` + are left untouched and will only run when triggered explicitly. + """ + result: list[AssetsDefinition] = [] + for asset_def in assets: + if _is_manual(asset_def): + result.append(asset_def) + else: + result.append( + asset_def.with_attributes(automation_condition=AUTOMATION_CONDITION) + ) + return result + + +def _is_manual(asset_def: AssetsDefinition) -> bool: + """Check whether *any* spec in the asset definition carries a ``manual`` tag.""" + for spec in asset_def.specs: + if spec.tags.get("manual"): + return True + return False diff --git a/data_platform/helpers/formatting.py b/data_platform/helpers/formatting.py new file mode 100644 index 0000000..767b88a --- /dev/null +++ b/data_platform/helpers/formatting.py @@ -0,0 +1,57 @@ +"""Data formatting and serialisation helpers.""" + +import json + + +def safe(val): + """Convert non-serialisable values for JSONB storage.""" + if isinstance(val, list | dict | tuple): + return json.dumps(val, default=str) + return val + + +def safe_int(val): + """Try to cast to int, return None on failure.""" + if val is None: + return None + try: + return int(val) + except (ValueError, TypeError): + try: + return int(float(val)) + except (ValueError, TypeError): + return None + + +def md_preview_table( + rows: list[dict], + columns: list[tuple[str, str]], + formatters: dict[str, callable] | None = None, +) -> str: + """Build a markdown table from a list of row dicts.""" + formatters = formatters or {} + headers = [label for _, label in columns] + lines = [ + "| " + " | ".join(headers) + " |", + "| " + " | ".join("---" for _ in headers) + " |", + ] + for r in rows: + cells = [] + for key, _ in columns: + val = r.get(key) + if key in formatters: + cells.append(formatters[key](val)) + else: + cells.append(str(val) if val is not None else "–") + lines.append("| " + " | ".join(cells) + " |") + return "\n".join(lines) + + +def format_euro(val) -> str: + """Format an integer as €-prefixed, or '–'.""" + return f"€{val:,}" if val else "–" + + +def format_area(val) -> str: + """Format an integer as m², or '–'.""" + return f"{val} m²" if val else "–" diff --git a/data_platform/helpers/sql.py b/data_platform/helpers/sql.py new file mode 100644 index 0000000..f5c2885 --- /dev/null +++ b/data_platform/helpers/sql.py @@ -0,0 +1,11 @@ +"""SQL rendering utilities.""" + +from pathlib import Path + +from jinja2 import Environment, FileSystemLoader + + +def render_sql(sql_dir: Path, path: str, **kwargs: object) -> str: + """Load and render a Jinja2 SQL template relative to sql_dir.""" + env = Environment(loader=FileSystemLoader(str(sql_dir)), autoescape=False) + return env.get_template(path).render(**kwargs)