diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b60aff0..7fe0328 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,9 @@ jobs: run: uv sync - name: SQLFluff lint - run: uv run sqlfluff lint dbt/models --dialect postgres + run: + uv run sqlfluff lint dbt/models data_platform/assets/ingestion/funda/sql --dialect + postgres lint-yaml-json-md: name: Prettier (YAML / JSON / Markdown) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 52232d1..d66572d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,7 +19,7 @@ repos: entry: uv run sqlfluff lint --dialect postgres language: system types: [sql] - files: ^dbt/models/ + files: ^(dbt/models/|data_platform/assets/ingestion/) - id: prettier name: prettier diff --git a/.sqlfluff b/.sqlfluff index de49250..d938c01 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -3,8 +3,8 @@ templater = jinja dialect = postgres max_line_length = 100 # Exclude generated/vendor paths -# Don't require quoted identifiers -exclude_rules = RF05 +# Don't require quoted identifiers; allow type-name identifiers (date, timestamp, etc.) +exclude_rules = RF04, RF05 output_line_length = 120 [sqlfluff:templater:jinja] @@ -13,6 +13,9 @@ output_line_length = 120 source = "{% macro source(source_name, table_name) %}{{ source_name }}.{{ table_name }}{% endmacro %}" ref = "{% macro ref(model_name) %}{{ model_name }}{% endmacro %}" +[sqlfluff:templater:jinja:context] +schema = raw_funda + [sqlfluff:indentation] indent_unit = space tab_space_size = 4 diff --git a/Makefile b/Makefile index a6dd0b3..777e822 100644 --- a/Makefile +++ b/Makefile @@ -92,8 +92,8 @@ lint-python: ## Ruff lint + format check uv run ruff check . uv run ruff format --check . -lint-sql: ## SQLFluff lint on dbt/models - uv run sqlfluff lint dbt/models --dialect postgres +lint-sql: ## SQLFluff lint on dbt/models and ingestion SQL + uv run sqlfluff lint dbt/models data_platform/assets/ingestion/funda/sql --dialect postgres lint-format: ## Prettier check (YAML / Markdown) npx --yes prettier --check "**/*.yml" "**/*.yaml" "**/*.md" \ @@ -102,7 +102,7 @@ lint-format: ## Prettier check (YAML / Markdown) lint-fix: ## Auto-fix all linters (ruff + sqlfluff + prettier) uv run ruff check --fix . uv run ruff format . - uv run sqlfluff fix dbt/models --dialect postgres + uv run sqlfluff fix dbt/models data_platform/assets/ingestion/funda/sql --dialect postgres npx --yes prettier --write "**/*.yml" "**/*.yaml" "**/*.md" \ --ignore-path .prettierignore diff --git a/data_platform/assets/ingestion/__init__.py b/data_platform/assets/ingestion/__init__.py new file mode 100644 index 0000000..7d8fead --- /dev/null +++ b/data_platform/assets/ingestion/__init__.py @@ -0,0 +1 @@ +"""Ingestion assets.""" diff --git a/data_platform/assets/ingestion/funda/__init__.py b/data_platform/assets/ingestion/funda/__init__.py new file mode 100644 index 0000000..3bd1c76 --- /dev/null +++ b/data_platform/assets/ingestion/funda/__init__.py @@ -0,0 +1,9 @@ +"""Funda ingestion assets.""" + +from data_platform.assets.ingestion.funda.funda import ( + funda_listing_details, + funda_price_history, + funda_search_results, +) + +__all__ = ["funda_listing_details", "funda_price_history", "funda_search_results"] diff --git a/data_platform/assets/funda.py b/data_platform/assets/ingestion/funda/funda.py similarity index 54% rename from data_platform/assets/funda.py rename to data_platform/assets/ingestion/funda/funda.py index 2c5435c..14b8e2b 100644 --- a/data_platform/assets/funda.py +++ b/data_platform/assets/ingestion/funda/funda.py @@ -1,6 +1,7 @@ """Funda real-estate ingestion assets.""" import json +from pathlib import Path from dagster import ( AssetExecutionContext, @@ -15,10 +16,14 @@ from data_platform.helpers import ( format_area, format_euro, md_preview_table, + render_sql, safe_int, ) from data_platform.resources import FundaResource, PostgresResource +_SQL_DIR = Path(__file__).parent / "sql" +_SCHEMA = "raw_funda" + class FundaSearchConfig(Config): """Search parameters for Funda.""" @@ -50,155 +55,6 @@ class FundaPriceHistoryConfig(Config): fetch_all: bool = True -_SCHEMA = "raw_funda" - -_DDL_SEARCH = f""" -CREATE TABLE IF NOT EXISTS {_SCHEMA}.search_results ( - global_id TEXT, - title TEXT, - city TEXT, - postcode TEXT, - province TEXT, - neighbourhood TEXT, - price BIGINT, - living_area INT, - plot_area INT, - bedrooms INT, - rooms INT, - energy_label TEXT, - object_type TEXT, - offering_type TEXT, - construction_type TEXT, - publish_date TEXT, - broker_id TEXT, - broker_name TEXT, - raw_json JSONB, - ingested_at TIMESTAMPTZ DEFAULT now(), - UNIQUE (global_id) -); -""" - -_DDL_DETAILS = f""" -CREATE TABLE IF NOT EXISTS {_SCHEMA}.listing_details ( - global_id TEXT, - tiny_id TEXT, - title TEXT, - city TEXT, - postcode TEXT, - province TEXT, - neighbourhood TEXT, - municipality TEXT, - price BIGINT, - price_formatted TEXT, - status TEXT, - offering_type TEXT, - object_type TEXT, - house_type TEXT, - construction_type TEXT, - construction_year TEXT, - energy_label TEXT, - living_area INT, - plot_area INT, - bedrooms INT, - rooms INT, - description TEXT, - publication_date TEXT, - latitude DOUBLE PRECISION, - longitude DOUBLE PRECISION, - has_garden BOOLEAN, - has_balcony BOOLEAN, - has_solar_panels BOOLEAN, - has_heat_pump BOOLEAN, - has_roof_terrace BOOLEAN, - is_energy_efficient BOOLEAN, - is_monument BOOLEAN, - url TEXT, - photo_count INT, - views INT, - saves INT, - raw_json JSONB, - ingested_at TIMESTAMPTZ DEFAULT now(), - UNIQUE (global_id, status) -); -""" - -_DDL_PRICE_HISTORY = f""" -CREATE TABLE IF NOT EXISTS {_SCHEMA}.price_history ( - global_id TEXT, - price BIGINT, - human_price TEXT, - date TEXT, - timestamp TEXT, - source TEXT, - status TEXT, - ingested_at TIMESTAMPTZ DEFAULT now(), - UNIQUE (global_id, date, source, status) -); -""" - -# Deduplicate existing rows and add constraints for tables created before UNIQUE clauses. -_MIGRATE_SEARCH_CONSTRAINT = f""" -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_constraint - WHERE conrelid = '{_SCHEMA}.search_results'::regclass - AND contype = 'u' - ) THEN - DELETE FROM {_SCHEMA}.search_results a - USING {_SCHEMA}.search_results b - WHERE a.global_id = b.global_id - AND a.ingested_at < b.ingested_at; - - ALTER TABLE {_SCHEMA}.search_results - ADD UNIQUE (global_id); - END IF; -END $$; -""" - -_MIGRATE_DETAILS_CONSTRAINT = f""" -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_constraint - WHERE conrelid = '{_SCHEMA}.listing_details'::regclass - AND contype = 'u' - ) THEN - DELETE FROM {_SCHEMA}.listing_details a - USING {_SCHEMA}.listing_details b - WHERE a.global_id = b.global_id - AND a.status IS NOT DISTINCT FROM b.status - AND a.ingested_at < b.ingested_at; - - ALTER TABLE {_SCHEMA}.listing_details - ADD UNIQUE (global_id, status); - END IF; -END $$; -""" - -_MIGRATE_PRICE_HISTORY_CONSTRAINT = f""" -DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_constraint - WHERE conrelid = '{_SCHEMA}.price_history'::regclass - AND contype = 'u' - ) THEN - DELETE FROM {_SCHEMA}.price_history a - USING {_SCHEMA}.price_history b - WHERE a.global_id = b.global_id - AND a.date IS NOT DISTINCT FROM b.date - AND a.source IS NOT DISTINCT FROM b.source - AND a.status IS NOT DISTINCT FROM b.status - AND a.ingested_at < b.ingested_at; - - ALTER TABLE {_SCHEMA}.price_history - ADD UNIQUE (global_id, date, source, status); - END IF; -END $$; -""" - - @asset( group_name="funda", kinds={"python", "postgres"}, @@ -254,8 +110,16 @@ def funda_search_results( engine = postgres.get_engine() with engine.begin() as conn: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) - conn.execute(text(_DDL_SEARCH)) - conn.execute(text(_MIGRATE_SEARCH_CONSTRAINT)) + conn.execute( + text(render_sql(_SQL_DIR, "ddl/create_search_results.sql", schema=_SCHEMA)) + ) + conn.execute( + text( + render_sql( + _SQL_DIR, "ddl/migrate_search_constraint.sql", schema=_SCHEMA + ) + ) + ) rows = [] for listing in all_listings: @@ -284,39 +148,9 @@ def funda_search_results( } ) - insert_sql = f""" - INSERT INTO {_SCHEMA}.search_results - (global_id, title, city, postcode, province, neighbourhood, - price, living_area, plot_area, bedrooms, rooms, energy_label, - object_type, offering_type, construction_type, publish_date, - broker_id, broker_name, raw_json) - VALUES - (:global_id, :title, :city, :postcode, :province, :neighbourhood, - :price, :living_area, :plot_area, :bedrooms, :rooms, :energy_label, - :object_type, :offering_type, :construction_type, :publish_date, - :broker_id, :broker_name, :raw_json) - ON CONFLICT (global_id) DO UPDATE SET - title = EXCLUDED.title, - city = EXCLUDED.city, - postcode = EXCLUDED.postcode, - province = EXCLUDED.province, - neighbourhood = EXCLUDED.neighbourhood, - price = EXCLUDED.price, - living_area = EXCLUDED.living_area, - plot_area = EXCLUDED.plot_area, - bedrooms = EXCLUDED.bedrooms, - rooms = EXCLUDED.rooms, - energy_label = EXCLUDED.energy_label, - object_type = EXCLUDED.object_type, - offering_type = EXCLUDED.offering_type, - construction_type = EXCLUDED.construction_type, - publish_date = EXCLUDED.publish_date, - broker_id = EXCLUDED.broker_id, - broker_name = EXCLUDED.broker_name, - raw_json = EXCLUDED.raw_json, - ingested_at = now() - """ - postgres.execute_many(insert_sql, rows) + postgres.execute_many( + render_sql(_SQL_DIR, "dml/insert_search_results.sql", schema=_SCHEMA), rows + ) context.log.info( f"Inserted {len(rows)} search results into {_SCHEMA}.search_results" @@ -361,8 +195,16 @@ def funda_listing_details( with engine.begin() as conn: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) - conn.execute(text(_DDL_DETAILS)) - conn.execute(text(_MIGRATE_DETAILS_CONSTRAINT)) + conn.execute( + text(render_sql(_SQL_DIR, "ddl/create_listing_details.sql", schema=_SCHEMA)) + ) + conn.execute( + text( + render_sql( + _SQL_DIR, "ddl/migrate_details_constraint.sql", schema=_SCHEMA + ) + ) + ) with engine.connect() as conn: result = conn.execute( @@ -432,66 +274,9 @@ def funda_listing_details( context.log.info(f" fetched {i + 1}/{len(ids)} …") if rows: - insert_sql = f""" - INSERT INTO {_SCHEMA}.listing_details - (global_id, tiny_id, title, city, postcode, province, - neighbourhood, municipality, price, price_formatted, - status, offering_type, object_type, house_type, - construction_type, construction_year, energy_label, - living_area, plot_area, bedrooms, rooms, description, - publication_date, latitude, longitude, - has_garden, has_balcony, has_solar_panels, has_heat_pump, - has_roof_terrace, is_energy_efficient, is_monument, - url, photo_count, views, saves, raw_json) - VALUES - (:global_id, :tiny_id, :title, :city, :postcode, :province, - :neighbourhood, :municipality, :price, :price_formatted, - :status, :offering_type, :object_type, :house_type, - :construction_type, :construction_year, :energy_label, - :living_area, :plot_area, :bedrooms, :rooms, :description, - :publication_date, :latitude, :longitude, - :has_garden, :has_balcony, :has_solar_panels, :has_heat_pump, - :has_roof_terrace, :is_energy_efficient, :is_monument, - :url, :photo_count, :views, :saves, :raw_json) - ON CONFLICT (global_id, status) DO UPDATE SET - tiny_id = EXCLUDED.tiny_id, - title = EXCLUDED.title, - city = EXCLUDED.city, - postcode = EXCLUDED.postcode, - province = EXCLUDED.province, - neighbourhood = EXCLUDED.neighbourhood, - municipality = EXCLUDED.municipality, - price = EXCLUDED.price, - price_formatted = EXCLUDED.price_formatted, - offering_type = EXCLUDED.offering_type, - object_type = EXCLUDED.object_type, - house_type = EXCLUDED.house_type, - construction_type = EXCLUDED.construction_type, - construction_year = EXCLUDED.construction_year, - energy_label = EXCLUDED.energy_label, - living_area = EXCLUDED.living_area, - plot_area = EXCLUDED.plot_area, - bedrooms = EXCLUDED.bedrooms, - rooms = EXCLUDED.rooms, - description = EXCLUDED.description, - publication_date = EXCLUDED.publication_date, - latitude = EXCLUDED.latitude, - longitude = EXCLUDED.longitude, - has_garden = EXCLUDED.has_garden, - has_balcony = EXCLUDED.has_balcony, - has_solar_panels = EXCLUDED.has_solar_panels, - has_heat_pump = EXCLUDED.has_heat_pump, - has_roof_terrace = EXCLUDED.has_roof_terrace, - is_energy_efficient = EXCLUDED.is_energy_efficient, - is_monument = EXCLUDED.is_monument, - url = EXCLUDED.url, - photo_count = EXCLUDED.photo_count, - views = EXCLUDED.views, - saves = EXCLUDED.saves, - raw_json = EXCLUDED.raw_json, - ingested_at = now() - """ - postgres.execute_many(insert_sql, rows) + postgres.execute_many( + render_sql(_SQL_DIR, "dml/insert_listing_details.sql", schema=_SCHEMA), rows + ) context.log.info( f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details" @@ -535,8 +320,16 @@ def funda_price_history( with engine.begin() as conn: conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) - conn.execute(text(_DDL_PRICE_HISTORY)) - conn.execute(text(_MIGRATE_PRICE_HISTORY_CONSTRAINT)) + conn.execute( + text(render_sql(_SQL_DIR, "ddl/create_price_history.sql", schema=_SCHEMA)) + ) + conn.execute( + text( + render_sql( + _SQL_DIR, "ddl/migrate_price_history_constraint.sql", schema=_SCHEMA + ) + ) + ) with engine.connect() as conn: result = conn.execute( @@ -579,18 +372,9 @@ def funda_price_history( context.log.info(f" fetched {i + 1}/{len(ids)} …") if rows: - insert_sql = f""" - INSERT INTO {_SCHEMA}.price_history - (global_id, price, human_price, date, timestamp, source, status) - VALUES - (:global_id, :price, :human_price, :date, :timestamp, :source, :status) - ON CONFLICT (global_id, date, source, status) DO UPDATE SET - price = EXCLUDED.price, - human_price = EXCLUDED.human_price, - timestamp = EXCLUDED.timestamp, - ingested_at = now() - """ - postgres.execute_many(insert_sql, rows) + postgres.execute_many( + render_sql(_SQL_DIR, "dml/insert_price_history.sql", schema=_SCHEMA), rows + ) context.log.info( f"Inserted {len(rows)} price history records ({errors} errors) into {_SCHEMA}.price_history" diff --git a/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql b/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql new file mode 100644 index 0000000..b9a21f4 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql @@ -0,0 +1,41 @@ +CREATE TABLE IF NOT EXISTS {{ schema }}.listing_details ( + global_id TEXT, + tiny_id TEXT, + title TEXT, + city TEXT, + postcode TEXT, + province TEXT, + neighbourhood TEXT, + municipality TEXT, + price BIGINT, + price_formatted TEXT, + status TEXT, + offering_type TEXT, + object_type TEXT, + house_type TEXT, + construction_type TEXT, + construction_year TEXT, + energy_label TEXT, + living_area INT, + plot_area INT, + bedrooms INT, + rooms INT, + description TEXT, + publication_date TEXT, + latitude DOUBLE PRECISION, + longitude DOUBLE PRECISION, + has_garden BOOLEAN, + has_balcony BOOLEAN, + has_solar_panels BOOLEAN, + has_heat_pump BOOLEAN, + has_roof_terrace BOOLEAN, + is_energy_efficient BOOLEAN, + is_monument BOOLEAN, + url TEXT, + photo_count INT, + views INT, + saves INT, + raw_json JSONB, + ingested_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (global_id, status) +); diff --git a/data_platform/assets/ingestion/funda/sql/ddl/create_price_history.sql b/data_platform/assets/ingestion/funda/sql/ddl/create_price_history.sql new file mode 100644 index 0000000..3332d0e --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/create_price_history.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS {{ schema }}.price_history ( + global_id TEXT, + price BIGINT, + human_price TEXT, + date TEXT, + timestamp TEXT, + source TEXT, + status TEXT, + ingested_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (global_id, date, source, status) +); diff --git a/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql b/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql new file mode 100644 index 0000000..60b932f --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS {{ schema }}.search_results ( + global_id TEXT, + title TEXT, + city TEXT, + postcode TEXT, + province TEXT, + neighbourhood TEXT, + price BIGINT, + living_area INT, + plot_area INT, + bedrooms INT, + rooms INT, + energy_label TEXT, + object_type TEXT, + offering_type TEXT, + construction_type TEXT, + publish_date TEXT, + broker_id TEXT, + broker_name TEXT, + raw_json JSONB, + ingested_at TIMESTAMPTZ DEFAULT now(), + UNIQUE (global_id) +); diff --git a/data_platform/assets/ingestion/funda/sql/ddl/migrate_details_constraint.sql b/data_platform/assets/ingestion/funda/sql/ddl/migrate_details_constraint.sql new file mode 100644 index 0000000..1263152 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/migrate_details_constraint.sql @@ -0,0 +1,18 @@ +-- Deduplicate and add UNIQUE constraint to listing_details if it doesn't exist yet. +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = '{{ schema }}.listing_details'::regclass + AND contype = 'u' + ) THEN + DELETE FROM {{ schema }}.listing_details a + USING {{ schema }}.listing_details b + WHERE a.global_id = b.global_id + AND a.status IS NOT DISTINCT FROM b.status + AND a.ingested_at < b.ingested_at; + + ALTER TABLE {{ schema }}.listing_details + ADD UNIQUE (global_id, status); + END IF; +END $$; diff --git a/data_platform/assets/ingestion/funda/sql/ddl/migrate_price_history_constraint.sql b/data_platform/assets/ingestion/funda/sql/ddl/migrate_price_history_constraint.sql new file mode 100644 index 0000000..8294b71 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/migrate_price_history_constraint.sql @@ -0,0 +1,20 @@ +-- Deduplicate and add UNIQUE constraint to price_history if it doesn't exist yet. +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = '{{ schema }}.price_history'::regclass + AND contype = 'u' + ) THEN + DELETE FROM {{ schema }}.price_history a + USING {{ schema }}.price_history b + WHERE a.global_id = b.global_id + AND a.date IS NOT DISTINCT FROM b.date + AND a.source IS NOT DISTINCT FROM b.source + AND a.status IS NOT DISTINCT FROM b.status + AND a.ingested_at < b.ingested_at; + + ALTER TABLE {{ schema }}.price_history + ADD UNIQUE (global_id, date, source, status); + END IF; +END $$; diff --git a/data_platform/assets/ingestion/funda/sql/ddl/migrate_search_constraint.sql b/data_platform/assets/ingestion/funda/sql/ddl/migrate_search_constraint.sql new file mode 100644 index 0000000..0cfc840 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/migrate_search_constraint.sql @@ -0,0 +1,17 @@ +-- Deduplicate and add UNIQUE constraint to search_results if it doesn't exist yet. +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conrelid = '{{ schema }}.search_results'::regclass + AND contype = 'u' + ) THEN + DELETE FROM {{ schema }}.search_results a + USING {{ schema }}.search_results b + WHERE a.global_id = b.global_id + AND a.ingested_at < b.ingested_at; + + ALTER TABLE {{ schema }}.search_results + ADD UNIQUE (global_id); + END IF; +END $$; diff --git a/data_platform/assets/ingestion/funda/sql/dml/.sqlfluff b/data_platform/assets/ingestion/funda/sql/dml/.sqlfluff new file mode 100644 index 0000000..30f96c1 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/.sqlfluff @@ -0,0 +1,4 @@ +[sqlfluff] +# SQLAlchemy :named_param bind parameters are misinterpreted by sqlfluff +# as cast operators, causing false LT01 spacing violations. Disable here. +exclude_rules = LT01 diff --git a/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql b/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql new file mode 100644 index 0000000..31e958d --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql @@ -0,0 +1,59 @@ +INSERT INTO {{ schema }}.listing_details ( + global_id, tiny_id, title, city, postcode, province, + neighbourhood, municipality, price, price_formatted, + status, offering_type, object_type, house_type, + construction_type, construction_year, energy_label, + living_area, plot_area, bedrooms, rooms, description, + publication_date, latitude, longitude, + has_garden, has_balcony, has_solar_panels, has_heat_pump, + has_roof_terrace, is_energy_efficient, is_monument, + url, photo_count, views, saves, raw_json +) +VALUES ( + :global_id, :tiny_id, :title, :city, :postcode, :province, + :neighbourhood, :municipality, :price, :price_formatted, + :status, :offering_type, :object_type, :house_type, + :construction_type, :construction_year, :energy_label, + :living_area, :plot_area, :bedrooms, :rooms, :description, + :publication_date, :latitude, :longitude, + :has_garden, :has_balcony, :has_solar_panels, :has_heat_pump, + :has_roof_terrace, :is_energy_efficient, :is_monument, + :url, :photo_count, :views, :saves, :raw_json +) +ON CONFLICT (global_id, status) DO UPDATE SET + tiny_id = excluded.tiny_id, + title = excluded.title, + city = excluded.city, + postcode = excluded.postcode, + province = excluded.province, + neighbourhood = excluded.neighbourhood, + municipality = excluded.municipality, + price = excluded.price, + price_formatted = excluded.price_formatted, + offering_type = excluded.offering_type, + object_type = excluded.object_type, + house_type = excluded.house_type, + construction_type = excluded.construction_type, + construction_year = excluded.construction_year, + energy_label = excluded.energy_label, + living_area = excluded.living_area, + plot_area = excluded.plot_area, + bedrooms = excluded.bedrooms, + rooms = excluded.rooms, + description = excluded.description, + publication_date = excluded.publication_date, + latitude = excluded.latitude, + longitude = excluded.longitude, + has_garden = excluded.has_garden, + has_balcony = excluded.has_balcony, + has_solar_panels = excluded.has_solar_panels, + has_heat_pump = excluded.has_heat_pump, + has_roof_terrace = excluded.has_roof_terrace, + is_energy_efficient = excluded.is_energy_efficient, + is_monument = excluded.is_monument, + url = excluded.url, + photo_count = excluded.photo_count, + views = excluded.views, + saves = excluded.saves, + raw_json = excluded.raw_json, + ingested_at = now() diff --git a/data_platform/assets/ingestion/funda/sql/dml/insert_price_history.sql b/data_platform/assets/ingestion/funda/sql/dml/insert_price_history.sql new file mode 100644 index 0000000..f324647 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/insert_price_history.sql @@ -0,0 +1,11 @@ +INSERT INTO {{ schema }}.price_history ( + global_id, price, human_price, date, timestamp, source, status +) +VALUES ( + :global_id, :price, :human_price, :date, :timestamp, :source, :status +) +ON CONFLICT (global_id, date, source, status) DO UPDATE SET + price = excluded.price, + human_price = excluded.human_price, + timestamp = excluded.timestamp, + ingested_at = now() diff --git a/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql b/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql new file mode 100644 index 0000000..4f350f1 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql @@ -0,0 +1,32 @@ +INSERT INTO {{ schema }}.search_results ( + global_id, title, city, postcode, province, neighbourhood, + price, living_area, plot_area, bedrooms, rooms, energy_label, + object_type, offering_type, construction_type, publish_date, + broker_id, broker_name, raw_json +) +VALUES ( + :global_id, :title, :city, :postcode, :province, :neighbourhood, + :price, :living_area, :plot_area, :bedrooms, :rooms, :energy_label, + :object_type, :offering_type, :construction_type, :publish_date, + :broker_id, :broker_name, :raw_json +) +ON CONFLICT (global_id) DO UPDATE SET + title = excluded.title, + city = excluded.city, + postcode = excluded.postcode, + province = excluded.province, + neighbourhood = excluded.neighbourhood, + price = excluded.price, + living_area = excluded.living_area, + plot_area = excluded.plot_area, + bedrooms = excluded.bedrooms, + rooms = excluded.rooms, + energy_label = excluded.energy_label, + object_type = excluded.object_type, + offering_type = excluded.offering_type, + construction_type = excluded.construction_type, + publish_date = excluded.publish_date, + broker_id = excluded.broker_id, + broker_name = excluded.broker_name, + raw_json = excluded.raw_json, + ingested_at = now() diff --git a/data_platform/definitions.py b/data_platform/definitions.py index cf3a474..380bfef 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -2,7 +2,7 @@ from dagster import Definitions from dagster_dbt import DbtCliResource from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets -from data_platform.assets.funda import ( +from data_platform.assets.ingestion.funda import ( funda_listing_details, funda_price_history, funda_search_results, diff --git a/data_platform/helpers/__init__.py b/data_platform/helpers/__init__.py index 31978cc..b631d03 100644 --- a/data_platform/helpers/__init__.py +++ b/data_platform/helpers/__init__.py @@ -1,6 +1,15 @@ """Shared helper utilities.""" import json +from pathlib import Path + +from jinja2 import Environment, FileSystemLoader + + +def render_sql(sql_dir: Path, path: str, **kwargs: object) -> str: + """Load and render a Jinja2 SQL template relative to sql_dir.""" + env = Environment(loader=FileSystemLoader(str(sql_dir)), autoescape=False) + return env.get_template(path).render(**kwargs) def safe(val): diff --git a/data_platform/schedules.py b/data_platform/schedules.py index 61fd0b1..210e1a6 100644 --- a/data_platform/schedules.py +++ b/data_platform/schedules.py @@ -8,7 +8,7 @@ from dagster import ( define_asset_job, ) -from data_platform.assets.funda import ( +from data_platform.assets.ingestion.funda.funda import ( FundaDetailsConfig, FundaPriceHistoryConfig, FundaSearchConfig, diff --git a/pyproject.toml b/pyproject.toml index 7f70114..e7aeafc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "dbt-core", "dbt-postgres", "pyfunda", + "jinja2", ] [build-system] diff --git a/tests/test_assets_funda.py b/tests/test_assets_funda.py index f063cf3..acfa13c 100644 --- a/tests/test_assets_funda.py +++ b/tests/test_assets_funda.py @@ -4,12 +4,12 @@ from unittest.mock import MagicMock from dagster import materialize -from data_platform.assets.funda import ( - FundaSearchConfig, +from data_platform.assets.ingestion.funda import ( funda_listing_details, funda_price_history, funda_search_results, ) +from data_platform.assets.ingestion.funda.funda import FundaSearchConfig from tests.conftest import make_mock_engine, make_mock_listing diff --git a/uv.lock b/uv.lock index 43395a3..a1bdfb0 100644 --- a/uv.lock +++ b/uv.lock @@ -505,6 +505,7 @@ dependencies = [ { name = "dagster-webserver" }, { name = "dbt-core" }, { name = "dbt-postgres" }, + { name = "jinja2" }, { name = "pyfunda" }, ] @@ -524,6 +525,7 @@ requires-dist = [ { name = "dagster-webserver" }, { name = "dbt-core" }, { name = "dbt-postgres" }, + { name = "jinja2" }, { name = "pyfunda" }, ]