fix: elementary report with subprocess cli

This commit is contained in:
Stijnvandenbroek
2026-03-05 10:01:32 +00:00
parent 5166a183b4
commit 53f2ae897c

View File

@@ -5,7 +5,6 @@ import subprocess
from pathlib import Path
from dagster import In, Nothing, OpExecutionContext, op
from dagster_dbt import DbtCliResource
from sqlalchemy import create_engine, text
_DBT_DIR = Path(__file__).parents[2] / "dbt"
@@ -31,19 +30,31 @@ def _elementary_schema_exists() -> bool:
@op
def elementary_run_models(context: OpExecutionContext, dbt: DbtCliResource) -> None:
def elementary_run_models(context: OpExecutionContext) -> None:
"""Run Elementary dbt models only if the elementary schema does not exist yet."""
if _elementary_schema_exists():
context.log.info("Elementary schema already exists, skipping model run.")
return
context.log.info("Elementary schema not found, running dbt models.")
list(
dbt.cli(
["run", "--select", "elementary"],
context=context,
).stream()
)
cmd = [
"dbt",
"run",
"--select",
"elementary",
"--profiles-dir",
str(_DBT_DIR),
"--project-dir",
str(_DBT_DIR),
]
context.log.info(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout:
context.log.info(result.stdout)
if result.stderr:
context.log.warning(result.stderr)
if result.returncode != 0:
raise Exception(f"dbt run elementary failed with exit code {result.returncode}")
@op(ins={"after": In(Nothing)})