Files
data-platform/data_platform/assets/ingestion/funda/funda.py
2026-03-05 21:36:53 +00:00

436 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Funda real-estate ingestion assets."""
import json
import time
from pathlib import Path
from dagster import (
AssetExecutionContext,
Config,
MaterializeResult,
MetadataValue,
asset,
)
from funda import Listing
from sqlalchemy import text
from data_platform.helpers import (
format_area,
format_euro,
md_preview_table,
render_sql,
safe_int,
)
from data_platform.resources import FundaResource, PostgresResource
# Directory next to this module holding the DDL/DML templates rendered via render_sql.
_SQL_DIR = Path(__file__).parent.joinpath("sql")
# Postgres schema that every raw Funda table in this module is created in.
_SCHEMA = "raw_funda"
class FundaSearchConfig(Config):
    """Search parameters for Funda.

    Consumed by ``raw_funda_search_results``. Numeric filters left as ``None``
    and empty/``None`` list filters are not forwarded to the search call.
    """

    # Comma-separated list of locations; the search asset splits it on ",".
    location: str = "woerden"
    # Offering type passed through to the search (default "buy").
    offering_type: str = "buy"
    # Price bounds; None disables the corresponding bound.
    price_min: int | None = 300000
    price_max: int | None = 500000
    # Living-area bounds; None disables the bound.
    # NOTE(review): units are presumably m² — confirm against the Funda client.
    area_min: int | None = None
    area_max: int | None = None
    # Plot-area bounds; None disables the bound.
    plot_min: int | None = None
    plot_max: int | None = None
    # Optional comma-separated object types; None/empty means no filter.
    object_type: str | None = None
    # Optional comma-separated energy labels; None/empty means no filter.
    energy_label: str | None = None
    # Search radius in km; None omits the radius filter.
    radius_km: int | None = 50
    # Sort order passed through to the search.
    sort: str = "newest"
    # Maximum number of result pages requested; paging stops early on an empty page.
    max_pages: int = 10
class FundaDetailsConfig(Config):
    """Config for listing details fetch.

    Consumed by ``raw_funda_listing_details``.
    """

    # True: fetch details for every id returned by dml/select_all_detail_ids.sql.
    # False: only ids from dml/select_new_detail_ids.sql (presumably listings
    # without stored details yet — confirm against the SQL).
    fetch_all: bool = False
class FundaPriceHistoryConfig(Config):
    """Config for price history fetch.

    Consumed by ``raw_funda_price_history``.
    """

    # True: fetch history for every listing from dml/select_all_price_history_listings.sql.
    # False: only listings from dml/select_new_price_history_listings.sql
    # (presumably those without stored history yet — confirm against the SQL).
    fetch_all: bool = False
# Optional numeric search filters, forwarded to ``search_listing`` only when set.
_OPTIONAL_NUMERIC_SEARCH_FILTERS = (
    "price_min",
    "price_max",
    "area_min",
    "area_max",
    "plot_min",
    "plot_max",
    "radius_km",
)


def _split_csv(value: str) -> list[str]:
    """Split a comma-separated config value into stripped, non-empty items.

    Empty items (e.g. from a trailing comma in ``"woerden,"``) are dropped so
    they are never sent to the Funda API as blank filter values.
    """
    return [item.strip() for item in value.split(",") if item.strip()]


def _build_search_kwargs(config: FundaSearchConfig) -> dict:
    """Translate a ``FundaSearchConfig`` into ``search_listing`` keyword arguments.

    ``page`` is not included; the caller supplies it per request.
    """
    kwargs: dict = {
        "location": _split_csv(config.location),
        "offering_type": config.offering_type,
        "sort": config.sort,
    }
    for name in _OPTIONAL_NUMERIC_SEARCH_FILTERS:
        value = getattr(config, name)
        if value is not None:
            kwargs[name] = value
    # List-valued filters: None, "" or only separators mean "no filter".
    for name in ("object_type", "energy_label"):
        raw = getattr(config, name)
        items = _split_csv(raw) if raw else []
        if items:
            kwargs[name] = items
    return kwargs


def _search_result_row(d: dict) -> dict:
    """Map a search ``Listing.to_dict()`` payload onto a search_results row."""
    broker_id = d.get("broker_id")
    return {
        "global_id": d.get("global_id"),
        "title": d.get("title"),
        "city": d.get("city"),
        "postcode": d.get("postcode"),
        "province": d.get("province"),
        "neighbourhood": d.get("neighbourhood"),
        "price": safe_int(d.get("price")),
        "living_area": safe_int(d.get("living_area")),
        "plot_area": safe_int(d.get("plot_area")),
        "bedrooms": safe_int(d.get("bedrooms")),
        "rooms": safe_int(d.get("rooms")),
        "energy_label": d.get("energy_label"),
        "object_type": d.get("object_type"),
        "offering_type": d.get("offering_type"),
        "construction_type": d.get("construction_type"),
        "publish_date": d.get("publish_date"),
        # A missing *or null* broker id is stored as "" rather than the string "None".
        "broker_id": "" if broker_id is None else str(broker_id),
        "broker_name": d.get("broker_name"),
        "raw_json": json.dumps(d, default=str),
    }


@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    description="Search Funda listings and store results in Postgres.",
)
def raw_funda_search_results(
    context: AssetExecutionContext,
    config: FundaSearchConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Search Funda and write the results to ``raw_funda.search_results``.

    Requests up to ``config.max_pages`` pages (0-based ``page`` argument),
    stopping at the first empty page. A listing returned on more than one
    page (same non-null ``global_id``) is written once. After the insert,
    ``dml/mark_inactive_search_results.sql`` runs and its row count is
    reported as ``newly_inactive``. If the search returns nothing, no tables
    are created or modified.

    Returns:
        MaterializeResult with ``count``, ``newly_inactive``, the searched
        location / offering type, and a markdown preview of the first 10 rows.
    """
    client = funda.get_client()
    kwargs = _build_search_kwargs(config)

    all_listings = []
    for page in range(config.max_pages):
        context.log.info(f"Fetching search page {page + 1}/{config.max_pages}...")
        results = client.search_listing(**kwargs, page=page)
        if not results:
            context.log.info("No more results.")
            break
        all_listings.extend(results)
        context.log.info(f"Got {len(results)} listings (total: {len(all_listings)}).")

    if not all_listings:
        context.log.warning("Search returned zero results.")
        return MaterializeResult(metadata={"count": 0})

    # With "newest" sorting, listings published mid-run shift later pages, so
    # the same listing can appear twice; keep only its first occurrence.
    rows: list[dict] = []
    seen_ids: set = set()
    duplicates = 0
    for listing in all_listings:
        row = _search_result_row(listing.to_dict())
        gid = row["global_id"]
        if gid is not None:
            if gid in seen_ids:
                duplicates += 1
                continue
            seen_ids.add(gid)
        rows.append(row)
    if duplicates:
        context.log.info(f"Skipped {duplicates} duplicate listings returned across pages.")

    engine = postgres.get_engine()
    with engine.begin() as conn:
        conn.execute(
            text(render_sql(_SQL_DIR, "ddl/create_schema.sql", schema=_SCHEMA))
        )
        conn.execute(
            text(render_sql(_SQL_DIR, "ddl/create_search_results.sql", schema=_SCHEMA))
        )

    postgres.execute_many(
        render_sql(_SQL_DIR, "dml/insert_search_results.sql", schema=_SCHEMA), rows
    )

    # Mark listings not seen in the last 7 days as inactive.
    with engine.begin() as conn:
        result = conn.execute(
            text(
                render_sql(
                    _SQL_DIR,
                    "dml/mark_inactive_search_results.sql",
                    schema=_SCHEMA,
                )
            )
        )
        newly_inactive = result.rowcount

    context.log.info(
        f"Inserted {len(rows)} search results into {_SCHEMA}.search_results"
        f" ({newly_inactive} listings marked inactive)"
    )
    return MaterializeResult(
        metadata={
            "count": len(rows),
            "newly_inactive": newly_inactive,
            "location": MetadataValue.text(config.location),
            "offering_type": MetadataValue.text(config.offering_type),
            "preview": MetadataValue.md(
                md_preview_table(
                    rows[:10],
                    columns=[
                        ("title", "Title"),
                        ("city", "City"),
                        ("price", "Price"),
                        ("living_area", "Area"),
                        ("bedrooms", "Bedrooms"),
                    ],
                    formatters={"price": format_euro, "living_area": format_area},
                ),
            ),
        }
    )
# Number of fetched detail rows buffered before they are written to Postgres,
# so an interrupted run keeps everything fetched up to the last flush.
_DETAILS_FLUSH_SIZE = 50
# Log progress and pause for one second after every this many detail requests.
_DETAILS_PAUSE_EVERY = 10


def _listing_detail_row(d: dict) -> dict:
    """Map a detail ``Listing.to_dict()`` payload onto a listing_details row."""
    tiny_id = d.get("tiny_id")
    return {
        "global_id": d.get("global_id"),
        # A missing *or null* tiny id is stored as "" rather than the string "None".
        "tiny_id": "" if tiny_id is None else str(tiny_id),
        "title": d.get("title"),
        "city": d.get("city"),
        "postcode": d.get("postcode"),
        "province": d.get("province"),
        "neighbourhood": d.get("neighbourhood"),
        "municipality": d.get("municipality"),
        "price": safe_int(d.get("price")),
        "price_formatted": d.get("price_formatted"),
        "status": d.get("status"),
        "offering_type": d.get("offering_type"),
        "object_type": d.get("object_type"),
        "house_type": d.get("house_type"),
        "construction_type": d.get("construction_type"),
        "construction_year": d.get("construction_year"),
        "energy_label": d.get("energy_label"),
        "living_area": safe_int(d.get("living_area")),
        "plot_area": safe_int(d.get("plot_area")),
        "bedrooms": safe_int(d.get("bedrooms")),
        "rooms": safe_int(d.get("rooms")),
        "description": d.get("description"),
        "publication_date": d.get("publication_date"),
        "latitude": d.get("latitude"),
        "longitude": d.get("longitude"),
        "has_garden": d.get("has_garden"),
        "has_balcony": d.get("has_balcony"),
        "has_solar_panels": d.get("has_solar_panels"),
        "has_heat_pump": d.get("has_heat_pump"),
        "has_roof_terrace": d.get("has_roof_terrace"),
        "is_energy_efficient": d.get("is_energy_efficient"),
        "is_monument": d.get("is_monument"),
        "url": d.get("url"),
        "photo_count": safe_int(d.get("photo_count")),
        "views": safe_int(d.get("views")),
        "saves": safe_int(d.get("saves")),
        "raw_json": json.dumps(d, default=str),
    }


def _mark_stale_listing_details(engine) -> None:
    """Run dml/mark_stale_listing_details.sql in its own transaction."""
    with engine.begin() as conn:
        conn.execute(
            text(
                render_sql(
                    _SQL_DIR,
                    "dml/mark_stale_listing_details.sql",
                    schema=_SCHEMA,
                )
            )
        )


@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    deps=[raw_funda_search_results],
    description="Fetch full listing details for each search result and store in Postgres.",
)
def raw_funda_listing_details(
    context: AssetExecutionContext,
    config: FundaDetailsConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Fetch full details for search-result listings and store them.

    With ``config.fetch_all`` every id from ``dml/select_all_detail_ids.sql``
    is fetched, otherwise only ids from ``dml/select_new_detail_ids.sql``.
    Each id is fetched via ``client.get_listing``; failures are logged and
    counted, not raised. Rows are written in chunks of ``_DETAILS_FLUSH_SIZE``
    and the loop pauses one second after every ``_DETAILS_PAUSE_EVERY``
    requests (successful or not). ``dml/mark_stale_listing_details.sql`` runs
    on every materialization, including when there is nothing to fetch.

    Returns:
        MaterializeResult with ``count`` (rows inserted); when ids were
        fetched, also ``errors`` and a markdown preview of the first 10 rows.
    """
    client = funda.get_client()
    engine = postgres.get_engine()
    with engine.begin() as conn:
        conn.execute(
            text(render_sql(_SQL_DIR, "ddl/create_schema.sql", schema=_SCHEMA))
        )
        conn.execute(
            text(render_sql(_SQL_DIR, "ddl/create_listing_details.sql", schema=_SCHEMA))
        )

    select_file = (
        "dml/select_all_detail_ids.sql"
        if config.fetch_all
        else "dml/select_new_detail_ids.sql"
    )
    with engine.connect() as conn:
        result = conn.execute(text(render_sql(_SQL_DIR, select_file, schema=_SCHEMA)))
        ids = [row[0] for row in result if row[0]]

    if not ids:
        if config.fetch_all:
            context.log.warning(
                "No search results found; run raw_funda_search_results first."
            )
        else:
            # Nothing new is the normal steady state for incremental runs.
            context.log.info("No new listings to fetch details for.")
        # Parent search listings may have gone inactive since the last run,
        # so stale-marking must still happen.
        _mark_stale_listing_details(engine)
        return MaterializeResult(metadata={"count": 0})

    context.log.info(f"Fetching details for {len(ids)} listings...")
    insert_sql = render_sql(_SQL_DIR, "dml/insert_listing_details.sql", schema=_SCHEMA)
    pending: list[dict] = []
    preview: list[dict] = []
    inserted = 0
    errors = 0
    for i, gid in enumerate(ids, start=1):
        try:
            row = _listing_detail_row(client.get_listing(int(gid)).to_dict())
        except Exception as e:
            # Best effort: one bad listing must not abort the whole run.
            errors += 1
            context.log.warning(f"Failed to fetch listing {gid}: {e}")
        else:
            pending.append(row)
            if len(preview) < 10:
                preview.append(row)
            if len(pending) >= _DETAILS_FLUSH_SIZE:
                postgres.execute_many(insert_sql, pending)
                inserted += len(pending)
                pending = []
        # Runs after failures too, so errors never bypass the pause.
        if i % _DETAILS_PAUSE_EVERY == 0:
            context.log.info(f"Fetched {i}/{len(ids)} listings.")
            time.sleep(1)

    if pending:
        postgres.execute_many(insert_sql, pending)
        inserted += len(pending)

    # Mark details as stale where the parent search listing is no longer active.
    _mark_stale_listing_details(engine)

    context.log.info(
        f"Inserted {inserted} listing details ({errors} errors) into {_SCHEMA}.listing_details"
    )
    return MaterializeResult(
        metadata={
            "count": inserted,
            "errors": errors,
            "preview": MetadataValue.md(
                md_preview_table(
                    preview,
                    columns=[
                        ("title", "Title"),
                        ("city", "City"),
                        ("price", "Price"),
                        ("status", "Status"),
                        ("energy_label", "Energy"),
                    ],
                    formatters={"price": format_euro},
                ),
            ),
        }
    )
@asset(
    group_name="funda",
    kinds={"python", "postgres"},
    deps=[raw_funda_listing_details],
    description="Fetch price history for each detailed listing and store in Postgres.",
)
def raw_funda_price_history(
    context: AssetExecutionContext,
    config: FundaPriceHistoryConfig,
    funda: FundaResource,
    postgres: PostgresResource,
) -> MaterializeResult:
    """Fetch the price history of each detailed listing and store it.

    Listings come from ``dml/select_all_price_history_listings.sql`` when
    ``config.fetch_all`` is set, otherwise from
    ``dml/select_new_price_history_listings.sql``. Each listing's history is
    inserted as soon as it is fetched. Failures (fetch or insert) are logged
    and counted, not raised. The loop logs progress and pauses one second
    after every 25 listings, whether or not the listing succeeded.

    Returns:
        MaterializeResult with ``count`` (history rows inserted); when
        listings were processed, also ``errors`` and ``listings_processed``.
    """
    client = funda.get_client()
    engine = postgres.get_engine()
    with engine.begin() as conn:
        conn.execute(
            text(render_sql(_SQL_DIR, "ddl/create_schema.sql", schema=_SCHEMA))
        )
        conn.execute(
            text(render_sql(_SQL_DIR, "ddl/create_price_history.sql", schema=_SCHEMA))
        )

    # Fetch listing metadata (url, title, postcode) from the DB so we can call
    # get_price_history without re-fetching each listing from the Funda API.
    select_file = (
        "dml/select_all_price_history_listings.sql"
        if config.fetch_all
        else "dml/select_new_price_history_listings.sql"
    )
    with engine.connect() as conn:
        result = conn.execute(text(render_sql(_SQL_DIR, select_file, schema=_SCHEMA)))
        listings = [(row[0], row[1], row[2], row[3]) for row in result if row[0]]

    if not listings:
        if config.fetch_all:
            context.log.warning(
                "No listing details found; run raw_funda_listing_details first."
            )
        else:
            # Nothing new is the normal steady state for incremental runs.
            context.log.info("No new listings to fetch price history for.")
        return MaterializeResult(metadata={"count": 0})

    context.log.info(f"Fetching price history for {len(listings)} listings...")
    # Rendered once; the template does not vary per listing.
    insert_sql = render_sql(_SQL_DIR, "dml/insert_price_history.sql", schema=_SCHEMA)
    batch_size = 25
    total_rows = 0
    errors = 0
    for i, (gid, url, title, postcode) in enumerate(listings, start=1):
        try:
            stub = Listing(data={"url": url, "title": title, "postcode": postcode})
            # Treat a None history like an empty one instead of failing on iteration.
            history = client.get_price_history(stub) or []
            rows = [
                {
                    "global_id": gid,
                    "price": safe_int(entry.get("price")),
                    "human_price": entry.get("human_price"),
                    "date": entry.get("date"),
                    "timestamp": entry.get("timestamp"),
                    "source": entry.get("source"),
                    "status": entry.get("status"),
                }
                for entry in history
            ]
            if rows:
                postgres.execute_many(insert_sql, rows)
                total_rows += len(rows)
        except Exception as e:
            # Best effort: one bad listing must not abort the whole run.
            errors += 1
            context.log.warning(f"Failed to fetch price history for {gid}: {e}")
        # Runs after failures too, so errors never bypass the pause.
        if i % batch_size == 0:
            context.log.info(f"Fetched {i}/{len(listings)} price histories.")
            time.sleep(1)

    context.log.info(
        f"Inserted {total_rows} price history records ({errors} errors) "
        f"into {_SCHEMA}.price_history"
    )
    return MaterializeResult(
        metadata={
            "count": total_rows,
            "errors": errors,
            "listings_processed": len(listings) - errors,
        }
    )