diff --git a/data_platform/assets/ingestion/funda/funda.py b/data_platform/assets/ingestion/funda/funda.py index 14b8e2b..c57009c 100644 --- a/data_platform/assets/ingestion/funda/funda.py +++ b/data_platform/assets/ingestion/funda/funda.py @@ -46,13 +46,13 @@ class FundaSearchConfig(Config): class FundaDetailsConfig(Config): """Config for listing details fetch.""" - fetch_all: bool = True + fetch_all: bool = False class FundaPriceHistoryConfig(Config): """Config for price history fetch.""" - fetch_all: bool = True + fetch_all: bool = False @asset( @@ -152,13 +152,28 @@ def funda_search_results( render_sql(_SQL_DIR, "dml/insert_search_results.sql", schema=_SCHEMA), rows ) + # Mark listings not seen in the last 7 days as inactive. + engine = postgres.get_engine() + with engine.begin() as conn: + result = conn.execute( + text( + f"UPDATE {_SCHEMA}.search_results" + f" SET is_active = FALSE" + f" WHERE last_seen_at < now() - INTERVAL '7 days'" + f" RETURNING global_id" + ) + ) + newly_inactive = result.rowcount + context.log.info( f"Inserted {len(rows)} search results into {_SCHEMA}.search_results" + f" ({newly_inactive} listings marked inactive)" ) return MaterializeResult( metadata={ "count": len(rows), + "newly_inactive": newly_inactive, "location": MetadataValue.text(config.location), "offering_type": MetadataValue.text(config.offering_type), "preview": MetadataValue.md( @@ -207,9 +222,19 @@ def funda_listing_details( ) with engine.connect() as conn: - result = conn.execute( - text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results") - ) + if config.fetch_all: + query = text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results") + else: + query = text( + f""" + SELECT DISTINCT s.global_id + FROM {_SCHEMA}.search_results s + LEFT JOIN {_SCHEMA}.listing_details d ON s.global_id = d.global_id + WHERE s.is_active = TRUE + AND (d.global_id IS NULL OR d.is_stale = TRUE) + """ + ) + result = conn.execute(query) ids = [row[0] for row in result if row[0]] if not ids: @@ -278,6 +303,21 @@ def funda_listing_details( render_sql(_SQL_DIR, "dml/insert_listing_details.sql", schema=_SCHEMA), rows ) + # Mark details as stale where the parent search listing is no longer active. + with engine.begin() as conn: + conn.execute( + text( + f""" + UPDATE {_SCHEMA}.listing_details d + SET is_stale = TRUE + FROM {_SCHEMA}.search_results s + WHERE d.global_id = s.global_id + AND s.is_active = FALSE + AND d.is_stale = FALSE + """ + ) + ) + context.log.info( f"Inserted {len(rows)} listing details ({errors} errors) into {_SCHEMA}.listing_details" ) @@ -332,9 +372,23 @@ def funda_price_history( ) with engine.connect() as conn: - result = conn.execute( - text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details") - ) + if config.fetch_all: + query = text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details") + else: + query = text( + f""" + SELECT DISTINCT d.global_id + FROM {_SCHEMA}.listing_details d + JOIN {_SCHEMA}.search_results s ON d.global_id = s.global_id + WHERE s.is_active = TRUE + UNION + SELECT DISTINCT d.global_id + FROM {_SCHEMA}.listing_details d + LEFT JOIN {_SCHEMA}.price_history p ON d.global_id = p.global_id + WHERE p.global_id IS NULL + """ + ) + result = conn.execute(query) ids = [row[0] for row in result if row[0]] if not ids: diff --git a/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql b/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql index b9a21f4..59a7a7c 100644 --- a/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql +++ b/data_platform/assets/ingestion/funda/sql/ddl/create_listing_details.sql @@ -37,5 +37,7 @@ CREATE TABLE IF NOT EXISTS {{ schema }}.listing_details ( saves INT, raw_json JSONB, ingested_at TIMESTAMPTZ DEFAULT now(), + last_fetched_at TIMESTAMPTZ DEFAULT now(), + is_stale BOOLEAN DEFAULT FALSE, UNIQUE (global_id, status) ); diff --git a/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql b/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql index 60b932f..cca03f8 100644 --- a/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql +++ b/data_platform/assets/ingestion/funda/sql/ddl/create_search_results.sql @@ -19,5 +19,7 @@ CREATE TABLE IF NOT EXISTS {{ schema }}.search_results ( broker_name TEXT, raw_json JSONB, ingested_at TIMESTAMPTZ DEFAULT now(), + last_seen_at TIMESTAMPTZ DEFAULT now(), + is_active BOOLEAN DEFAULT TRUE, UNIQUE (global_id) ); diff --git a/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql b/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql index 31e958d..434600d 100644 --- a/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql +++ b/data_platform/assets/ingestion/funda/sql/dml/insert_listing_details.sql @@ -56,4 +56,6 @@ ON CONFLICT (global_id, status) DO UPDATE SET views = excluded.views, saves = excluded.saves, raw_json = excluded.raw_json, - ingested_at = now() + ingested_at = now(), + last_fetched_at = now(), + is_stale = FALSE diff --git a/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql b/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql index 4f350f1..87fd786 100644 --- a/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql +++ b/data_platform/assets/ingestion/funda/sql/dml/insert_search_results.sql @@ -29,4 +29,6 @@ ON CONFLICT (global_id) DO UPDATE SET broker_id = excluded.broker_id, broker_name = excluded.broker_name, raw_json = excluded.raw_json, - ingested_at = now() + ingested_at = now(), + last_seen_at = now(), + is_active = TRUE