feat: add automation

This commit is contained in:
Stijnvandenbroek
2026-03-05 18:20:27 +00:00
parent 8ada3eff12
commit 9d739242a8
5 changed files with 138 additions and 69 deletions

View File

@@ -7,6 +7,7 @@ from data_platform.assets.ingestion.funda import (
funda_price_history,
funda_search_results,
)
from data_platform.helpers import apply_automation
from data_platform.jobs import (
elementary_refresh_job,
funda_ingestion_job,
@@ -20,12 +21,14 @@ from data_platform.schedules import (
)
defs = Definitions(
assets=[
assets=apply_automation(
[
dbt_project_assets,
funda_search_results,
funda_listing_details,
funda_price_history,
],
]
),
jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job],
schedules=[
funda_ingestion_schedule,

View File

@@ -1,66 +1,21 @@
"""Shared helper utilities."""
import json
from pathlib import Path
from data_platform.helpers.automation import apply_automation
from data_platform.helpers.formatting import (
format_area,
format_euro,
md_preview_table,
safe,
safe_int,
)
from data_platform.helpers.sql import render_sql
from jinja2 import Environment, FileSystemLoader
def render_sql(sql_dir: Path, path: str, **kwargs: object) -> str:
"""Load and render a Jinja2 SQL template relative to sql_dir."""
env = Environment(loader=FileSystemLoader(str(sql_dir)), autoescape=False)
return env.get_template(path).render(**kwargs)
def safe(val):
"""Convert non-serialisable values for JSONB storage."""
if isinstance(val, list | dict | tuple):
return json.dumps(val, default=str)
return val
def safe_int(val):
"""Try to cast to int, return None on failure."""
if val is None:
return None
try:
return int(val)
except (ValueError, TypeError):
try:
return int(float(val))
except (ValueError, TypeError):
return None
def md_preview_table(
rows: list[dict],
columns: list[tuple[str, str]],
formatters: dict[str, callable] | None = None,
) -> str:
"""Build a markdown table from a list of row dicts."""
formatters = formatters or {}
headers = [label for _, label in columns]
lines = [
"| " + " | ".join(headers) + " |",
"| " + " | ".join("---" for _ in headers) + " |",
__all__ = [
"apply_automation",
"format_area",
"format_euro",
"md_preview_table",
"render_sql",
"safe",
"safe_int",
]
for r in rows:
cells = []
for key, _ in columns:
val = r.get(key)
if key in formatters:
cells.append(formatters[key](val))
else:
cells.append(str(val) if val is not None else "")
lines.append("| " + " | ".join(cells) + " |")
return "\n".join(lines)
def format_euro(val) -> str:
"""Format an integer as €-prefixed, or ''."""
return f"{val:,}" if val else ""
def format_area(val) -> str:
"""Format an integer as m², or ''."""
return f"{val}" if val else ""

View File

@@ -0,0 +1,43 @@
"""Global automation condition for all assets.
Applies eager auto-materialization to every asset except those tagged "manual".
Also prevents duplicate runs by skipping assets that have any dependencies
currently in progress.
"""
from collections.abc import Iterable
from dagster import AssetsDefinition, AutomationCondition
# Eager: materialize when any dependency updates, but skip when any upstream
# dependency is still in progress to avoid wasted / duplicate runs.
AUTOMATION_CONDITION = (
AutomationCondition.eager() & ~AutomationCondition.any_deps_in_progress()
)
def apply_automation(
assets: Iterable[AssetsDefinition],
) -> list[AssetsDefinition]:
"""Return a new list with the global automation condition applied.
Assets (or individual specs inside multi-asset groups) tagged ``"manual"``
are left untouched and will only run when triggered explicitly.
"""
result: list[AssetsDefinition] = []
for asset_def in assets:
if _is_manual(asset_def):
result.append(asset_def)
else:
result.append(
asset_def.with_attributes(automation_condition=AUTOMATION_CONDITION)
)
return result
def _is_manual(asset_def: AssetsDefinition) -> bool:
"""Check whether *any* spec in the asset definition carries a ``manual`` tag."""
for spec in asset_def.specs:
if spec.tags.get("manual"):
return True
return False

View File

@@ -0,0 +1,57 @@
"""Data formatting and serialisation helpers."""
import json
def safe(val):
"""Convert non-serialisable values for JSONB storage."""
if isinstance(val, list | dict | tuple):
return json.dumps(val, default=str)
return val
def safe_int(val):
"""Try to cast to int, return None on failure."""
if val is None:
return None
try:
return int(val)
except (ValueError, TypeError):
try:
return int(float(val))
except (ValueError, TypeError):
return None
def md_preview_table(
rows: list[dict],
columns: list[tuple[str, str]],
formatters: dict[str, callable] | None = None,
) -> str:
"""Build a markdown table from a list of row dicts."""
formatters = formatters or {}
headers = [label for _, label in columns]
lines = [
"| " + " | ".join(headers) + " |",
"| " + " | ".join("---" for _ in headers) + " |",
]
for r in rows:
cells = []
for key, _ in columns:
val = r.get(key)
if key in formatters:
cells.append(formatters[key](val))
else:
cells.append(str(val) if val is not None else "")
lines.append("| " + " | ".join(cells) + " |")
return "\n".join(lines)
def format_euro(val) -> str:
"""Format an integer as €-prefixed, or ''."""
return f"{val:,}" if val else ""
def format_area(val) -> str:
"""Format an integer as m², or ''."""
return f"{val}" if val else ""

View File

@@ -0,0 +1,11 @@
"""SQL rendering utilities."""
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
def render_sql(sql_dir: Path, path: str, **kwargs: object) -> str:
"""Load and render a Jinja2 SQL template relative to sql_dir."""
env = Environment(loader=FileSystemLoader(str(sql_dir)), autoescape=False)
return env.get_template(path).render(**kwargs)