From 5166a183b401ddf78f66a2e0c3497237b2d26dce Mon Sep 17 00:00:00 2001 From: Stijnvandenbroek Date: Wed, 4 Mar 2026 22:26:35 +0000 Subject: [PATCH] feat: finetune elementary implementation --- data_platform/assets/dbt.py | 2 +- data_platform/jobs/elementary.py | 11 ++++++--- data_platform/ops/__init__.py | 6 ++++- data_platform/ops/elementary.py | 42 ++++++++++++++++++++++++++++++-- entrypoint.sh | 3 +++ 5 files changed, 57 insertions(+), 7 deletions(-) diff --git a/data_platform/assets/dbt.py b/data_platform/assets/dbt.py index 58aeac4..7efa0d4 100644 --- a/data_platform/assets/dbt.py +++ b/data_platform/assets/dbt.py @@ -13,7 +13,7 @@ dbt_project = DbtProject(project_dir=str(DBT_PROJECT_DIR)) dbt_project.prepare_if_dev() -@dbt_assets(manifest=dbt_project.manifest_path) +@dbt_assets(manifest=dbt_project.manifest_path, exclude="package:elementary") def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): """Expose every dbt model as a Dagster asset.""" yield from dbt.cli(["build"], context=context).stream() diff --git a/data_platform/jobs/elementary.py b/data_platform/jobs/elementary.py index a02fd74..14f5768 100644 --- a/data_platform/jobs/elementary.py +++ b/data_platform/jobs/elementary.py @@ -2,9 +2,14 @@ from dagster import job -from data_platform.ops.elementary import elementary_generate_report +from data_platform.ops.elementary import ( + elementary_generate_report, + elementary_run_models, +) -@job(description="Regenerate the Elementary data observability report.") +@job( + description="Ensure Elementary models exist, then regenerate the observability report." +) def elementary_refresh_job(): - elementary_generate_report() + elementary_generate_report(after=elementary_run_models()) diff --git a/data_platform/ops/__init__.py b/data_platform/ops/__init__.py index 553a3ad..2621a96 100644 --- a/data_platform/ops/__init__.py +++ b/data_platform/ops/__init__.py @@ -2,10 +2,14 @@ from data_platform.ops.check_source_freshness import ( SourceFreshnessConfig, check_source_freshness, ) -from data_platform.ops.elementary import elementary_generate_report +from data_platform.ops.elementary import ( + elementary_generate_report, + elementary_run_models, +) __all__ = [ "check_source_freshness", "SourceFreshnessConfig", + "elementary_run_models", "elementary_generate_report", ] diff --git a/data_platform/ops/elementary.py b/data_platform/ops/elementary.py index 949fe03..1457b75 100644 --- a/data_platform/ops/elementary.py +++ b/data_platform/ops/elementary.py @@ -1,14 +1,52 @@ -"""Elementary report generation op.""" +"""Elementary ops.""" +import os import subprocess from pathlib import Path -from dagster import OpExecutionContext, op +from dagster import In, Nothing, OpExecutionContext, op +from dagster_dbt import DbtCliResource +from sqlalchemy import create_engine, text _DBT_DIR = Path(__file__).parents[2] / "dbt" +def _elementary_schema_exists() -> bool: + url = "postgresql://{user}:{password}@{host}:{port}/{dbname}".format( + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + host=os.environ.get("POSTGRES_HOST", "localhost"), + port=os.environ.get("POSTGRES_PORT", "5432"), + dbname=os.environ["POSTGRES_DB"], + ) + engine = create_engine(url) + with engine.connect() as conn: + return bool( + conn.execute( + text( + "SELECT 1 FROM information_schema.schemata WHERE schema_name = 'elementary'" + ) + ).scalar() + ) + + @op +def elementary_run_models(context: OpExecutionContext, dbt: DbtCliResource) -> None: + """Run Elementary dbt models only if the elementary schema does not exist yet.""" + if _elementary_schema_exists(): + context.log.info("Elementary schema already exists, skipping model run.") + return + + context.log.info("Elementary schema not found, running dbt models.") + list( + dbt.cli( + ["run", "--select", "elementary"], + context=context, + ).stream() + ) + + +@op(ins={"after": In(Nothing)}) def elementary_generate_report(context: OpExecutionContext) -> None: """Run edr report to regenerate the Elementary HTML report.""" cmd = [ diff --git a/entrypoint.sh b/entrypoint.sh index 9cf4599..0e34764 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,6 +1,9 @@ #!/bin/sh set -e +echo "Installing dbt packages..." +dbt deps --profiles-dir /app/dbt --project-dir /app/dbt + echo "Generating dbt manifest..." dbt parse --profiles-dir /app/dbt --project-dir /app/dbt