diff --git a/.sqlfluff b/.sqlfluff index a30b678..be470c9 100644 --- a/.sqlfluff +++ b/.sqlfluff @@ -15,6 +15,7 @@ ref = "{% macro ref(model_name) %}{{ model_name }}{% endmacro %}" [sqlfluff:templater:jinja:context] schema = raw_funda +staleness_days = 7 [sqlfluff:indentation] indent_unit = space diff --git a/data_platform/assets/ingestion/funda/funda.py b/data_platform/assets/ingestion/funda/funda.py index 841ad45..7850b17 100644 --- a/data_platform/assets/ingestion/funda/funda.py +++ b/data_platform/assets/ingestion/funda/funda.py @@ -55,6 +55,7 @@ class FundaPriceHistoryConfig(Config): """Config for price history fetch.""" fetch_all: bool = False + staleness_days: int = 7 @asset( @@ -371,6 +372,7 @@ def raw_funda_price_history( _SQL_DIR, "dml/select_new_price_history_listings.sql", schema=_SCHEMA, + staleness_days=config.staleness_days, ) ) result = conn.execute(query) diff --git a/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql b/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql index 275a823..8229d59 100644 --- a/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql +++ b/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql @@ -1,17 +1,24 @@ +with last_price_history as ( + select + global_id, + max(ingested_at) as last_ingested + from {{ schema }}.price_history + group by global_id +) + select distinct d.global_id, d.url, d.title, d.postcode from {{ schema }}.listing_details as d -inner join {{ schema }}.search_results as s on d.global_id = s.global_id -where s.is_active = true -union -select distinct - d.global_id, - d.url, - d.title, - d.postcode -from {{ schema }}.listing_details as d -left join {{ schema }}.price_history as p on d.global_id = p.global_id -where p.global_id is null +inner join {{ schema }}.search_results as s + on d.global_id = s.global_id +left join last_price_history as ph + on d.global_id = ph.global_id +where + s.is_active = true + and ( + ph.last_ingested is null + or ph.last_ingested < now() - interval '{{ staleness_days }} days' + )