feat: restructure sql ingestion

This commit is contained in:
Stijnvandenbroek
2026-03-04 16:54:23 +00:00
parent 78d648069b
commit 34a284d96b
23 changed files with 318 additions and 271 deletions

View File

@@ -0,0 +1 @@
"""Ingestion assets."""

View File

@@ -0,0 +1,9 @@
"""Funda ingestion assets."""
from data_platform.assets.ingestion.funda.funda import (
funda_listing_details,
funda_price_history,
funda_search_results,
)
__all__ = ["funda_listing_details", "funda_price_history", "funda_search_results"]

View File

@@ -1,6 +1,7 @@
"""Funda real-estate ingestion assets."""
import json
from pathlib import Path
from dagster import (
AssetExecutionContext,
@@ -15,10 +16,14 @@ from data_platform.helpers import (
format_area,
format_euro,
md_preview_table,
render_sql,
safe_int,
)
from data_platform.resources import FundaResource, PostgresResource
_SQL_DIR = Path(__file__).parent / "sql"
_SCHEMA = "raw_funda"
class FundaSearchConfig(Config):
"""Search parameters for Funda."""
@@ -50,155 +55,6 @@ class FundaPriceHistoryConfig(Config):
fetch_all: bool = True
_SCHEMA = "raw_funda"
_DDL_SEARCH = f"""
CREATE TABLE IF NOT EXISTS {_SCHEMA}.search_results (
global_id TEXT,
title TEXT,
city TEXT,
postcode TEXT,
province TEXT,
neighbourhood TEXT,
price BIGINT,
living_area INT,
plot_area INT,
bedrooms INT,
rooms INT,
energy_label TEXT,
object_type TEXT,
offering_type TEXT,
construction_type TEXT,
publish_date TEXT,
broker_id TEXT,
broker_name TEXT,
raw_json JSONB,
ingested_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (global_id)
);
"""
_DDL_DETAILS = f"""
CREATE TABLE IF NOT EXISTS {_SCHEMA}.listing_details (
global_id TEXT,
tiny_id TEXT,
title TEXT,
city TEXT,
postcode TEXT,
province TEXT,
neighbourhood TEXT,
municipality TEXT,
price BIGINT,
price_formatted TEXT,
status TEXT,
offering_type TEXT,
object_type TEXT,
house_type TEXT,
construction_type TEXT,
construction_year TEXT,
energy_label TEXT,
living_area INT,
plot_area INT,
bedrooms INT,
rooms INT,
description TEXT,
publication_date TEXT,
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
has_garden BOOLEAN,
has_balcony BOOLEAN,
has_solar_panels BOOLEAN,
has_heat_pump BOOLEAN,
has_roof_terrace BOOLEAN,
is_energy_efficient BOOLEAN,
is_monument BOOLEAN,
url TEXT,
photo_count INT,
views INT,
saves INT,
raw_json JSONB,
ingested_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (global_id, status)
);
"""
_DDL_PRICE_HISTORY = f"""
CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history (
global_id TEXT,
price BIGINT,
human_price TEXT,
date TEXT,
timestamp TEXT,
source TEXT,
status TEXT,
ingested_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (global_id, date, source, status)
);
"""
# Deduplicate existing rows and add constraints for tables created before UNIQUE clauses.
_MIGRATE_SEARCH_CONSTRAINT = f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = '{_SCHEMA}.search_results'::regclass
AND contype = 'u'
) THEN
DELETE FROM {_SCHEMA}.search_results a
USING {_SCHEMA}.search_results b
WHERE a.global_id = b.global_id
AND a.ingested_at < b.ingested_at;
ALTER TABLE {_SCHEMA}.search_results
ADD UNIQUE (global_id);
END IF;
END $$;
"""
_MIGRATE_DETAILS_CONSTRAINT = f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = '{_SCHEMA}.listing_details'::regclass
AND contype = 'u'
) THEN
DELETE FROM {_SCHEMA}.listing_details a
USING {_SCHEMA}.listing_details b
WHERE a.global_id = b.global_id
AND a.status IS NOT DISTINCT FROM b.status
AND a.ingested_at < b.ingested_at;
ALTER TABLE {_SCHEMA}.listing_details
ADD UNIQUE (global_id, status);
END IF;
END $$;
"""
_MIGRATE_PRICE_HISTORY_CONSTRAINT = f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = '{_SCHEMA}.price_history'::regclass
AND contype = 'u'
) THEN
DELETE FROM {_SCHEMA}.price_history a
USING {_SCHEMA}.price_history b
WHERE a.global_id = b.global_id
AND a.date IS NOT DISTINCT FROM b.date
AND a.source IS NOT DISTINCT FROM b.source
AND a.status IS NOT DISTINCT FROM b.status
AND a.ingested_at < b.ingested_at;
ALTER TABLE {_SCHEMA}.price_history
ADD UNIQUE (global_id, date, source, status);
END IF;
END $$;
"""
@asset(
group_name="funda",
kinds={"python", "postgres"},
@@ -254,8 +110,16 @@ def funda_search_results(
engine = postgres.get_engine()
with engine.begin() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
conn.execute(text(_DDL_SEARCH))
conn.execute(text(_MIGRATE_SEARCH_CONSTRAINT))
conn.execute(
text(render_sql(_SQL_DIR, "ddl/create_search_results.sql", schema=_SCHEMA))
)
conn.execute(
text(
render_sql(
_SQL_DIR, "ddl/migrate_search_constraint.sql", schema=_SCHEMA
)
)
)
rows = []
for listing in all_listings:
@@ -284,39 +148,9 @@ def funda_search_results(
}
)
insert_sql = f"""
INSERT INTO {_SCHEMA}.search_results
(global_id, title, city, postcode, province, neighbourhood,
price, living_area, plot_area, bedrooms, rooms, energy_label,
object_type, offering_type, construction_type, publish_date,
broker_id, broker_name, raw_json)
VALUES
(:global_id, :title, :city, :postcode, :province, :neighbourhood,
:price, :living_area, :plot_area, :bedrooms, :rooms, :energy_label,
:object_type, :offering_type, :construction_type, :publish_date,
:broker_id, :broker_name, :raw_json)
ON CONFLICT (global_id) DO UPDATE SET
title = EXCLUDED.title,
city = EXCLUDED.city,
postcode = EXCLUDED.postcode,
province = EXCLUDED.province,
neighbourhood = EXCLUDED.neighbourhood,
price = EXCLUDED.price,
living_area = EXCLUDED.living_area,
plot_area = EXCLUDED.plot_area,
bedrooms = EXCLUDED.bedrooms,
rooms = EXCLUDED.rooms,
energy_label = EXCLUDED.energy_label,
object_type = EXCLUDED.object_type,
offering_type = EXCLUDED.offering_type,
construction_type = EXCLUDED.construction_type,
publish_date = EXCLUDED.publish_date,
broker_id = EXCLUDED.broker_id,
broker_name = EXCLUDED.broker_name,
raw_json = EXCLUDED.raw_json,
ingested_at = now()
"""
postgres.execute_many(insert_sql, rows)
postgres.execute_many(
render_sql(_SQL_DIR, "dml/insert_search_results.sql", schema=_SCHEMA), rows
)
context.log.info(
f"Inserted {len(rows)} search results into {_SCHEMA}.search_results"
@@ -361,8 +195,16 @@ def funda_listing_details(
with engine.begin() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
conn.execute(text(_DDL_DETAILS))
conn.execute(text(_MIGRATE_DETAILS_CONSTRAINT))
conn.execute(
text(render_sql(_SQL_DIR, "ddl/create_listing_details.sql", schema=_SCHEMA))
)
conn.execute(
text(
render_sql(
_SQL_DIR, "ddl/migrate_details_constraint.sql", schema=_SCHEMA
)
)
)
with engine.connect() as conn:
result = conn.execute(
@@ -432,66 +274,9 @@ def funda_listing_details(
context.log.info(f" fetched {i + 1}/{len(ids)}")
if rows:
insert_sql = f"""
INSERT INTO {_SCHEMA}.listing_details
(global_id, tiny_id, title, city, postcode, province,
neighbourhood, municipality, price, price_formatted,
status, offering_type, object_type, house_type,
construction_type, construction_year, energy_label,
living_area, plot_area, bedrooms, rooms, description,
publication_date, latitude, longitude,
has_garden, has_balcony, has_solar_panels, has_heat_pump,
has_roof_terrace, is_energy_efficient, is_monument,
url, photo_count, views, saves, raw_json)
VALUES
(:global_id, :tiny_id, :title, :city, :postcode, :province,
:neighbourhood, :municipality, :price, :price_formatted,
:status, :offering_type, :object_type, :house_type,
:construction_type, :construction_year, :energy_label,
:living_area, :plot_area, :bedrooms, :rooms, :description,
:publication_date, :latitude, :longitude,
:has_garden, :has_balcony, :has_solar_panels, :has_heat_pump,
:has_roof_terrace, :is_energy_efficient, :is_monument,
:url, :photo_count, :views, :saves, :raw_json)
ON CONFLICT (global_id, status) DO UPDATE SET
tiny_id = EXCLUDED.tiny_id,
title = EXCLUDED.title,
city = EXCLUDED.city,
postcode = EXCLUDED.postcode,
province = EXCLUDED.province,
neighbourhood = EXCLUDED.neighbourhood,
municipality = EXCLUDED.municipality,
price = EXCLUDED.price,
price_formatted = EXCLUDED.price_formatted,
offering_type = EXCLUDED.offering_type,
object_type = EXCLUDED.object_type,
house_type = EXCLUDED.house_type,
construction_type = EXCLUDED.construction_type,
construction_year = EXCLUDED.construction_year,
energy_label = EXCLUDED.energy_label,
living_area = EXCLUDED.living_area,
plot_area = EXCLUDED.plot_area,
bedrooms = EXCLUDED.bedrooms,
rooms = EXCLUDED.rooms,
description = EXCLUDED.description,
publication_date = EXCLUDED.publication_date,
latitude = EXCLUDED.latitude,
longitude = EXCLUDED.longitude,
has_garden = EXCLUDED.has_garden,
has_balcony = EXCLUDED.has_balcony,
has_solar_panels = EXCLUDED.has_solar_panels,
has_heat_pump = EXCLUDED.has_heat_pump,
has_roof_terrace = EXCLUDED.has_roof_terrace,
is_energy_efficient = EXCLUDED.is_energy_efficient,
is_monument = EXCLUDED.is_monument,
url = EXCLUDED.url,
photo_count = EXCLUDED.photo_count,
views = EXCLUDED.views,
saves = EXCLUDED.saves,
raw_json = EXCLUDED.raw_json,
ingested_at = now()
"""
postgres.execute_many(insert_sql, rows)
postgres.execute_many(
render_sql(_SQL_DIR, "dml/insert_listing_details.sql", schema=_SCHEMA), rows
)
context.log.info(
f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details"
@@ -535,8 +320,16 @@ def funda_price_history(
with engine.begin() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
conn.execute(text(_DDL_PRICE_HISTORY))
conn.execute(text(_MIGRATE_PRICE_HISTORY_CONSTRAINT))
conn.execute(
text(render_sql(_SQL_DIR, "ddl/create_price_history.sql", schema=_SCHEMA))
)
conn.execute(
text(
render_sql(
_SQL_DIR, "ddl/migrate_price_history_constraint.sql", schema=_SCHEMA
)
)
)
with engine.connect() as conn:
result = conn.execute(
@@ -579,18 +372,9 @@ def funda_price_history(
context.log.info(f" fetched {i + 1}/{len(ids)}")
if rows:
insert_sql = f"""
INSERT INTO {_SCHEMA}.price_history
(global_id, price, human_price, date, timestamp, source, status)
VALUES
(:global_id, :price, :human_price, :date, :timestamp, :source, :status)
ON CONFLICT (global_id, date, source, status) DO UPDATE SET
price = EXCLUDED.price,
human_price = EXCLUDED.human_price,
timestamp = EXCLUDED.timestamp,
ingested_at = now()
"""
postgres.execute_many(insert_sql, rows)
postgres.execute_many(
render_sql(_SQL_DIR, "dml/insert_price_history.sql", schema=_SCHEMA), rows
)
context.log.info(
f"Inserted {len(rows)} price history records ({errors} errors) into {_SCHEMA}.price_history"

View File

@@ -0,0 +1,41 @@
-- DDL for {{ schema }}.listing_details: one row per (listing, status)
-- snapshot of a Funda listing's detail payload. `schema` is bound by
-- render_sql(); IF NOT EXISTS keeps the statement idempotent.
CREATE TABLE IF NOT EXISTS {{ schema }}.listing_details (
global_id TEXT,
tiny_id TEXT,
title TEXT,
city TEXT,
postcode TEXT,
province TEXT,
neighbourhood TEXT,
municipality TEXT,
price BIGINT,
price_formatted TEXT,
status TEXT,
offering_type TEXT,
object_type TEXT,
house_type TEXT,
construction_type TEXT,
construction_year TEXT,
energy_label TEXT,
living_area INT,
plot_area INT,
bedrooms INT,
rooms INT,
description TEXT,
publication_date TEXT,
latitude DOUBLE PRECISION,
longitude DOUBLE PRECISION,
has_garden BOOLEAN,
has_balcony BOOLEAN,
has_solar_panels BOOLEAN,
has_heat_pump BOOLEAN,
has_roof_terrace BOOLEAN,
is_energy_efficient BOOLEAN,
is_monument BOOLEAN,
url TEXT,
photo_count INT,
views INT,
saves INT,
-- Full source payload kept alongside the parsed columns for reprocessing.
raw_json JSONB,
ingested_at TIMESTAMPTZ DEFAULT now(),
-- Upsert key used by dml/insert_listing_details.sql (ON CONFLICT target).
-- NOTE(review): columns are nullable and UNIQUE treats NULLs as distinct,
-- so rows with a NULL global_id/status would never conflict — confirm the
-- ingestion asset always populates global_id.
UNIQUE (global_id, status)
);

View File

@@ -0,0 +1,11 @@
-- DDL for {{ schema }}.price_history: one row per observed price point of a
-- listing, keyed by (listing, date, source, status). `schema` is bound by
-- render_sql(); IF NOT EXISTS keeps the statement idempotent.
CREATE TABLE IF NOT EXISTS {{ schema }}.price_history (
global_id TEXT,
price BIGINT,
human_price TEXT,
-- date/timestamp are stored as TEXT exactly as received from the source;
-- parsing into proper temporal types is deferred to downstream models.
date TEXT,
timestamp TEXT,
source TEXT,
status TEXT,
ingested_at TIMESTAMPTZ DEFAULT now(),
-- Upsert key used by dml/insert_price_history.sql (ON CONFLICT target).
UNIQUE (global_id, date, source, status)
);

View File

@@ -0,0 +1,23 @@
-- DDL for {{ schema }}.search_results: one row per listing found via the
-- Funda search API, latest snapshot only (upsert on global_id). `schema` is
-- bound by render_sql(); IF NOT EXISTS keeps the statement idempotent.
CREATE TABLE IF NOT EXISTS {{ schema }}.search_results (
global_id TEXT,
title TEXT,
city TEXT,
postcode TEXT,
province TEXT,
neighbourhood TEXT,
price BIGINT,
living_area INT,
plot_area INT,
bedrooms INT,
rooms INT,
energy_label TEXT,
object_type TEXT,
offering_type TEXT,
construction_type TEXT,
publish_date TEXT,
broker_id TEXT,
broker_name TEXT,
-- Full source payload kept alongside the parsed columns for reprocessing.
raw_json JSONB,
ingested_at TIMESTAMPTZ DEFAULT now(),
-- Upsert key used by dml/insert_search_results.sql (ON CONFLICT target).
UNIQUE (global_id)
);

View File

@@ -0,0 +1,18 @@
-- Deduplicate and add UNIQUE constraint to listing_details if it doesn't exist yet.
-- One-time migration for tables created before the DDL gained its UNIQUE
-- clause; safe to re-run because the whole block is guarded.
DO $$
BEGIN
-- Guard: skip if the table already has ANY unique constraint.
-- NOTE(review): contype = 'u' does not check WHICH columns the existing
-- constraint covers — assumes the only unique constraint ever added is
-- (global_id, status). Confirm if other unique constraints are possible.
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = '{{ schema }}.listing_details'::regclass
AND contype = 'u'
) THEN
-- Self-join delete: for each duplicate key, keep only the newest row
-- (the one with the greatest ingested_at survives).
DELETE FROM {{ schema }}.listing_details a
USING {{ schema }}.listing_details b
WHERE a.global_id = b.global_id
-- IS NOT DISTINCT FROM: NULL-safe equality, so NULL statuses also dedupe.
AND a.status IS NOT DISTINCT FROM b.status
AND a.ingested_at < b.ingested_at;
ALTER TABLE {{ schema }}.listing_details
ADD UNIQUE (global_id, status);
END IF;
END $$;

View File

@@ -0,0 +1,20 @@
-- Deduplicate and add UNIQUE constraint to price_history if it doesn't exist yet.
-- One-time migration for tables created before the DDL gained its UNIQUE
-- clause; safe to re-run because the whole block is guarded.
DO $$
BEGIN
-- Guard: skip if the table already has ANY unique constraint.
-- NOTE(review): contype = 'u' does not check WHICH columns the existing
-- constraint covers — assumes the only unique constraint ever added is
-- (global_id, date, source, status).
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = '{{ schema }}.price_history'::regclass
AND contype = 'u'
) THEN
-- Self-join delete: for each duplicate key, keep only the newest row
-- (the one with the greatest ingested_at survives).
DELETE FROM {{ schema }}.price_history a
USING {{ schema }}.price_history b
WHERE a.global_id = b.global_id
-- IS NOT DISTINCT FROM: NULL-safe equality, so NULL key parts also dedupe.
AND a.date IS NOT DISTINCT FROM b.date
AND a.source IS NOT DISTINCT FROM b.source
AND a.status IS NOT DISTINCT FROM b.status
AND a.ingested_at < b.ingested_at;
ALTER TABLE {{ schema }}.price_history
ADD UNIQUE (global_id, date, source, status);
END IF;
END $$;

View File

@@ -0,0 +1,17 @@
-- Deduplicate and add UNIQUE constraint to search_results if it doesn't exist yet.
-- One-time migration for tables created before the DDL gained its UNIQUE
-- clause; safe to re-run because the whole block is guarded.
DO $$
BEGIN
-- Guard: skip if the table already has ANY unique constraint.
-- NOTE(review): contype = 'u' does not check WHICH columns the existing
-- constraint covers — assumes the only unique constraint is (global_id).
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conrelid = '{{ schema }}.search_results'::regclass
AND contype = 'u'
) THEN
-- Self-join delete: for each duplicate global_id, keep only the newest
-- row (the one with the greatest ingested_at survives).
DELETE FROM {{ schema }}.search_results a
USING {{ schema }}.search_results b
WHERE a.global_id = b.global_id
AND a.ingested_at < b.ingested_at;
ALTER TABLE {{ schema }}.search_results
ADD UNIQUE (global_id);
END IF;
END $$;

View File

@@ -0,0 +1,4 @@
[sqlfluff]
# SQLAlchemy :named_param bind parameters are misinterpreted by sqlfluff
# as cast operators, causing false LT01 spacing violations. Disable here.
exclude_rules = LT01

View File

@@ -0,0 +1,59 @@
-- Upsert one listing-detail row. `{{ schema }}` is a Jinja2 value bound by
-- render_sql(); the `:name` placeholders are SQLAlchemy bind parameters
-- supplied per-row via postgres.execute_many().
INSERT INTO {{ schema }}.listing_details (
global_id, tiny_id, title, city, postcode, province,
neighbourhood, municipality, price, price_formatted,
status, offering_type, object_type, house_type,
construction_type, construction_year, energy_label,
living_area, plot_area, bedrooms, rooms, description,
publication_date, latitude, longitude,
has_garden, has_balcony, has_solar_panels, has_heat_pump,
has_roof_terrace, is_energy_efficient, is_monument,
url, photo_count, views, saves, raw_json
)
VALUES (
:global_id, :tiny_id, :title, :city, :postcode, :province,
:neighbourhood, :municipality, :price, :price_formatted,
:status, :offering_type, :object_type, :house_type,
:construction_type, :construction_year, :energy_label,
:living_area, :plot_area, :bedrooms, :rooms, :description,
:publication_date, :latitude, :longitude,
:has_garden, :has_balcony, :has_solar_panels, :has_heat_pump,
:has_roof_terrace, :is_energy_efficient, :is_monument,
:url, :photo_count, :views, :saves, :raw_json
)
-- On re-ingest of the same (global_id, status), overwrite the row in place.
-- global_id and status are omitted from the SET list because they are the
-- conflict key (already equal by definition).
ON CONFLICT (global_id, status) DO UPDATE SET
tiny_id = excluded.tiny_id,
title = excluded.title,
city = excluded.city,
postcode = excluded.postcode,
province = excluded.province,
neighbourhood = excluded.neighbourhood,
municipality = excluded.municipality,
price = excluded.price,
price_formatted = excluded.price_formatted,
offering_type = excluded.offering_type,
object_type = excluded.object_type,
house_type = excluded.house_type,
construction_type = excluded.construction_type,
construction_year = excluded.construction_year,
energy_label = excluded.energy_label,
living_area = excluded.living_area,
plot_area = excluded.plot_area,
bedrooms = excluded.bedrooms,
rooms = excluded.rooms,
description = excluded.description,
publication_date = excluded.publication_date,
latitude = excluded.latitude,
longitude = excluded.longitude,
has_garden = excluded.has_garden,
has_balcony = excluded.has_balcony,
has_solar_panels = excluded.has_solar_panels,
has_heat_pump = excluded.has_heat_pump,
has_roof_terrace = excluded.has_roof_terrace,
is_energy_efficient = excluded.is_energy_efficient,
is_monument = excluded.is_monument,
url = excluded.url,
photo_count = excluded.photo_count,
views = excluded.views,
saves = excluded.saves,
raw_json = excluded.raw_json,
-- Refresh the ingestion timestamp on every upsert, not just first insert.
ingested_at = now()

View File

@@ -0,0 +1,11 @@
-- Upsert one price-history observation. `{{ schema }}` is a Jinja2 value
-- bound by render_sql(); the `:name` placeholders are SQLAlchemy bind
-- parameters supplied per-row via postgres.execute_many().
INSERT INTO {{ schema }}.price_history (
global_id, price, human_price, date, timestamp, source, status
)
VALUES (
:global_id, :price, :human_price, :date, :timestamp, :source, :status
)
-- On re-ingest of the same observation key, refresh the mutable fields.
-- The key columns themselves are omitted from SET (already equal).
ON CONFLICT (global_id, date, source, status) DO UPDATE SET
price = excluded.price,
human_price = excluded.human_price,
timestamp = excluded.timestamp,
-- Refresh the ingestion timestamp on every upsert, not just first insert.
ingested_at = now()

View File

@@ -0,0 +1,32 @@
-- Upsert one search-result row (latest snapshot per listing). `{{ schema }}`
-- is a Jinja2 value bound by render_sql(); the `:name` placeholders are
-- SQLAlchemy bind parameters supplied per-row via postgres.execute_many().
INSERT INTO {{ schema }}.search_results (
global_id, title, city, postcode, province, neighbourhood,
price, living_area, plot_area, bedrooms, rooms, energy_label,
object_type, offering_type, construction_type, publish_date,
broker_id, broker_name, raw_json
)
VALUES (
:global_id, :title, :city, :postcode, :province, :neighbourhood,
:price, :living_area, :plot_area, :bedrooms, :rooms, :energy_label,
:object_type, :offering_type, :construction_type, :publish_date,
:broker_id, :broker_name, :raw_json
)
-- On re-ingest of a known listing, overwrite every field with the newest
-- snapshot; global_id is the conflict key and so is omitted from SET.
ON CONFLICT (global_id) DO UPDATE SET
title = excluded.title,
city = excluded.city,
postcode = excluded.postcode,
province = excluded.province,
neighbourhood = excluded.neighbourhood,
price = excluded.price,
living_area = excluded.living_area,
plot_area = excluded.plot_area,
bedrooms = excluded.bedrooms,
rooms = excluded.rooms,
energy_label = excluded.energy_label,
object_type = excluded.object_type,
offering_type = excluded.offering_type,
construction_type = excluded.construction_type,
publish_date = excluded.publish_date,
broker_id = excluded.broker_id,
broker_name = excluded.broker_name,
raw_json = excluded.raw_json,
-- Refresh the ingestion timestamp on every upsert, not just first insert.
ingested_at = now()

View File

@@ -2,7 +2,7 @@ from dagster import Definitions
from dagster_dbt import DbtCliResource
from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets
from data_platform.assets.funda import (
from data_platform.assets.ingestion.funda import (
funda_listing_details,
funda_price_history,
funda_search_results,

View File

@@ -1,6 +1,15 @@
"""Shared helper utilities."""
import json
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
# One Environment per template directory. Jinja2 caches compiled templates
# on the Environment, so rebuilding it on every call (as before) discarded
# that cache and re-read/re-compiled the template each time.
_SQL_ENV_CACHE: dict[str, Environment] = {}


def render_sql(sql_dir: Path, path: str, **kwargs: object) -> str:
    """Load and render a Jinja2 SQL template relative to *sql_dir*.

    Args:
        sql_dir: Directory containing the SQL templates.
        path: Template path relative to ``sql_dir``, e.g. ``"ddl/foo.sql"``.
        **kwargs: Values bound into the template, e.g. ``schema="raw_funda"``.

    Returns:
        The rendered SQL text.
    """
    key = str(sql_dir)
    env = _SQL_ENV_CACHE.get(key)
    if env is None:
        # autoescape stays off: these are SQL templates, not HTML.
        env = Environment(loader=FileSystemLoader(key), autoescape=False)
        _SQL_ENV_CACHE[key] = env
    return env.get_template(path).render(**kwargs)
def safe(val):

View File

@@ -8,7 +8,7 @@ from dagster import (
define_asset_job,
)
from data_platform.assets.funda import (
from data_platform.assets.ingestion.funda.funda import (
FundaDetailsConfig,
FundaPriceHistoryConfig,
FundaSearchConfig,