"""Dagster assets for Funda real-estate data ingestion.

Three assets form a pipeline::

    funda_search_results → funda_listing_details → funda_price_history

Each asset is configurable from the Dagster launchpad, so search parameters
(location, price range, etc.) can be tweaked per run.

Each asset fully replaces its table in the ``raw_funda`` schema. It fetches
from Funda first, then truncates and re-inserts inside a single transaction,
so a failed run never leaves a half-written or emptied table behind. When
nothing could be fetched (an empty search, or every per-listing request
failing), the existing table contents are left untouched.
"""

import json
from collections.abc import Sequence
from typing import Any

from dagster import (
    AssetExecutionContext,
    Config,
    MaterializeResult,
    MetadataValue,
    asset,
)
from sqlalchemy import text
from sqlalchemy.engine import Engine

from data_platform.helpers import (
    format_area,
    format_euro,
    md_preview_table,
    safe_int,
)
from data_platform.resources import FundaResource, PostgresResource

# ---------------------------------------------------------------------------
# Launchpad config schemas
# ---------------------------------------------------------------------------


class FundaSearchConfig(Config):
    """Launchpad parameters for the Funda search asset.

    Filters left as ``None`` (or empty, for the comma-separated ones) are not
    passed to the Funda client at all.
    """

    location: str = "amsterdam"  # comma-separated, e.g. "amsterdam,utrecht"
    offering_type: str = "buy"
    price_min: int | None = None
    price_max: int | None = None
    area_min: int | None = None
    area_max: int | None = None
    plot_min: int | None = None
    plot_max: int | None = None
    object_type: str | None = None  # comma-separated, e.g. "house,apartment"
    energy_label: str | None = None  # comma-separated, e.g. "A,A+,A++"
    radius_km: int | None = None
    sort: str = "newest"
    # Upper bound on pages requested; pagination stops early on an empty page.
    max_pages: int = 3


class FundaDetailsConfig(Config):
    """Launchpad parameters for the listing-details asset."""

    # Intended to toggle fetching details for every search result.
    # NOTE: not currently read by funda_listing_details; every ID is always
    # fetched. Kept so existing launchpad configs remain valid.
    fetch_all: bool = True


class FundaPriceHistoryConfig(Config):
    """Launchpad parameters for the price-history asset."""

    # Intended to toggle fetching price history for every detailed listing.
    # NOTE: not currently read by funda_price_history; every ID is always
    # fetched. Kept so existing launchpad configs remain valid.
    fetch_all: bool = True


# ---------------------------------------------------------------------------
# SQL helpers
# ---------------------------------------------------------------------------

_SCHEMA = "raw_funda"

_DDL_SEARCH = f"""
CREATE TABLE IF NOT EXISTS {_SCHEMA}.search_results (
    global_id         TEXT,
    title             TEXT,
    city              TEXT,
    postcode          TEXT,
    province          TEXT,
    neighbourhood     TEXT,
    price             BIGINT,
    living_area       INT,
    plot_area         INT,
    bedrooms          INT,
    rooms             INT,
    energy_label      TEXT,
    object_type       TEXT,
    offering_type     TEXT,
    construction_type TEXT,
    publish_date      TEXT,
    broker_id         TEXT,
    broker_name       TEXT,
    raw_json          JSONB,
    ingested_at       TIMESTAMPTZ DEFAULT now()
);
"""

_DDL_DETAILS = f"""
CREATE TABLE IF NOT EXISTS {_SCHEMA}.listing_details (
    global_id           TEXT,
    tiny_id             TEXT,
    title               TEXT,
    city                TEXT,
    postcode            TEXT,
    province            TEXT,
    neighbourhood       TEXT,
    municipality        TEXT,
    price               BIGINT,
    price_formatted     TEXT,
    status              TEXT,
    offering_type       TEXT,
    object_type         TEXT,
    house_type          TEXT,
    construction_type   TEXT,
    construction_year   TEXT,
    energy_label        TEXT,
    living_area         INT,
    plot_area           INT,
    bedrooms            INT,
    rooms               INT,
    description         TEXT,
    publication_date    TEXT,
    latitude            DOUBLE PRECISION,
    longitude           DOUBLE PRECISION,
    has_garden          BOOLEAN,
    has_balcony         BOOLEAN,
    has_solar_panels    BOOLEAN,
    has_heat_pump       BOOLEAN,
    has_roof_terrace    BOOLEAN,
    is_energy_efficient BOOLEAN,
    is_monument         BOOLEAN,
    url                 TEXT,
    photo_count         INT,
    views               INT,
    saves               INT,
    raw_json            JSONB,
    ingested_at         TIMESTAMPTZ DEFAULT now()
);
"""

_DDL_PRICE_HISTORY = f"""
CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history (
    global_id   TEXT,
    price       BIGINT,
    human_price TEXT,
    date        TEXT,
    timestamp   TEXT,
    source      TEXT,
    status      TEXT,
    ingested_at TIMESTAMPTZ DEFAULT now()
);
"""

# Insert column order per table. The INSERT statements and the row dicts are
# both generated from these tuples, so they cannot drift apart.
_SEARCH_COLUMNS = (
    "global_id", "title", "city", "postcode", "province", "neighbourhood",
    "price", "living_area", "plot_area", "bedrooms", "rooms",
    "energy_label", "object_type", "offering_type", "construction_type",
    "publish_date", "broker_id", "broker_name", "raw_json",
)

_DETAIL_COLUMNS = (
    "global_id", "tiny_id", "title", "city", "postcode", "province",
    "neighbourhood", "municipality", "price", "price_formatted", "status",
    "offering_type", "object_type", "house_type", "construction_type",
    "construction_year", "energy_label", "living_area", "plot_area",
    "bedrooms", "rooms", "description", "publication_date", "latitude",
    "longitude", "has_garden", "has_balcony", "has_solar_panels",
    "has_heat_pump", "has_roof_terrace", "is_energy_efficient",
    "is_monument", "url", "photo_count", "views", "saves", "raw_json",
)

# Columns taken from each price-history entry; global_id comes from the caller.
_PRICE_ENTRY_COLUMNS = ("price", "human_price", "date", "timestamp", "source", "status")
_PRICE_HISTORY_COLUMNS = ("global_id", *_PRICE_ENTRY_COLUMNS)

# Columns coerced with safe_int before insertion.
_SEARCH_INT_COLUMNS = frozenset({"price", "living_area", "plot_area", "bedrooms", "rooms"})
_DETAIL_INT_COLUMNS = _SEARCH_INT_COLUMNS | {"photo_count", "views", "saves"}
_PRICE_INT_COLUMNS = frozenset({"price"})

# Search filters forwarded verbatim when set (not None).
_SCALAR_SEARCH_FILTERS = (
    "price_min", "price_max", "area_min", "area_max",
    "plot_min", "plot_max", "radius_km",
)
# Search filters given as comma-separated strings and forwarded as lists.
_LIST_SEARCH_FILTERS = ("object_type", "energy_label")


def _split_csv(value: str) -> list[str]:
    """Split a comma-separated launchpad value, trimming items and dropping empty ones."""
    return [part.strip() for part in value.split(",") if part.strip()]


def _text_or_empty(value: object) -> str:
    """Stringify an ID-like value, mapping None to "" rather than the string "None"."""
    return "" if value is None else str(value)


def _project_row(
    record: dict[str, Any],
    columns: Sequence[str],
    *,
    int_columns: frozenset[str] = frozenset(),
    id_columns: frozenset[str] = frozenset(),
) -> dict[str, Any]:
    """Map an API record onto one insert row keyed by *columns*.

    ``raw_json`` holds the whole record serialized as JSON. Columns in
    *int_columns* go through ``safe_int``, and columns in *id_columns* are
    stringified via ``_text_or_empty``. Every other column is copied with
    ``record.get`` (None when absent).
    """
    row: dict[str, Any] = {}
    for col in columns:
        if col == "raw_json":
            row[col] = json.dumps(record, default=str)
        elif col in int_columns:
            row[col] = safe_int(record.get(col))
        elif col in id_columns:
            row[col] = _text_or_empty(record.get(col))
        else:
            row[col] = record.get(col)
    return row


def _insert_sql(table: str, columns: Sequence[str]) -> str:
    """Build a named-parameter INSERT into ``{_SCHEMA}.{table}`` for *columns*."""
    column_list = ", ".join(columns)
    placeholders = ", ".join(f":{col}" for col in columns)
    return f"INSERT INTO {_SCHEMA}.{table} ({column_list}) VALUES ({placeholders})"


def _ensure_table(engine: Engine, ddl: str) -> None:
    """Create the ``raw_funda`` schema and the table defined by *ddl* if missing."""
    with engine.begin() as conn:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
        conn.execute(text(ddl))


def _read_ids(engine: Engine, table: str) -> list[str]:
    """Return the distinct non-empty ``global_id`` values of ``{_SCHEMA}.{table}``.

    Returns an empty list when the table does not exist yet, for example when
    the upstream asset has never been materialized.
    """
    qualified = f"{_SCHEMA}.{table}"
    with engine.connect() as conn:
        exists = conn.execute(
            text("SELECT to_regclass(:name)"), {"name": qualified}
        ).scalar()
        if exists is None:
            return []
        result = conn.execute(text(f"SELECT DISTINCT global_id FROM {qualified}"))
        return [row[0] for row in result if row[0]]


def _replace_rows(
    engine: Engine, table: str, columns: Sequence[str], rows: list[dict[str, Any]]
) -> None:
    """Replace all rows of ``{_SCHEMA}.{table}`` with *rows* in ONE transaction.

    TRUNCATE and INSERT commit together. If the insert fails, the truncate is
    rolled back and the previous contents survive.
    """
    with engine.begin() as conn:
        conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.{table}"))
        if rows:
            # A list of parameter dicts makes SQLAlchemy run an executemany.
            conn.execute(text(_insert_sql(table, columns)), rows)


def _build_search_kwargs(config: FundaSearchConfig) -> dict[str, Any]:
    """Translate launchpad config into keyword arguments for ``search_listing``."""
    kwargs: dict[str, Any] = {
        "location": _split_csv(config.location),
        "offering_type": config.offering_type,
        "sort": config.sort,
    }
    for key in _SCALAR_SEARCH_FILTERS:
        value = getattr(config, key)
        if value is not None:
            kwargs[key] = value
    for key in _LIST_SEARCH_FILTERS:
        value = getattr(config, key)
        if value and (items := _split_csv(value)):
            kwargs[key] = items
    return kwargs


# ---------------------------------------------------------------------------
# Assets
# ---------------------------------------------------------------------------


@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    description="Search Funda listings and store results in Postgres.",
)
def funda_search_results(
    context: AssetExecutionContext,
    config: FundaSearchConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Run a paginated Funda search and replace ``raw_funda.search_results``.

    Pages are requested from 0 up to ``config.max_pages - 1`` and the loop
    stops at the first empty page. If the search yields nothing, the table
    is left untouched and ``count`` is 0.
    """
    client = funda.get_client()
    search_kwargs = _build_search_kwargs(config)

    all_listings = []
    for page in range(config.max_pages):
        context.log.info(f"Fetching search page {page + 1}/{config.max_pages} …")
        results = client.search_listing(**search_kwargs, page=page)
        if not results:
            context.log.info("No more results.")
            break
        all_listings.extend(results)
        context.log.info(f" got {len(results)} listings (total: {len(all_listings)})")

    if not all_listings:
        context.log.warning("Search returned zero results.")
        return MaterializeResult(metadata={"count": 0})

    rows = [
        _project_row(
            listing.to_dict(),
            _SEARCH_COLUMNS,
            int_columns=_SEARCH_INT_COLUMNS,
            id_columns=frozenset({"broker_id"}),
        )
        for listing in all_listings
    ]

    engine = postgres.get_engine()
    _ensure_table(engine, _DDL_SEARCH)
    _replace_rows(engine, "search_results", _SEARCH_COLUMNS, rows)
    context.log.info(
        f"Inserted {len(rows)} search results into {_SCHEMA}.search_results"
    )

    return MaterializeResult(
        metadata={
            "count": len(rows),
            "location": MetadataValue.text(config.location),
            "offering_type": MetadataValue.text(config.offering_type),
            "preview": MetadataValue.md(
                md_preview_table(
                    rows[:10],
                    columns=[
                        ("title", "Title"),
                        ("city", "City"),
                        ("price", "Price"),
                        ("living_area", "Area"),
                        ("bedrooms", "Bedrooms"),
                    ],
                    formatters={"price": format_euro, "living_area": format_area},
                ),
            ),
        }
    )


@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    deps=[funda_search_results],
    description="Fetch full listing details for each search result and store in Postgres.",
)
def funda_listing_details(
    context: AssetExecutionContext,
    config: FundaDetailsConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Fetch details for every ID in ``search_results`` and replace ``listing_details``.

    Per-listing failures are logged and counted but do not fail the run.
    When every fetch fails, the existing table is left untouched.
    """
    client = funda.get_client()
    engine = postgres.get_engine()
    _ensure_table(engine, _DDL_DETAILS)

    ids = _read_ids(engine, "search_results")
    if not ids:
        context.log.warning("No search results found – run funda_search_results first.")
        return MaterializeResult(metadata={"count": 0})

    context.log.info(f"Fetching details for {len(ids)} listings …")

    rows: list[dict[str, Any]] = []
    errors = 0
    for i, gid in enumerate(ids, start=1):
        try:
            row = _project_row(
                client.get_listing(int(gid)).to_dict(),
                _DETAIL_COLUMNS,
                int_columns=_DETAIL_INT_COLUMNS,
                id_columns=frozenset({"tiny_id"}),
            )
        except Exception as e:  # best-effort: one bad listing must not fail the run
            errors += 1
            context.log.warning(f"Failed to fetch listing {gid}: {e}")
        else:
            rows.append(row)
        # Logged whether or not this item failed.
        if i % 10 == 0:
            context.log.info(f" fetched {i}/{len(ids)} …")

    if errors == len(ids):
        context.log.warning(
            f"All {len(ids)} detail fetches failed; leaving "
            f"{_SCHEMA}.listing_details unchanged."
        )
    else:
        _replace_rows(engine, "listing_details", _DETAIL_COLUMNS, rows)
        context.log.info(
            f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details"
        )

    return MaterializeResult(
        metadata={
            "count": len(rows),
            "errors": errors,
            "preview": MetadataValue.md(
                md_preview_table(
                    rows[:10],
                    columns=[
                        ("title", "Title"),
                        ("city", "City"),
                        ("price", "Price"),
                        ("status", "Status"),
                        ("energy_label", "Energy"),
                    ],
                    formatters={"price": format_euro},
                ),
            ),
        }
    )


@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    deps=[funda_listing_details],
    description="Fetch price history for each detailed listing and store in Postgres.",
)
def funda_price_history(
    context: AssetExecutionContext,
    config: FundaPriceHistoryConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Fetch price history for every ID in ``listing_details`` and replace ``price_history``.

    A listing's entries are kept only if its whole history was read
    successfully. When every listing fails, the existing table is left
    untouched.
    """
    client = funda.get_client()
    engine = postgres.get_engine()
    _ensure_table(engine, _DDL_PRICE_HISTORY)

    ids = _read_ids(engine, "listing_details")
    if not ids:
        context.log.warning(
            "No listing details found – run funda_listing_details first."
        )
        return MaterializeResult(metadata={"count": 0})

    context.log.info(f"Fetching price history for {len(ids)} listings …")

    rows: list[dict[str, Any]] = []
    errors = 0
    for i, gid in enumerate(ids, start=1):
        try:
            # get_price_history needs a Listing object, so fetch it first
            listing = client.get_listing(int(gid))
            listing_rows = [
                {
                    "global_id": gid,
                    **_project_row(
                        entry, _PRICE_ENTRY_COLUMNS, int_columns=_PRICE_INT_COLUMNS
                    ),
                }
                for entry in client.get_price_history(listing)
            ]
        except Exception as e:  # best-effort: one bad listing must not fail the run
            errors += 1
            context.log.warning(f"Failed to fetch price history for {gid}: {e}")
        else:
            rows.extend(listing_rows)
        # Logged whether or not this item failed.
        if i % 10 == 0:
            context.log.info(f" fetched {i}/{len(ids)} …")

    if errors == len(ids):
        context.log.warning(
            f"All {len(ids)} price-history fetches failed; leaving "
            f"{_SCHEMA}.price_history unchanged."
        )
    else:
        _replace_rows(engine, "price_history", _PRICE_HISTORY_COLUMNS, rows)
        context.log.info(
            f"Inserted {len(rows)} price history records ({errors} errors) into {_SCHEMA}.price_history"
        )

    return MaterializeResult(
        metadata={
            "count": len(rows),
            "errors": errors,
            "listings_processed": len(ids) - errors,
        }
    )