feat: create incremental funda ingestion schedule
This commit is contained in:
@@ -35,10 +35,10 @@ from data_platform.resources import FundaResource, PostgresResource
|
||||
class FundaSearchConfig(Config):
|
||||
"""Launchpad parameters for the Funda search asset."""
|
||||
|
||||
location: str = "amsterdam"
|
||||
location: str = "woerden, utrecht, zeist, maarssen, nieuwegein, gouda"
|
||||
offering_type: str = "buy"
|
||||
price_min: int | None = None
|
||||
price_max: int | None = None
|
||||
price_min: int | None = 300000
|
||||
price_max: int | None = 500000
|
||||
area_min: int | None = None
|
||||
area_max: int | None = None
|
||||
plot_min: int | None = None
|
||||
@@ -89,7 +89,8 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.search_results (
|
||||
broker_id TEXT,
|
||||
broker_name TEXT,
|
||||
raw_json JSONB,
|
||||
ingested_at TIMESTAMPTZ DEFAULT now()
|
||||
ingested_at TIMESTAMPTZ DEFAULT now(),
|
||||
UNIQUE (global_id)
|
||||
);
|
||||
"""
|
||||
|
||||
@@ -132,7 +133,8 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.listing_details (
|
||||
views INT,
|
||||
saves INT,
|
||||
raw_json JSONB,
|
||||
ingested_at TIMESTAMPTZ DEFAULT now()
|
||||
ingested_at TIMESTAMPTZ DEFAULT now(),
|
||||
UNIQUE (global_id, status)
|
||||
);
|
||||
"""
|
||||
|
||||
@@ -145,10 +147,74 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history (
|
||||
timestamp TEXT,
|
||||
source TEXT,
|
||||
status TEXT,
|
||||
ingested_at TIMESTAMPTZ DEFAULT now()
|
||||
ingested_at TIMESTAMPTZ DEFAULT now(),
|
||||
UNIQUE (global_id, date, source, status)
|
||||
);
|
||||
"""
|
||||
|
||||
# Idempotent constraint migrations for tables created before the UNIQUE clauses.
# Deduplicates existing rows (keeps the most recent) before adding the constraint.
_MIGRATE_SEARCH_CONSTRAINT = f"""
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_constraint
        WHERE conrelid = '{_SCHEMA}.search_results'::regclass
          AND contype = 'u'
    ) THEN
        -- Keep the newest row per global_id. Postgres now() is the transaction
        -- start time, so rows bulk-inserted in one transaction share the same
        -- ingested_at; ctid breaks those ties (and NULL timestamps) so the
        -- ALTER below cannot fail on leftover duplicates.
        DELETE FROM {_SCHEMA}.search_results a
        USING {_SCHEMA}.search_results b
        WHERE a.global_id = b.global_id
          AND (a.ingested_at < b.ingested_at
               OR (a.ingested_at IS NOT DISTINCT FROM b.ingested_at
                   AND a.ctid < b.ctid));

        ALTER TABLE {_SCHEMA}.search_results
            ADD UNIQUE (global_id);
    END IF;
END $$;
"""
|
||||
|
||||
# Idempotent migration: add UNIQUE (global_id, status) to listing_details,
# deduplicating first so the ALTER cannot fail on pre-existing duplicates.
_MIGRATE_DETAILS_CONSTRAINT = f"""
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_constraint
        WHERE conrelid = '{_SCHEMA}.listing_details'::regclass
          AND contype = 'u'
    ) THEN
        -- Keep the newest row per (global_id, status). ctid breaks ties for
        -- rows sharing an ingested_at (now() is per-transaction) or NULLs.
        DELETE FROM {_SCHEMA}.listing_details a
        USING {_SCHEMA}.listing_details b
        WHERE a.global_id = b.global_id
          AND a.status IS NOT DISTINCT FROM b.status
          AND (a.ingested_at < b.ingested_at
               OR (a.ingested_at IS NOT DISTINCT FROM b.ingested_at
                   AND a.ctid < b.ctid));

        ALTER TABLE {_SCHEMA}.listing_details
            ADD UNIQUE (global_id, status);
    END IF;
END $$;
"""
|
||||
|
||||
# Idempotent migration: add UNIQUE (global_id, date, source, status) to
# price_history, deduplicating first so the ALTER cannot fail.
_MIGRATE_PRICE_HISTORY_CONSTRAINT = f"""
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_constraint
        WHERE conrelid = '{_SCHEMA}.price_history'::regclass
          AND contype = 'u'
    ) THEN
        -- Keep the newest row per key. ctid breaks ties for rows that share an
        -- ingested_at (now() is per-transaction) or have NULL timestamps.
        DELETE FROM {_SCHEMA}.price_history a
        USING {_SCHEMA}.price_history b
        WHERE a.global_id = b.global_id
          AND a.date IS NOT DISTINCT FROM b.date
          AND a.source IS NOT DISTINCT FROM b.source
          AND a.status IS NOT DISTINCT FROM b.status
          AND (a.ingested_at < b.ingested_at
               OR (a.ingested_at IS NOT DISTINCT FROM b.ingested_at
                   AND a.ctid < b.ctid));

        ALTER TABLE {_SCHEMA}.price_history
            ADD UNIQUE (global_id, date, source, status);
    END IF;
END $$;
"""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Assets
|
||||
@@ -214,8 +280,7 @@ def funda_search_results(
|
||||
with engine.begin() as conn:
|
||||
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
|
||||
conn.execute(text(_DDL_SEARCH))
|
||||
# Truncate before inserting fresh results
|
||||
conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.search_results"))
|
||||
conn.execute(text(_MIGRATE_SEARCH_CONSTRAINT))
|
||||
|
||||
rows = []
|
||||
for listing in all_listings:
|
||||
@@ -255,6 +320,26 @@ def funda_search_results(
|
||||
:price, :living_area, :plot_area, :bedrooms, :rooms, :energy_label,
|
||||
:object_type, :offering_type, :construction_type, :publish_date,
|
||||
:broker_id, :broker_name, :raw_json)
|
||||
ON CONFLICT (global_id) DO UPDATE SET
|
||||
title = EXCLUDED.title,
|
||||
city = EXCLUDED.city,
|
||||
postcode = EXCLUDED.postcode,
|
||||
province = EXCLUDED.province,
|
||||
neighbourhood = EXCLUDED.neighbourhood,
|
||||
price = EXCLUDED.price,
|
||||
living_area = EXCLUDED.living_area,
|
||||
plot_area = EXCLUDED.plot_area,
|
||||
bedrooms = EXCLUDED.bedrooms,
|
||||
rooms = EXCLUDED.rooms,
|
||||
energy_label = EXCLUDED.energy_label,
|
||||
object_type = EXCLUDED.object_type,
|
||||
offering_type = EXCLUDED.offering_type,
|
||||
construction_type = EXCLUDED.construction_type,
|
||||
publish_date = EXCLUDED.publish_date,
|
||||
broker_id = EXCLUDED.broker_id,
|
||||
broker_name = EXCLUDED.broker_name,
|
||||
raw_json = EXCLUDED.raw_json,
|
||||
ingested_at = now()
|
||||
"""
|
||||
postgres.execute_many(insert_sql, rows)
|
||||
|
||||
@@ -302,6 +387,7 @@ def funda_listing_details(
|
||||
with engine.begin() as conn:
|
||||
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
|
||||
conn.execute(text(_DDL_DETAILS))
|
||||
conn.execute(text(_MIGRATE_DETAILS_CONSTRAINT))
|
||||
|
||||
# Read listing IDs from search results
|
||||
with engine.connect() as conn:
|
||||
@@ -316,10 +402,6 @@ def funda_listing_details(
|
||||
|
||||
context.log.info(f"Fetching details for {len(ids)} listings …")
|
||||
|
||||
# Truncate before inserting
|
||||
with engine.begin() as conn:
|
||||
conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.listing_details"))
|
||||
|
||||
rows = []
|
||||
errors = 0
|
||||
for i, gid in enumerate(ids):
|
||||
@@ -397,6 +479,43 @@ def funda_listing_details(
|
||||
:has_garden, :has_balcony, :has_solar_panels, :has_heat_pump,
|
||||
:has_roof_terrace, :is_energy_efficient, :is_monument,
|
||||
:url, :photo_count, :views, :saves, :raw_json)
|
||||
ON CONFLICT (global_id, status) DO UPDATE SET
|
||||
tiny_id = EXCLUDED.tiny_id,
|
||||
title = EXCLUDED.title,
|
||||
city = EXCLUDED.city,
|
||||
postcode = EXCLUDED.postcode,
|
||||
province = EXCLUDED.province,
|
||||
neighbourhood = EXCLUDED.neighbourhood,
|
||||
municipality = EXCLUDED.municipality,
|
||||
price = EXCLUDED.price,
|
||||
price_formatted = EXCLUDED.price_formatted,
|
||||
offering_type = EXCLUDED.offering_type,
|
||||
object_type = EXCLUDED.object_type,
|
||||
house_type = EXCLUDED.house_type,
|
||||
construction_type = EXCLUDED.construction_type,
|
||||
construction_year = EXCLUDED.construction_year,
|
||||
energy_label = EXCLUDED.energy_label,
|
||||
living_area = EXCLUDED.living_area,
|
||||
plot_area = EXCLUDED.plot_area,
|
||||
bedrooms = EXCLUDED.bedrooms,
|
||||
rooms = EXCLUDED.rooms,
|
||||
description = EXCLUDED.description,
|
||||
publication_date = EXCLUDED.publication_date,
|
||||
latitude = EXCLUDED.latitude,
|
||||
longitude = EXCLUDED.longitude,
|
||||
has_garden = EXCLUDED.has_garden,
|
||||
has_balcony = EXCLUDED.has_balcony,
|
||||
has_solar_panels = EXCLUDED.has_solar_panels,
|
||||
has_heat_pump = EXCLUDED.has_heat_pump,
|
||||
has_roof_terrace = EXCLUDED.has_roof_terrace,
|
||||
is_energy_efficient = EXCLUDED.is_energy_efficient,
|
||||
is_monument = EXCLUDED.is_monument,
|
||||
url = EXCLUDED.url,
|
||||
photo_count = EXCLUDED.photo_count,
|
||||
views = EXCLUDED.views,
|
||||
saves = EXCLUDED.saves,
|
||||
raw_json = EXCLUDED.raw_json,
|
||||
ingested_at = now()
|
||||
"""
|
||||
postgres.execute_many(insert_sql, rows)
|
||||
|
||||
@@ -443,6 +562,7 @@ def funda_price_history(
|
||||
with engine.begin() as conn:
|
||||
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}"))
|
||||
conn.execute(text(_DDL_PRICE_HISTORY))
|
||||
conn.execute(text(_MIGRATE_PRICE_HISTORY_CONSTRAINT))
|
||||
|
||||
# Read listings from details table
|
||||
with engine.connect() as conn:
|
||||
@@ -459,10 +579,6 @@ def funda_price_history(
|
||||
|
||||
context.log.info(f"Fetching price history for {len(ids)} listings …")
|
||||
|
||||
# Truncate before inserting
|
||||
with engine.begin() as conn:
|
||||
conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.price_history"))
|
||||
|
||||
rows = []
|
||||
errors = 0
|
||||
for i, gid in enumerate(ids):
|
||||
@@ -496,6 +612,11 @@ def funda_price_history(
|
||||
(global_id, price, human_price, date, timestamp, source, status)
|
||||
VALUES
|
||||
(:global_id, :price, :human_price, :date, :timestamp, :source, :status)
|
||||
ON CONFLICT (global_id, date, source, status) DO UPDATE SET
|
||||
price = EXCLUDED.price,
|
||||
human_price = EXCLUDED.human_price,
|
||||
timestamp = EXCLUDED.timestamp,
|
||||
ingested_at = now()
|
||||
"""
|
||||
postgres.execute_many(insert_sql, rows)
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from data_platform.assets.funda import (
|
||||
funda_search_results,
|
||||
)
|
||||
from data_platform.resources import FundaResource, PostgresResource
|
||||
from data_platform.schedules import funda_ingestion_job, funda_ingestion_schedule
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Definitions
|
||||
@@ -20,6 +21,8 @@ defs = Definitions(
|
||||
funda_listing_details,
|
||||
funda_price_history,
|
||||
],
|
||||
jobs=[funda_ingestion_job],
|
||||
schedules=[funda_ingestion_schedule],
|
||||
resources={
|
||||
"dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR)),
|
||||
"funda": FundaResource(),
|
||||
|
||||
46
data_platform/schedules.py
Normal file
46
data_platform/schedules.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Dagster jobs and schedules for the data platform."""
|
||||
|
||||
from dagster import (
    AssetSelection,
    DefaultScheduleStatus,
    RunConfig,
    ScheduleDefinition,
    define_asset_job,
)

from data_platform.assets.funda import (
    FundaDetailsConfig,
    FundaPriceHistoryConfig,
    FundaSearchConfig,
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Jobs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# The three Funda ingestion assets, listed in dependency order
# (search feeds details, details feed price history).
_FUNDA_INGESTION_ASSETS = (
    "funda_search_results",
    "funda_listing_details",
    "funda_price_history",
)

# Asset job materializing the full Funda pipeline in one run.
funda_ingestion_job = define_asset_job(
    name="funda_ingestion",
    selection=AssetSelection.assets(*_FUNDA_INGESTION_ASSETS),
    description="Run the full Funda ingestion pipeline (search → details → price history).",
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schedules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Schedule that runs the Funda ingestion job every 4 hours with the default
# launchpad config for each asset.
funda_ingestion_schedule = ScheduleDefinition(
    name="funda_ingestion_schedule",
    job=funda_ingestion_job,
    cron_schedule="0 */4 * * *",  # every 4 hours, on the hour
    run_config=RunConfig(
        ops={
            "funda_search_results": FundaSearchConfig(),
            "funda_listing_details": FundaDetailsConfig(),
            "funda_price_history": FundaPriceHistoryConfig(),
        }
    ),
    # ScheduleDefinition validates default_status as a DefaultScheduleStatus
    # enum; the bare string "RUNNING" fails Dagster's parameter check at
    # definition-load time.
    default_status=DefaultScheduleStatus.RUNNING,
)
|
||||
Reference in New Issue
Block a user