diff --git a/data_platform/assets/funda.py b/data_platform/assets/funda.py index 97ff1bd..4ae33b2 100644 --- a/data_platform/assets/funda.py +++ b/data_platform/assets/funda.py @@ -35,10 +35,10 @@ from data_platform.resources import FundaResource, PostgresResource class FundaSearchConfig(Config): """Launchpad parameters for the Funda search asset.""" - location: str = "amsterdam" + location: str = "woerden, utrecht, zeist, maarssen, nieuwegein, gouda" offering_type: str = "buy" - price_min: int | None = None - price_max: int | None = None + price_min: int | None = 300000 + price_max: int | None = 500000 area_min: int | None = None area_max: int | None = None plot_min: int | None = None @@ -89,7 +89,8 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.search_results ( broker_id TEXT, broker_name TEXT, raw_json JSONB, - ingested_at TIMESTAMPTZ DEFAULT now() + ingested_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (global_id) ); """ @@ -132,7 +133,8 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.listing_details ( views INT, saves INT, raw_json JSONB, - ingested_at TIMESTAMPTZ DEFAULT now() + ingested_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (global_id, status) ); """ @@ -145,10 +147,74 @@ CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history ( timestamp TEXT, source TEXT, status TEXT, - ingested_at TIMESTAMPTZ DEFAULT now() + ingested_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (global_id, date, source, status) ); """ +# Idempotent constraint migrations for tables created before the UNIQUE clauses. +# Deduplicates existing rows (keeps the most recent; ties on ingested_at are broken by ctid) before adding the constraint. 
+_MIGRATE_SEARCH_CONSTRAINT = f""" +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = '{_SCHEMA}.search_results'::regclass + AND contype = 'u' + ) THEN + DELETE FROM {_SCHEMA}.search_results a + USING {_SCHEMA}.search_results b + WHERE a.global_id = b.global_id + AND (a.ingested_at, a.ctid) < (b.ingested_at, b.ctid); + + ALTER TABLE {_SCHEMA}.search_results + ADD UNIQUE (global_id); + END IF; +END $$; +""" + +_MIGRATE_DETAILS_CONSTRAINT = f""" +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = '{_SCHEMA}.listing_details'::regclass + AND contype = 'u' + ) THEN + DELETE FROM {_SCHEMA}.listing_details a + USING {_SCHEMA}.listing_details b + WHERE a.global_id = b.global_id + AND a.status IS NOT DISTINCT FROM b.status + AND (a.ingested_at, a.ctid) < (b.ingested_at, b.ctid); + + ALTER TABLE {_SCHEMA}.listing_details + ADD UNIQUE (global_id, status); + END IF; +END $$; +""" + +_MIGRATE_PRICE_HISTORY_CONSTRAINT = f""" +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = '{_SCHEMA}.price_history'::regclass + AND contype = 'u' + ) THEN + DELETE FROM {_SCHEMA}.price_history a + USING {_SCHEMA}.price_history b + WHERE a.global_id = b.global_id + AND a.date IS NOT DISTINCT FROM b.date + AND a.source IS NOT DISTINCT FROM b.source + AND a.status IS NOT DISTINCT FROM b.status + AND (a.ingested_at, a.ctid) < (b.ingested_at, b.ctid); + + ALTER TABLE {_SCHEMA}.price_history + ADD UNIQUE (global_id, date, source, status); + END IF; +END $$; +""" + # --------------------------------------------------------------------------- # Assets @@ -214,8 +280,7 @@ def funda_search_results( with engine.begin() as conn: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) conn.execute(text(_DDL_SEARCH)) - # Truncate before inserting fresh results - conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.search_results")) + conn.execute(text(_MIGRATE_SEARCH_CONSTRAINT)) rows = [] for listing in all_listings: @@ -255,6 +320,26 @@ def funda_search_results( :price, 
:living_area, :plot_area, :bedrooms, :rooms, :energy_label, :object_type, :offering_type, :construction_type, :publish_date, :broker_id, :broker_name, :raw_json) + ON CONFLICT (global_id) DO UPDATE SET + title = EXCLUDED.title, + city = EXCLUDED.city, + postcode = EXCLUDED.postcode, + province = EXCLUDED.province, + neighbourhood = EXCLUDED.neighbourhood, + price = EXCLUDED.price, + living_area = EXCLUDED.living_area, + plot_area = EXCLUDED.plot_area, + bedrooms = EXCLUDED.bedrooms, + rooms = EXCLUDED.rooms, + energy_label = EXCLUDED.energy_label, + object_type = EXCLUDED.object_type, + offering_type = EXCLUDED.offering_type, + construction_type = EXCLUDED.construction_type, + publish_date = EXCLUDED.publish_date, + broker_id = EXCLUDED.broker_id, + broker_name = EXCLUDED.broker_name, + raw_json = EXCLUDED.raw_json, + ingested_at = now() """ postgres.execute_many(insert_sql, rows) @@ -302,6 +387,7 @@ def funda_listing_details( with engine.begin() as conn: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) conn.execute(text(_DDL_DETAILS)) + conn.execute(text(_MIGRATE_DETAILS_CONSTRAINT)) # Read listing IDs from search results with engine.connect() as conn: @@ -316,10 +402,6 @@ def funda_listing_details( context.log.info(f"Fetching details for {len(ids)} listings …") - # Truncate before inserting - with engine.begin() as conn: - conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.listing_details")) - rows = [] errors = 0 for i, gid in enumerate(ids): @@ -397,6 +479,43 @@ def funda_listing_details( :has_garden, :has_balcony, :has_solar_panels, :has_heat_pump, :has_roof_terrace, :is_energy_efficient, :is_monument, :url, :photo_count, :views, :saves, :raw_json) + ON CONFLICT (global_id, status) DO UPDATE SET + tiny_id = EXCLUDED.tiny_id, + title = EXCLUDED.title, + city = EXCLUDED.city, + postcode = EXCLUDED.postcode, + province = EXCLUDED.province, + neighbourhood = EXCLUDED.neighbourhood, + municipality = EXCLUDED.municipality, + price = EXCLUDED.price, + 
price_formatted = EXCLUDED.price_formatted, + offering_type = EXCLUDED.offering_type, + object_type = EXCLUDED.object_type, + house_type = EXCLUDED.house_type, + construction_type = EXCLUDED.construction_type, + construction_year = EXCLUDED.construction_year, + energy_label = EXCLUDED.energy_label, + living_area = EXCLUDED.living_area, + plot_area = EXCLUDED.plot_area, + bedrooms = EXCLUDED.bedrooms, + rooms = EXCLUDED.rooms, + description = EXCLUDED.description, + publication_date = EXCLUDED.publication_date, + latitude = EXCLUDED.latitude, + longitude = EXCLUDED.longitude, + has_garden = EXCLUDED.has_garden, + has_balcony = EXCLUDED.has_balcony, + has_solar_panels = EXCLUDED.has_solar_panels, + has_heat_pump = EXCLUDED.has_heat_pump, + has_roof_terrace = EXCLUDED.has_roof_terrace, + is_energy_efficient = EXCLUDED.is_energy_efficient, + is_monument = EXCLUDED.is_monument, + url = EXCLUDED.url, + photo_count = EXCLUDED.photo_count, + views = EXCLUDED.views, + saves = EXCLUDED.saves, + raw_json = EXCLUDED.raw_json, + ingested_at = now() """ postgres.execute_many(insert_sql, rows) @@ -443,6 +562,7 @@ def funda_price_history( with engine.begin() as conn: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) conn.execute(text(_DDL_PRICE_HISTORY)) + conn.execute(text(_MIGRATE_PRICE_HISTORY_CONSTRAINT)) # Read listings from details table with engine.connect() as conn: @@ -459,10 +579,6 @@ def funda_price_history( context.log.info(f"Fetching price history for {len(ids)} listings …") - # Truncate before inserting - with engine.begin() as conn: - conn.execute(text(f"TRUNCATE TABLE {_SCHEMA}.price_history")) - rows = [] errors = 0 for i, gid in enumerate(ids): @@ -496,6 +612,11 @@ def funda_price_history( (global_id, price, human_price, date, timestamp, source, status) VALUES (:global_id, :price, :human_price, :date, :timestamp, :source, :status) + ON CONFLICT (global_id, date, source, status) DO UPDATE SET + price = EXCLUDED.price, + human_price = 
EXCLUDED.human_price, + timestamp = EXCLUDED.timestamp, + ingested_at = now() """ postgres.execute_many(insert_sql, rows) diff --git a/data_platform/definitions.py b/data_platform/definitions.py index c895381..3d7c539 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -8,6 +8,7 @@ from data_platform.assets.funda import ( funda_search_results, ) from data_platform.resources import FundaResource, PostgresResource +from data_platform.schedules import funda_ingestion_job, funda_ingestion_schedule # --------------------------------------------------------------------------- # Definitions @@ -20,6 +21,8 @@ defs = Definitions( funda_listing_details, funda_price_history, ], + jobs=[funda_ingestion_job], + schedules=[funda_ingestion_schedule], resources={ "dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR)), "funda": FundaResource(), diff --git a/data_platform/schedules.py b/data_platform/schedules.py new file mode 100644 index 0000000..400113a --- /dev/null +++ b/data_platform/schedules.py @@ -0,0 +1,47 @@ +"""Dagster jobs and schedules for the data platform.""" + +from dagster import ( + AssetSelection, + DefaultScheduleStatus, + RunConfig, + ScheduleDefinition, + define_asset_job, +) + +from data_platform.assets.funda import ( + FundaDetailsConfig, + FundaPriceHistoryConfig, + FundaSearchConfig, +) + +# --------------------------------------------------------------------------- +# Jobs +# --------------------------------------------------------------------------- + +funda_ingestion_job = define_asset_job( + name="funda_ingestion", + selection=AssetSelection.assets( + "funda_search_results", + "funda_listing_details", + "funda_price_history", + ), + description="Run the full Funda ingestion pipeline (search → details → price history).", +) + +# --------------------------------------------------------------------------- +# Schedules +# --------------------------------------------------------------------------- + +funda_ingestion_schedule = ScheduleDefinition( + 
name="funda_ingestion_schedule", + job=funda_ingestion_job, + cron_schedule="0 */4 * * *", # every 4 hours + run_config=RunConfig( + ops={ + "funda_search_results": FundaSearchConfig(), + "funda_listing_details": FundaDetailsConfig(), + "funda_price_history": FundaPriceHistoryConfig(), + } + ), + default_status=DefaultScheduleStatus.RUNNING, # enum member, not the string "RUNNING" +) diff --git a/tests/test_assets_funda.py b/tests/test_assets_funda.py index 2aceae9..df79f9e 100644 --- a/tests/test_assets_funda.py +++ b/tests/test_assets_funda.py @@ -320,11 +320,12 @@ class TestFundaPriceHistory: class TestFundaSearchConfig: def test_defaults(self): cfg = FundaSearchConfig() - assert cfg.location == "amsterdam" + assert cfg.location == "woerden, utrecht, zeist, maarssen, nieuwegein, gouda" assert cfg.offering_type == "buy" assert cfg.sort == "newest" assert cfg.max_pages == 3 - assert cfg.price_min is None + assert cfg.price_min == 300000 + assert cfg.price_max == 500000 def test_custom_values(self): cfg = FundaSearchConfig(