diff --git a/data_platform/ops/elementary.py b/data_platform/ops/elementary.py index 1457b75..c2d845f 100644 --- a/data_platform/ops/elementary.py +++ b/data_platform/ops/elementary.py @@ -5,7 +5,6 @@ import subprocess from pathlib import Path from dagster import In, Nothing, OpExecutionContext, op -from dagster_dbt import DbtCliResource from sqlalchemy import create_engine, text _DBT_DIR = Path(__file__).parents[2] / "dbt" @@ -31,19 +30,31 @@ def _elementary_schema_exists() -> bool: @op -def elementary_run_models(context: OpExecutionContext, dbt: DbtCliResource) -> None: +def elementary_run_models(context: OpExecutionContext) -> None: """Run Elementary dbt models only if the elementary schema does not exist yet.""" if _elementary_schema_exists(): context.log.info("Elementary schema already exists, skipping model run.") return context.log.info("Elementary schema not found, running dbt models.") - list( - dbt.cli( - ["run", "--select", "elementary"], - context=context, - ).stream() - ) + cmd = [ + "dbt", + "run", + "--select", + "elementary", + "--profiles-dir", + str(_DBT_DIR), + "--project-dir", + str(_DBT_DIR), + ] + context.log.info(f"Running: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True) + if result.stdout: + context.log.info(result.stdout) + if result.stderr: + context.log.warning(result.stderr) + if result.returncode != 0: + raise Exception(f"dbt run elementary failed with exit code {result.returncode}") @op(ins={"after": In(Nothing)})