class FundaSearchConfig(Config):
    """Launchpad parameters for the Funda search asset.

    ``funda_search_results`` turns these fields into the keyword arguments
    of the Funda client's ``search_listing`` call. ``location``,
    ``offering_type`` and ``sort`` are always sent; every optional filter is
    sent only when it has been set.
    """

    # Comma-separated list of locations; the asset splits on "," and strips
    # whitespace around each entry before searching.
    location: str = "amsterdam"
    offering_type: str = "buy"
    # Optional numeric filters: left out of the search call while ``None``.
    price_min: Optional[int] = None
    price_max: Optional[int] = None
    area_min: Optional[int] = None
    area_max: Optional[int] = None
    plot_min: Optional[int] = None
    plot_max: Optional[int] = None
    # Optional list filters: split on "," by the asset; an empty string is
    # treated the same as ``None`` (filter not applied).
    object_type: Optional[str] = None  # comma-separated, e.g. "house,apartment"
    energy_label: Optional[str] = None  # comma-separated, e.g. "A,A+,A++"
    radius_km: Optional[int] = None
    sort: str = "newest"
    # Upper bound on the number of result pages requested per run; paging
    # stops earlier at the first page that comes back empty.
    max_pages: int = 3
def _safe_int(val):
    """Coerce ``val`` to ``int`` for an integer column, or return ``None``.

    Args:
        val: Any value taken from a listing dict (number, numeric string,
            ``None``, or something unexpected).

    Returns:
        ``int(val)`` when the conversion succeeds, otherwise ``None`` so the
        row can still be inserted with a NULL in that column.
    """
    if val is None:
        return None
    try:
        return int(val)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric strings and float NaN.
        # TypeError: unsupported types such as lists or dicts.
        # OverflowError: float infinity, which int() cannot represent.
        return None
config.radius_km is not None: + kwargs["radius_km"] = config.radius_km + + # Paginate through results + all_listings = [] + for page in range(config.max_pages): + context.log.info(f"Fetching search page {page + 1}/{config.max_pages} …") + kwargs["page"] = page + results = client.search_listing(**kwargs) + if not results: + context.log.info("No more results.") + break + all_listings.extend(results) + context.log.info(f" got {len(results)} listings (total: {len(all_listings)})") + + if not all_listings: + context.log.warning("Search returned zero results.") + return MaterializeResult(metadata={"count": 0}) + + # Write to Postgres + engine = postgres.get_engine() + with engine.begin() as conn: + conn.execute( + __import__("sqlalchemy").text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}") + ) + conn.execute(__import__("sqlalchemy").text(_DDL_SEARCH)) + # Truncate before inserting fresh results + conn.execute( + __import__("sqlalchemy").text(f"TRUNCATE TABLE {_SCHEMA}.search_results") + ) + + rows = [] + for listing in all_listings: + d = listing.to_dict() + rows.append( + { + "global_id": d.get("global_id"), + "title": d.get("title"), + "city": d.get("city"), + "postcode": d.get("postcode"), + "province": d.get("province"), + "neighbourhood": d.get("neighbourhood"), + "price": _safe_int(d.get("price")), + "living_area": _safe_int(d.get("living_area")), + "plot_area": _safe_int(d.get("plot_area")), + "bedrooms": _safe_int(d.get("bedrooms")), + "rooms": _safe_int(d.get("rooms")), + "energy_label": d.get("energy_label"), + "object_type": d.get("object_type"), + "offering_type": d.get("offering_type"), + "construction_type": d.get("construction_type"), + "publish_date": d.get("publish_date"), + "broker_id": str(d.get("broker_id", "")), + "broker_name": d.get("broker_name"), + "raw_json": json.dumps(d, default=str), + } + ) + + insert_sql = f""" + INSERT INTO {_SCHEMA}.search_results + (global_id, title, city, postcode, province, neighbourhood, + price, living_area, plot_area, 
bedrooms, rooms, energy_label, + object_type, offering_type, construction_type, publish_date, + broker_id, broker_name, raw_json) + VALUES + (:global_id, :title, :city, :postcode, :province, :neighbourhood, + :price, :living_area, :plot_area, :bedrooms, :rooms, :energy_label, + :object_type, :offering_type, :construction_type, :publish_date, + :broker_id, :broker_name, :raw_json) + """ + postgres.execute_many(insert_sql, rows) + + context.log.info(f"Inserted {len(rows)} search results into {_SCHEMA}.search_results") + + return MaterializeResult( + metadata={ + "count": len(rows), + "location": MetadataValue.text(config.location), + "offering_type": MetadataValue.text(config.offering_type), + "preview": MetadataValue.md( + _search_preview_table(rows[:10]), + ), + } + ) + + +@asset( + group_name="funda", + kinds={"python", "postgres"}, + deps=[funda_search_results], + description="Fetch full listing details for each search result and store in Postgres.", +) +def funda_listing_details( + context: AssetExecutionContext, + config: FundaDetailsConfig, + funda: FundaResource, + postgres: PostgresResource, +) -> MaterializeResult: + client = funda.get_client() + engine = postgres.get_engine() + + with engine.begin() as conn: + conn.execute( + __import__("sqlalchemy").text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}") + ) + conn.execute(__import__("sqlalchemy").text(_DDL_DETAILS)) + + # Read listing IDs from search results + with engine.connect() as conn: + result = conn.execute( + __import__("sqlalchemy").text( + f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results" + ) + ) + ids = [row[0] for row in result if row[0]] + + if not ids: + context.log.warning("No search results found – run funda_search_results first.") + return MaterializeResult(metadata={"count": 0}) + + context.log.info(f"Fetching details for {len(ids)} listings …") + + # Truncate before inserting + with engine.begin() as conn: + conn.execute( + __import__("sqlalchemy").text(f"TRUNCATE TABLE 
{_SCHEMA}.listing_details") + ) + + rows = [] + errors = 0 + for i, gid in enumerate(ids): + try: + listing = client.get_listing(int(gid)) + d = listing.to_dict() + rows.append( + { + "global_id": d.get("global_id"), + "tiny_id": str(d.get("tiny_id", "")), + "title": d.get("title"), + "city": d.get("city"), + "postcode": d.get("postcode"), + "province": d.get("province"), + "neighbourhood": d.get("neighbourhood"), + "municipality": d.get("municipality"), + "price": _safe_int(d.get("price")), + "price_formatted": d.get("price_formatted"), + "status": d.get("status"), + "offering_type": d.get("offering_type"), + "object_type": d.get("object_type"), + "house_type": d.get("house_type"), + "construction_type": d.get("construction_type"), + "construction_year": d.get("construction_year"), + "energy_label": d.get("energy_label"), + "living_area": _safe_int(d.get("living_area")), + "plot_area": _safe_int(d.get("plot_area")), + "bedrooms": _safe_int(d.get("bedrooms")), + "rooms": _safe_int(d.get("rooms")), + "description": d.get("description"), + "publication_date": d.get("publication_date"), + "latitude": d.get("latitude"), + "longitude": d.get("longitude"), + "has_garden": d.get("has_garden"), + "has_balcony": d.get("has_balcony"), + "has_solar_panels": d.get("has_solar_panels"), + "has_heat_pump": d.get("has_heat_pump"), + "has_roof_terrace": d.get("has_roof_terrace"), + "is_energy_efficient": d.get("is_energy_efficient"), + "is_monument": d.get("is_monument"), + "url": d.get("url"), + "photo_count": _safe_int(d.get("photo_count")), + "views": _safe_int(d.get("views")), + "saves": _safe_int(d.get("saves")), + "raw_json": json.dumps(d, default=str), + } + ) + except Exception as e: + errors += 1 + context.log.warning(f"Failed to fetch listing {gid}: {e}") + continue + + if (i + 1) % 10 == 0: + context.log.info(f" fetched {i + 1}/{len(ids)} …") + + if rows: + insert_sql = f""" + INSERT INTO {_SCHEMA}.listing_details + (global_id, tiny_id, title, city, postcode, province, + 
@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    deps=[funda_listing_details],
    description="Fetch price history for each detailed listing and store in Postgres.",
)
def funda_price_history(
    context: AssetExecutionContext,
    config: FundaPriceHistoryConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Rebuild ``raw_funda.price_history`` from the Funda price-history API.

    Reads the distinct ``global_id`` values from ``listing_details``, fetches
    each listing and its price history, and replaces the table contents with
    the freshly fetched rows.

    Args:
        context: Dagster execution context, used for logging.
        config: Launchpad config. NOTE: ``config.fetch_all`` is not consulted
            here; every listing found in ``listing_details`` is processed.
        funda: Resource providing the Funda client.
        postgres: Resource providing the SQLAlchemy engine.

    Returns:
        A ``MaterializeResult`` whose metadata carries the number of rows
        written, the number of listings that failed, and the number of
        listings processed without error.
    """
    # Local import keeps this block self-contained; sqlalchemy is already a
    # dependency of this module.
    from sqlalchemy import text

    client = funda.get_client()
    engine = postgres.get_engine()

    with engine.begin() as conn:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
        conn.execute(text(_DDL_PRICE_HISTORY))

    # Read listings from details table
    with engine.connect() as conn:
        result = conn.execute(
            text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details")
        )
        ids = [row[0] for row in result if row[0]]

    if not ids:
        context.log.warning(
            "No listing details found – run funda_listing_details first."
        )
        return MaterializeResult(metadata={"count": 0})

    context.log.info(f"Fetching price history for {len(ids)} listings …")

    rows = []
    errors = 0
    for position, gid in enumerate(ids, start=1):
        try:
            # get_price_history needs a Listing object, so fetch it first
            listing = client.get_listing(int(gid))
            history = client.get_price_history(listing)
            # Build this listing's rows separately so that a failure midway
            # through does not leave a partial history in ``rows`` for a
            # listing that is counted as an error.
            listing_rows = [
                {
                    "global_id": gid,
                    "price": _safe_int(entry.get("price")),
                    "human_price": entry.get("human_price"),
                    "date": entry.get("date"),
                    "timestamp": entry.get("timestamp"),
                    "source": entry.get("source"),
                    "status": entry.get("status"),
                }
                for entry in history
            ]
        except Exception as e:
            # Best-effort ingestion: one bad listing must not abort the run.
            errors += 1
            context.log.warning(f"Failed to fetch price history for {gid}: {e}")
            continue

        rows.extend(listing_rows)

        if position % 10 == 0:
            context.log.info(f"  fetched {position}/{len(ids)} …")

    insert_sql = f"""
        INSERT INTO {_SCHEMA}.price_history
            (global_id, price, human_price, date, timestamp, source, status)
        VALUES
            (:global_id, :price, :human_price, :date, :timestamp, :source, :status)
    """

    # Replace the table contents atomically, and only after all fetching is
    # done: TRUNCATE and INSERT share one transaction, so a failed insert
    # rolls back to the previous data instead of leaving the table empty.
    with engine.begin() as conn:
        conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.price_history"))
        if rows:
            conn.execute(text(insert_sql), rows)

    context.log.info(
        f"Inserted {len(rows)} price history records ({errors} errors) "
        f"into {_SCHEMA}.price_history"
    )

    return MaterializeResult(
        metadata={
            "count": len(rows),
            "errors": errors,
            "listings_processed": len(ids) - errors,
        }
    )
def _details_preview_table(rows: list[dict]) -> str:
    """Build a markdown table for listing details metadata preview.

    Args:
        rows: Row dicts as built for the ``listing_details`` insert. Only the
            ``title``, ``city``, ``price``, ``status`` and ``energy_label``
            keys are read; missing keys are tolerated.

    Returns:
        A markdown table (header, separator, one line per row) joined with
        newlines. Missing, ``None`` or empty values are rendered as "–".
    """

    def cell(value) -> str:
        # Row dicts usually contain the key with a None value, so a
        # ``dict.get`` default alone would still print "None".
        if value is None or value == "":
            return "–"
        # Keep the table well-formed: a raw "|" would start a new column and
        # a line break would end the row.
        flattened = " ".join(str(value).splitlines())
        return flattened.replace("|", "\\|")

    lines = [
        "| Title | City | Price | Status | Energy |",
        "| --- | --- | --- | --- | --- |",
    ]
    for r in rows:
        price = f"€{r['price']:,}" if r.get("price") else "–"
        lines.append(
            f"| {cell(r.get('title'))} | {cell(r.get('city'))} "
            f"| {price} | {cell(r.get('status'))} | {cell(r.get('energy_label'))} |"
        )
    return "\n".join(lines)
class PostgresResource(ConfigurableResource):
    """Lightweight Postgres resource for raw ingestion writes.

    Connection settings default to the ``POSTGRES_*`` environment variables.
    ``get_engine`` returns a new SQLAlchemy engine on every call; the
    ``execute`` helpers create one, run a single transaction and dispose of
    it again.
    """

    # NOTE: these defaults are evaluated once, when the class body runs at
    # import time, so later changes to the environment are not picked up.
    host: str = os.getenv("POSTGRES_HOST", "localhost")
    port: int = int(os.getenv("POSTGRES_PORT", "5432"))
    user: str = os.getenv("POSTGRES_USER", "")
    password: str = os.getenv("POSTGRES_PASSWORD", "")
    dbname: str = os.getenv("POSTGRES_DB", "")

    def get_engine(self):
        """Return a new SQLAlchemy engine for the configured database.

        The URL is assembled with ``URL.create`` rather than string
        formatting so that credentials containing characters such as
        ``@``, ``:`` or ``/`` are escaped correctly. The caller owns the
        returned engine.
        """
        from sqlalchemy.engine import URL

        url = URL.create(
            drivername="postgresql",
            # Empty strings mean "not configured"; pass None so the
            # component is omitted from the URL.
            username=self.user or None,
            password=self.password or None,
            host=self.host,
            port=self.port,
            database=self.dbname or None,
        )
        return create_engine(url)

    def execute(self, statement: str, params: dict | None = None):
        """Run one SQL statement in its own transaction.

        Args:
            statement: SQL text, optionally with ``:name`` bind parameters.
            params: Values for the bind parameters, if any.
        """
        engine = self.get_engine()
        try:
            with engine.begin() as conn:
                conn.execute(text(statement), params or {})
        finally:
            # The engine is private to this call; release its pooled
            # connections instead of leaking one pool per invocation.
            engine.dispose()

    def execute_many(self, statement: str, rows: list[dict]):
        """Run one SQL statement once per row, all in a single transaction.

        Args:
            statement: SQL text with ``:name`` bind parameters.
            rows: One parameter dict per execution. An empty list is a no-op.
        """
        if not rows:
            # Nothing to bind; executing the statement without parameters
            # would fail on the unbound placeholders.
            return
        engine = self.get_engine()
        try:
            with engine.begin() as conn:
                conn.execute(text(statement), rows)
        finally:
            engine.dispose()
model. Replace with your actual source tables. + Cleaned Funda listing details – one row per property. columns: - - name: id - description: Primary key. + - name: global_id + description: Funda internal listing ID. tests: - unique - not_null + - name: city + description: City name. + - name: price + description: Asking or rental price in euros. + + - name: stg_funda_price_history + description: > + Historical price events per listing (asking prices, WOZ assessments, sales). + columns: + - name: global_id + description: Funda internal listing ID. + tests: + - not_null + - name: price + description: Price at this point in time. diff --git a/dbt/models/staging/sources.yml b/dbt/models/staging/sources.yml new file mode 100644 index 0000000..0a31a37 --- /dev/null +++ b/dbt/models/staging/sources.yml @@ -0,0 +1,51 @@ +version: 2 + +sources: + - name: raw_funda + schema: raw_funda + description: > + Raw Funda real-estate data ingested by Dagster assets. + tables: + - name: search_results + description: Funda search results (broad overview of matching listings). + columns: + - name: global_id + description: Funda internal listing ID. + - name: title + description: Property address / title. + - name: city + description: City name. + - name: price + description: Asking or rental price in euros. + - name: ingested_at + description: Timestamp when the row was written. + + - name: listing_details + description: > + Full listing details fetched per search result (50+ fields). + columns: + - name: global_id + description: Funda internal listing ID. + - name: tiny_id + description: Public ID used in Funda URLs. + - name: price + description: Asking or rental price in euros. + - name: status + description: Listing status (available or sold). + - name: ingested_at + description: Timestamp when the row was written. + + - name: price_history + description: > + Historical price data per listing (asking prices, WOZ, sales). 
+ columns: + - name: global_id + description: Funda internal listing ID. + - name: price + description: Price at this point in time. + - name: source + description: Price data source (Funda or WOZ). + - name: status + description: Price event type (asking_price, sold, or woz). + - name: ingested_at + description: Timestamp when the row was written. diff --git a/dbt/models/staging/stg_example.sql b/dbt/models/staging/stg_example.sql deleted file mode 100644 index d4eae0a..0000000 --- a/dbt/models/staging/stg_example.sql +++ /dev/null @@ -1,4 +0,0 @@ --- Placeholder staging model. --- Replace with your actual source query using the source() macro. - -select 1 as id, 'example' as name diff --git a/dbt/models/staging/stg_funda_listings.sql b/dbt/models/staging/stg_funda_listings.sql new file mode 100644 index 0000000..8abdfbc --- /dev/null +++ b/dbt/models/staging/stg_funda_listings.sql @@ -0,0 +1,48 @@ +-- Staging model: one row per Funda listing with cleaned core fields. + +with source as ( + select * from {{ source('raw_funda', 'listing_details') }} +), + +staged as ( + select + global_id, + tiny_id, + title, + city, + postcode, + province, + neighbourhood, + municipality, + price, + price_formatted, + status, + offering_type, + object_type, + house_type, + construction_type, + construction_year, + energy_label, + living_area, + plot_area, + bedrooms, + rooms, + publication_date, + latitude, + longitude, + has_garden, + has_balcony, + has_solar_panels, + has_heat_pump, + has_roof_terrace, + is_energy_efficient, + is_monument, + url, + photo_count, + views, + saves, + ingested_at + from source +) + +select * from staged diff --git a/dbt/models/staging/stg_funda_price_history.sql b/dbt/models/staging/stg_funda_price_history.sql new file mode 100644 index 0000000..71f69ce --- /dev/null +++ b/dbt/models/staging/stg_funda_price_history.sql @@ -0,0 +1,20 @@ +-- Staging model: one row per price-history event per listing. 
+ +with source as ( + select * from {{ source('raw_funda', 'price_history') }} +), + +staged as ( + select + global_id, + price, + human_price, + date as price_date, + timestamp as price_timestamp, + source as price_source, + status as price_status, + ingested_at + from source +) + +select * from staged diff --git a/pyproject.toml b/pyproject.toml index 20742ae..4fdbd11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "dagster-dbt", "dbt-core", "dbt-postgres", + "pyfunda", ] [build-system]