diff --git a/data_platform/ops/elementary.py b/data_platform/ops/elementary.py index 5af9194..83525b5 100644 --- a/data_platform/ops/elementary.py +++ b/data_platform/ops/elementary.py @@ -58,13 +58,15 @@ def elementary_run_models(context: OpExecutionContext) -> None: str(_DBT_DIR), ] context.log.info(f"Running: {' '.join(cmd)}") - result = subprocess.run(cmd, capture_output=True, text=True) - if result.stdout: - context.log.info(result.stdout) - if result.stderr: - context.log.warning(result.stderr) - if result.returncode != 0: - raise Exception(f"dbt run elementary failed with exit code {result.returncode}") + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True + ) + assert process.stdout is not None + for line in process.stdout: + context.log.info(line.rstrip()) + returncode = process.wait() + if returncode != 0: + raise Exception(f"dbt run elementary failed with exit code {returncode}") @op(ins={"after": In(Nothing)}) @@ -82,13 +84,19 @@ def elementary_generate_report(context: OpExecutionContext) -> None: str(_DBT_DIR), "--file-path", str(report_path), + "--days-back", + "3", + "--executions-limit", + "30", ] context.log.info(f"Running: {' '.join(cmd)}") - result = subprocess.run(cmd, capture_output=True, text=True) - if result.stdout: - context.log.info(result.stdout) - if result.stderr: - context.log.warning(result.stderr) - if result.returncode != 0: - raise Exception(f"edr report failed with exit code {result.returncode}") + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True + ) + assert process.stdout is not None + for line in process.stdout: + context.log.info(line.rstrip()) + returncode = process.wait() + if returncode != 0: + raise Exception(f"edr report failed with exit code {returncode}") context.log.info("Elementary report generated successfully.") diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml index 9ad25b6..0804343 100644 --- a/dbt/dbt_project.yml +++ b/dbt/dbt_project.yml @@ -20,7 +20,7 @@ on-run-end: vars: elementary: edr_interval: 24h - days_back: 7 + days_back: 3 models: data_platform: diff --git a/tests/test_ops.py b/tests/test_ops.py index 0d01391..dbb23f2 100644 --- a/tests/test_ops.py +++ b/tests/test_ops.py @@ -1,6 +1,5 @@ """Tests for Dagster ops — elementary and source freshness.""" -import subprocess from unittest.mock import patch import pytest @@ -68,6 +67,16 @@ class TestElementarySchemaExists: # elementary_run_models +def _mock_popen(returncode=0, stdout_lines=None): + """Create a mock Popen instance with streaming stdout.""" + from unittest.mock import MagicMock + + proc = MagicMock() + proc.stdout = iter(stdout_lines or []) + proc.wait.return_value = returncode + return proc + + class TestElementaryRunModels: @patch("data_platform.ops.elementary._elementary_schema_exists", return_value=True) def test_skips_when_schema_exists(self, mock_exists): @@ -75,30 +84,22 @@ class TestElementaryRunModels: elementary_run_models(context) mock_exists.assert_called_once() - @patch( - "data_platform.ops.elementary.subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], returncode=0, stdout="ok", stderr="" - ), - ) + @patch("data_platform.ops.elementary.subprocess.Popen") @patch("data_platform.ops.elementary._elementary_schema_exists", return_value=False) - def test_runs_dbt_when_schema_missing(self, mock_exists, mock_run): + def test_runs_dbt_when_schema_missing(self, mock_exists, mock_popen): + mock_popen.return_value = _mock_popen(returncode=0, stdout_lines=["ok\n"]) context = build_op_context() elementary_run_models(context) - mock_run.assert_called_once() - args = mock_run.call_args[0][0] + mock_popen.assert_called_once() + args = mock_popen.call_args[0][0] assert "dbt" in args assert "run" in args assert "elementary" in args - @patch( - "data_platform.ops.elementary.subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], returncode=1, stdout="", stderr="error" - ), - ) + @patch("data_platform.ops.elementary.subprocess.Popen") @patch("data_platform.ops.elementary._elementary_schema_exists", return_value=False) - def test_raises_on_dbt_failure(self, mock_exists, mock_run): + def test_raises_on_dbt_failure(self, mock_exists, mock_popen): + mock_popen.return_value = _mock_popen(returncode=1, stdout_lines=["error\n"]) context = build_op_context() with pytest.raises(Exception, match="dbt run elementary failed"): elementary_run_models(context) @@ -108,38 +109,30 @@ class TestElementaryRunModels: class TestElementaryGenerateReport: - @patch( - "data_platform.ops.elementary.subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], returncode=0, stdout="report generated", stderr="" - ), - ) - def test_calls_edr_report(self, mock_run): + @patch("data_platform.ops.elementary.subprocess.Popen") + def test_calls_edr_report(self, mock_popen): + mock_popen.return_value = _mock_popen( + returncode=0, stdout_lines=["report generated\n"] + ) context = build_op_context() elementary_generate_report(context) - mock_run.assert_called_once() - args = mock_run.call_args[0][0] + mock_popen.assert_called_once() + args = mock_popen.call_args[0][0] assert "edr" in args assert "report" in args - @patch( - "data_platform.ops.elementary.subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], returncode=1, stdout="", stderr="fatal error" - ), - ) - def test_raises_on_failure(self, mock_run): + @patch("data_platform.ops.elementary.subprocess.Popen") + def test_raises_on_failure(self, mock_popen): + mock_popen.return_value = _mock_popen( + returncode=1, stdout_lines=["fatal error\n"] + ) context = build_op_context() with pytest.raises(Exception, match="edr report failed"): elementary_generate_report(context) - @patch( - "data_platform.ops.elementary.subprocess.run", - return_value=subprocess.CompletedProcess( - args=[], returncode=0, stdout="done", stderr="" - ), - ) - def test_success_returns_none(self, mock_run): + @patch("data_platform.ops.elementary.subprocess.Popen") + def test_success_returns_none(self, mock_popen): + mock_popen.return_value = _mock_popen(returncode=0, stdout_lines=["done\n"]) context = build_op_context() result = elementary_generate_report(context) assert result is None