feat: implement linting and testing

This commit is contained in:
Stijnvandenbroek
2026-03-03 22:02:25 +00:00
parent 8dd6a7b890
commit fc43570506
16 changed files with 884 additions and 56 deletions

View File

@@ -9,8 +9,6 @@ parameters (location, price range, etc.) can be tweaked per run.
"""
import json
from datetime import datetime, timezone
from typing import Optional
from dagster import (
AssetExecutionContext,
@@ -19,6 +17,7 @@ from dagster import (
MetadataValue,
asset,
)
from sqlalchemy import text
from data_platform.resources import FundaResource, PostgresResource
@@ -32,15 +31,15 @@ class FundaSearchConfig(Config):
location: str = "amsterdam"
offering_type: str = "buy"
price_min: Optional[int] = None
price_max: Optional[int] = None
area_min: Optional[int] = None
area_max: Optional[int] = None
plot_min: Optional[int] = None
plot_max: Optional[int] = None
object_type: Optional[str] = None # comma-separated, e.g. "house,apartment"
energy_label: Optional[str] = None # comma-separated, e.g. "A,A+,A++"
radius_km: Optional[int] = None
price_min: int | None = None
price_max: int | None = None
area_min: int | None = None
area_max: int | None = None
plot_min: int | None = None
plot_max: int | None = None
object_type: str | None = None # comma-separated, e.g. "house,apartment"
energy_label: str | None = None # comma-separated, e.g. "A,A+,A++"
radius_km: int | None = None
sort: str = "newest"
max_pages: int = 3
@@ -147,7 +146,7 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history (
def _safe(val):
"""Convert non-serialisable values (tuples, lists of dicts, etc.) for JSONB."""
if isinstance(val, (list, dict, tuple)):
if isinstance(val, list | dict | tuple):
return json.dumps(val, default=str)
return val
@@ -159,7 +158,10 @@ def _safe_int(val):
try:
return int(val)
except (ValueError, TypeError):
return None
try:
return int(float(val))
except (ValueError, TypeError):
return None
# ---------------------------------------------------------------------------
@@ -201,7 +203,7 @@ def funda_search_results(
if config.object_type:
kwargs["object_type"] = [t.strip() for t in config.object_type.split(",")]
if config.energy_label:
kwargs["energy_label"] = [l.strip() for l in config.energy_label.split(",")]
kwargs["energy_label"] = [lbl.strip() for lbl in config.energy_label.split(",")]
if config.radius_km is not None:
kwargs["radius_km"] = config.radius_km
@@ -224,14 +226,10 @@ def funda_search_results(
# Write to Postgres
engine = postgres.get_engine()
with engine.begin() as conn:
conn.execute(
__import__("sqlalchemy").text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")
)
conn.execute(__import__("sqlalchemy").text(_DDL_SEARCH))
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
conn.execute(text(_DDL_SEARCH))
# Truncate before inserting fresh results
conn.execute(
__import__("sqlalchemy").text(f"TRUNCATE TABLE {_SCHEMA}.search_results")
)
conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.search_results"))
rows = []
for listing in all_listings:
@@ -304,18 +302,12 @@ def funda_listing_details(
engine = postgres.get_engine()
with engine.begin() as conn:
conn.execute(
__import__("sqlalchemy").text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")
)
conn.execute(__import__("sqlalchemy").text(_DDL_DETAILS))
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
conn.execute(text(_DDL_DETAILS))
# Read listing IDs from search results
with engine.connect() as conn:
result = conn.execute(
__import__("sqlalchemy").text(
f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results"
)
)
result = conn.execute(text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results"))
ids = [row[0] for row in result if row[0]]
if not ids:
@@ -326,9 +318,7 @@ def funda_listing_details(
# Truncate before inserting
with engine.begin() as conn:
conn.execute(
__import__("sqlalchemy").text(f"TRUNCATE TABLE {_SCHEMA}.listing_details")
)
conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.listing_details"))
rows = []
errors = 0
@@ -411,8 +401,7 @@ def funda_listing_details(
postgres.execute_many(insert_sql, rows)
context.log.info(
f"Inserted {len(rows)} listing details ({errors} errors) "
f"into {_SCHEMA}.listing_details"
f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details"
)
return MaterializeResult(
@@ -442,33 +431,23 @@ def funda_price_history(
engine = postgres.get_engine()
with engine.begin() as conn:
conn.execute(
__import__("sqlalchemy").text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")
)
conn.execute(__import__("sqlalchemy").text(_DDL_PRICE_HISTORY))
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
conn.execute(text(_DDL_PRICE_HISTORY))
# Read listings from details table
with engine.connect() as conn:
result = conn.execute(
__import__("sqlalchemy").text(
f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details"
)
)
result = conn.execute(text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details"))
ids = [row[0] for row in result if row[0]]
if not ids:
context.log.warning(
"No listing details found — run funda_listing_details first."
)
context.log.warning("No listing details found — run funda_listing_details first.")
return MaterializeResult(metadata={"count": 0})
context.log.info(f"Fetching price history for {len(ids)} listings …")
# Truncate before inserting
with engine.begin() as conn:
conn.execute(
__import__("sqlalchemy").text(f"TRUNCATE TABLE {_SCHEMA}.price_history")
)
conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.price_history"))
rows = []
errors = 0
@@ -507,8 +486,7 @@ def funda_price_history(
postgres.execute_many(insert_sql, rows)
context.log.info(
f"Inserted {len(rows)} price history records ({errors} errors) "
f"into {_SCHEMA}.price_history"
f"Inserted {len(rows)} price history records ({errors} errors) into {_SCHEMA}.price_history"
)
return MaterializeResult(