From cf38ac521c1a8728fbe344f567f0c0464b42964b Mon Sep 17 00:00:00 2001 From: Stijnvandenbroek Date: Wed, 4 Mar 2026 10:36:54 +0000 Subject: [PATCH] chore: restructure python --- data_platform/assets/dbt.py | 19 ++++ data_platform/assets/funda.py | 114 +++++++++-------------- data_platform/assets/helpers.py | 65 +++++++++++++ data_platform/definitions.py | 27 +----- tests/test_helpers.py | 157 +++++++++++++++----------------- 5 files changed, 200 insertions(+), 182 deletions(-) create mode 100644 data_platform/assets/dbt.py create mode 100644 data_platform/assets/helpers.py diff --git a/data_platform/assets/dbt.py b/data_platform/assets/dbt.py new file mode 100644 index 0000000..fa5d3f9 --- /dev/null +++ b/data_platform/assets/dbt.py @@ -0,0 +1,19 @@ +"""dbt assets for the data platform.""" + +from pathlib import Path + +from dagster import AssetExecutionContext +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + +DBT_PROJECT_DIR = Path(__file__).parent.parent.parent / "dbt" + +dbt_project = DbtProject(project_dir=str(DBT_PROJECT_DIR)) + +# When running locally outside Docker, generate/refresh the manifest automatically. +dbt_project.prepare_if_dev() + + +@dbt_assets(manifest=dbt_project.manifest_path) +def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): + """Every dbt model/test/snapshot becomes a Dagster asset.""" + yield from dbt.cli(["build"], context=context).stream() diff --git a/data_platform/assets/funda.py b/data_platform/assets/funda.py index 0af42e3..2b75e13 100644 --- a/data_platform/assets/funda.py +++ b/data_platform/assets/funda.py @@ -19,6 +19,12 @@ from dagster import ( ) from sqlalchemy import text +from data_platform.assets.helpers import ( + format_area, + format_euro, + md_preview_table, + safe_int, +) from data_platform.resources import FundaResource, PostgresResource # --------------------------------------------------------------------------- @@ -144,26 +150,6 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history ( """ -def _safe(val): - """Convert non-serialisable values (tuples, lists of dicts, etc.) for JSONB.""" - if isinstance(val, list | dict | tuple): - return json.dumps(val, default=str) - return val - - -def _safe_int(val): - """Try to cast to int, return None on failure.""" - if val is None: - return None - try: - return int(val) - except (ValueError, TypeError): - try: - return int(float(val)) - except (ValueError, TypeError): - return None - - # --------------------------------------------------------------------------- # Assets # --------------------------------------------------------------------------- @@ -242,11 +228,11 @@ def funda_search_results( "postcode": d.get("postcode"), "province": d.get("province"), "neighbourhood": d.get("neighbourhood"), - "price": _safe_int(d.get("price")), - "living_area": _safe_int(d.get("living_area")), - "plot_area": _safe_int(d.get("plot_area")), - "bedrooms": _safe_int(d.get("bedrooms")), - "rooms": _safe_int(d.get("rooms")), + "price": safe_int(d.get("price")), + "living_area": safe_int(d.get("living_area")), + "plot_area": safe_int(d.get("plot_area")), + "bedrooms": safe_int(d.get("bedrooms")), + "rooms": safe_int(d.get("rooms")), "energy_label": d.get("energy_label"), "object_type": d.get("object_type"), "offering_type": d.get("offering_type"), @@ -282,7 +268,17 @@ def funda_search_results( "location": MetadataValue.text(config.location), "offering_type": MetadataValue.text(config.offering_type), "preview": MetadataValue.md( - _search_preview_table(rows[:10]), + md_preview_table( + rows[:10], + columns=[ + ("title", "Title"), + ("city", "City"), + ("price", "Price"), + ("living_area", "Area"), + ("bedrooms", "Bedrooms"), + ], + formatters={"price": format_euro, "living_area": format_area}, + ), ), } ) @@ -340,7 +336,7 @@ def funda_listing_details( "province": d.get("province"), "neighbourhood": d.get("neighbourhood"), "municipality": d.get("municipality"), - "price": _safe_int(d.get("price")), + "price": safe_int(d.get("price")), "price_formatted": d.get("price_formatted"), "status": d.get("status"), "offering_type": d.get("offering_type"), @@ -349,10 +345,10 @@ def funda_listing_details( "construction_type": d.get("construction_type"), "construction_year": d.get("construction_year"), "energy_label": d.get("energy_label"), - "living_area": _safe_int(d.get("living_area")), - "plot_area": _safe_int(d.get("plot_area")), - "bedrooms": _safe_int(d.get("bedrooms")), - "rooms": _safe_int(d.get("rooms")), + "living_area": safe_int(d.get("living_area")), + "plot_area": safe_int(d.get("plot_area")), + "bedrooms": safe_int(d.get("bedrooms")), + "rooms": safe_int(d.get("rooms")), "description": d.get("description"), "publication_date": d.get("publication_date"), "latitude": d.get("latitude"), @@ -365,9 +361,9 @@ def funda_listing_details( "is_energy_efficient": d.get("is_energy_efficient"), "is_monument": d.get("is_monument"), "url": d.get("url"), - "photo_count": _safe_int(d.get("photo_count")), - "views": _safe_int(d.get("views")), - "saves": _safe_int(d.get("saves")), + "photo_count": safe_int(d.get("photo_count")), + "views": safe_int(d.get("views")), + "saves": safe_int(d.get("saves")), "raw_json": json.dumps(d, default=str), } ) @@ -413,7 +409,17 @@ def funda_listing_details( "count": len(rows), "errors": errors, "preview": MetadataValue.md( - _details_preview_table(rows[:10]), + md_preview_table( + rows[:10], + columns=[ + ("title", "Title"), + ("city", "City"), + ("price", "Price"), + ("status", "Status"), + ("energy_label", "Energy"), + ], + formatters={"price": format_euro}, + ), ), } ) @@ -468,7 +474,7 @@ def funda_price_history( rows.append( { "global_id": gid, - "price": _safe_int(entry.get("price")), + "price": safe_int(entry.get("price")), "human_price": entry.get("human_price"), "date": entry.get("date"), "timestamp": entry.get("timestamp"), @@ -504,39 +510,3 @@ def funda_price_history( "listings_processed": len(ids) - errors, } ) - - -# --------------------------------------------------------------------------- -# Metadata preview helpers -# --------------------------------------------------------------------------- - - -def _search_preview_table(rows: list[dict]) -> str: - """Build a markdown table for search result metadata preview.""" - lines = [ - "| Title | City | Price | Area | Bedrooms |", - "| --- | --- | --- | --- | --- |", - ] - for r in rows: - price = f"€{r['price']:,}" if r.get("price") else "–" - area = f"{r['living_area']} m²" if r.get("living_area") else "–" - lines.append( - f"| {r.get('title', '–')} | {r.get('city', '–')} " - f"| {price} | {area} | {r.get('bedrooms', '–')} |" - ) - return "\n".join(lines) - - -def _details_preview_table(rows: list[dict]) -> str: - """Build a markdown table for listing details metadata preview.""" - lines = [ - "| Title | City | Price | Status | Energy |", - "| --- | --- | --- | --- | --- |", - ] - for r in rows: - price = f"€{r['price']:,}" if r.get("price") else "–" - lines.append( - f"| {r.get('title', '–')} | {r.get('city', '–')} " - f"| {price} | {r.get('status', '–')} | {r.get('energy_label', '–')} |" - ) - return "\n".join(lines) diff --git a/data_platform/assets/helpers.py b/data_platform/assets/helpers.py new file mode 100644 index 0000000..36f1a5a --- /dev/null +++ b/data_platform/assets/helpers.py @@ -0,0 +1,65 @@ +"""Shared helper utilities for Dagster assets.""" + +import json + + +def safe(val): + """Convert non-serialisable values (tuples, lists of dicts, etc.) for JSONB.""" + if isinstance(val, list | dict | tuple): + return json.dumps(val, default=str) + return val + + +def safe_int(val): + """Try to cast to int, return None on failure.""" + if val is None: + return None + try: + return int(val) + except (ValueError, TypeError): + try: + return int(float(val)) + except (ValueError, TypeError): + return None + + +def md_preview_table( + rows: list[dict], + columns: list[tuple[str, str]], + formatters: dict[str, callable] | None = None, +) -> str: + """Build a markdown table from a list of row dicts. + + Args: + rows: List of dictionaries containing row data. + columns: List of (key, header_label) tuples defining the columns. + formatters: Optional dict mapping column keys to formatting callables. + Each callable receives the raw value and returns a display string. + Columns without a formatter fall back to the raw value or "–". + """ + formatters = formatters or {} + headers = [label for _, label in columns] + lines = [ + "| " + " | ".join(headers) + " |", + "| " + " | ".join("---" for _ in headers) + " |", + ] + for r in rows: + cells = [] + for key, _ in columns: + val = r.get(key) + if key in formatters: + cells.append(formatters[key](val)) + else: + cells.append(str(val) if val is not None else "–") + lines.append("| " + " | ".join(cells) + " |") + return "\n".join(lines) + + +def format_euro(val) -> str: + """Format an integer as €-prefixed with thousands separators, or '–'.""" + return f"€{val:,}" if val else "–" + + +def format_area(val) -> str: + """Format an integer as m² area, or '–'.""" + return f"{val} m²" if val else "–" diff --git a/data_platform/definitions.py b/data_platform/definitions.py index 8708a6a..c895381 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -1,8 +1,7 @@ -from pathlib import Path - from dagster import Definitions -from dagster_dbt import DbtCliResource, DbtProject, dbt_assets +from dagster_dbt import DbtCliResource +from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets from data_platform.assets.funda import ( funda_listing_details, funda_price_history, @@ -10,28 +9,6 @@ from data_platform.assets.funda import ( ) from data_platform.resources import FundaResource, PostgresResource -# --------------------------------------------------------------------------- -# dbt project -# --------------------------------------------------------------------------- - -DBT_PROJECT_DIR = Path(__file__).parent.parent / "dbt" - -dbt_project = DbtProject(project_dir=str(DBT_PROJECT_DIR)) - -# When running locally outside Docker, generate/refresh the manifest automatically. -dbt_project.prepare_if_dev() - - -# --------------------------------------------------------------------------- -# dbt assets – every dbt model/test/snapshot becomes a Dagster asset -# --------------------------------------------------------------------------- - - -@dbt_assets(manifest=dbt_project.manifest_path) -def dbt_project_assets(context, dbt: DbtCliResource): - yield from dbt.cli(["build"], context=context).stream() - - # --------------------------------------------------------------------------- # Definitions # --------------------------------------------------------------------------- diff --git a/tests/test_helpers.py b/tests/test_helpers.py index f831875..b6ab8ca 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,156 +1,143 @@ -"""Tests for pure helper functions in data_platform.assets.funda.""" +"""Tests for pure helper functions in data_platform.assets.helpers.""" -from data_platform.assets.funda import ( - _details_preview_table, - _safe, - _safe_int, - _search_preview_table, +from data_platform.assets.helpers import ( + format_area, + format_euro, + md_preview_table, + safe, + safe_int, ) -# ── _safe_int ─────────────────────────────────────────────────────────────── +# ── safe_int ──────────────────────────────────────────────────────────────── class TestSafeInt: def test_none_returns_none(self): - assert _safe_int(None) is None + assert safe_int(None) is None def test_integer_passthrough(self): - assert _safe_int(42) == 42 + assert safe_int(42) == 42 def test_negative_integer(self): - assert _safe_int(-10) == -10 + assert safe_int(-10) == -10 def test_zero(self): - assert _safe_int(0) == 0 + assert safe_int(0) == 0 def test_string_int(self): - assert _safe_int("123") == 123 + assert safe_int("123") == 123 def test_float_truncated(self): - assert _safe_int(3.9) == 3 + assert safe_int(3.9) == 3 def test_float_string(self): - assert _safe_int("7.0") == 7 + assert safe_int("7.0") == 7 def test_non_numeric_string_returns_none(self): - assert _safe_int("abc") is None + assert safe_int("abc") is None def test_empty_string_returns_none(self): - assert _safe_int("") is None + assert safe_int("") is None def test_list_returns_none(self): - assert _safe_int([1, 2, 3]) is None + assert safe_int([1, 2, 3]) is None -# ── _safe ──────────────────────────────────────────────────────────────────── +# ── safe ───────────────────────────────────────────────────────────────────── class TestSafe: def test_dict_becomes_json_string(self): - result = _safe({"key": "val"}) + result = safe({"key": "val"}) assert result == '{"key": "val"}' def test_list_becomes_json_string(self): - result = _safe([1, 2, 3]) + result = safe([1, 2, 3]) assert result == "[1, 2, 3]" def test_tuple_becomes_json_string(self): - result = _safe((1, 2)) + result = safe((1, 2)) assert result == "[1, 2]" def test_string_passthrough(self): - assert _safe("hello") == "hello" + assert safe("hello") == "hello" def test_integer_passthrough(self): - assert _safe(99) == 99 + assert safe(99) == 99 def test_none_passthrough(self): - assert _safe(None) is None + assert safe(None) is None def test_nested_dict_serialised(self): data = {"a": {"b": [1, 2]}} - result = _safe(data) + result = safe(data) import json assert json.loads(result) == data -# ── _search_preview_table ──────────────────────────────────────────────────── +# ── format_euro ────────────────────────────────────────────────────────────── -class TestSearchPreviewTable: +class TestFormatEuro: + def test_formats_price(self): + assert format_euro(350000) == "€350,000" + + def test_none_returns_dash(self): + assert format_euro(None) == "–" + + def test_zero_returns_dash(self): + assert format_euro(0) == "–" + + +# ── format_area ────────────────────────────────────────────────────────────── + + +class TestFormatArea: + def test_formats_area(self): + assert format_area(80) == "80 m²" + + def test_none_returns_dash(self): + assert format_area(None) == "–" + + def test_zero_returns_dash(self): + assert format_area(0) == "–" + + +# ── md_preview_table ───────────────────────────────────────────────────────── + + +class TestMdPreviewTable: def test_empty_rows_returns_header_only(self): - result = _search_preview_table([]) + result = md_preview_table([], columns=[("title", "Title"), ("city", "City")]) lines = result.split("\n") assert len(lines) == 2 assert "Title" in lines[0] assert "---" in lines[1] def test_single_row_appears(self): - rows = [ - { - "title": "Teststraat 1", - "city": "Amsterdam", - "price": 350000, - "living_area": 80, - "bedrooms": 3, - } - ] - result = _search_preview_table(rows) + rows = [{"title": "Teststraat 1", "city": "Amsterdam", "price": 350000}] + result = md_preview_table( + rows, + columns=[("title", "Title"), ("city", "City"), ("price", "Price")], + formatters={"price": format_euro}, + ) assert "Teststraat 1" in result assert "Amsterdam" in result assert "€350,000" in result - assert "80 m²" in result - assert "3" in result - def test_missing_price_shows_dash(self): - rows = [{"title": "No Price", "city": "Rotterdam", "price": None}] - result = _search_preview_table(rows) - assert "–" in result - - def test_missing_area_shows_dash(self): - rows = [{"title": "No Area", "city": "Utrecht", "living_area": None}] - result = _search_preview_table(rows) + def test_missing_value_shows_dash(self): + rows = [{"title": "No Price", "city": "Rotterdam"}] + result = md_preview_table( + rows, + columns=[("title", "Title"), ("city", "City"), ("price", "Price")], + formatters={"price": format_euro}, + ) assert "–" in result def test_multiple_rows_correct_count(self): - rows = [ - {"title": f"St {i}", "city": "City", "price": i * 1000} for i in range(5) - ] - result = _search_preview_table(rows) + rows = [{"title": f"St {i}", "city": "City"} for i in range(5)] + result = md_preview_table(rows, columns=[("title", "Title"), ("city", "City")]) lines = result.split("\n") # header + separator + 5 data rows assert len(lines) == 7 - - -# ── _details_preview_table ─────────────────────────────────────────────────── - - -class TestDetailsPreviewTable: - def test_empty_rows_returns_header_only(self): - result = _details_preview_table([]) - lines = result.split("\n") - assert len(lines) == 2 - assert "Title" in lines[0] - - def test_row_with_all_fields(self): - rows = [ - { - "title": "Kerkstraat 5", - "city": "Haarlem", - "price": 425000, - "status": "available", - "energy_label": "A", - } - ] - result = _details_preview_table(rows) - assert "Kerkstraat 5" in result - assert "Haarlem" in result - assert "€425,000" in result - assert "available" in result - assert "A" in result - - def test_missing_price_shows_dash(self): - rows = [{"title": "T", "city": "C", "price": None, "status": "sold"}] - result = _details_preview_table(rows) - assert "–" in result