diff --git a/dagster_home/dagster.yaml b/dagster_home/dagster.yaml index 9e2ce04..4c8535b 100644 --- a/dagster_home/dagster.yaml +++ b/dagster_home/dagster.yaml @@ -4,3 +4,12 @@ storage: postgres: postgres_url: env: DAGSTER_POSTGRES_URL + pool_size: 5 + max_overflow: 5 + +# Limit concurrent runs to avoid overwhelming the VM and database. +concurrency: + default_op_concurrency_limit: 1 + +run_queue: + max_concurrent_runs: 1 diff --git a/data_platform/assets/ingestion/funda/funda.py b/data_platform/assets/ingestion/funda/funda.py index abf46ff..841ad45 100644 --- a/data_platform/assets/ingestion/funda/funda.py +++ b/data_platform/assets/ingestion/funda/funda.py @@ -1,6 +1,7 @@ """Funda real-estate ingestion assets.""" import json +import time from pathlib import Path from dagster import ( @@ -10,6 +11,7 @@ from dagster import ( MetadataValue, asset, ) +from funda import Listing from sqlalchemy import text from data_platform.helpers import ( @@ -94,14 +96,14 @@ def raw_funda_search_results( all_listings = [] for page in range(config.max_pages): - context.log.info(f"Fetching search page {page + 1}/{config.max_pages} …") + context.log.info(f"Fetching search page {page + 1}/{config.max_pages}...") kwargs["page"] = page results = client.search_listing(**kwargs) if not results: context.log.info("No more results.") break all_listings.extend(results) - context.log.info(f" got {len(results)} listings (total: {len(all_listings)})") + context.log.info(f"Got {len(results)} listings (total: {len(all_listings)}).") if not all_listings: context.log.warning("Search returned zero results.") @@ -109,7 +111,9 @@ def raw_funda_search_results( engine = postgres.get_engine() with engine.begin() as conn: - conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) + conn.execute( + text(render_sql(_SQL_DIR, "ddl/create_schema.sql", schema=_SCHEMA)) + ) conn.execute( text(render_sql(_SQL_DIR, "ddl/create_search_results.sql", schema=_SCHEMA)) ) @@ -150,10 +154,11 @@ def raw_funda_search_results( with engine.begin() as conn: result = conn.execute( text( - f"UPDATE {_SCHEMA}.search_results" - f" SET is_active = FALSE" - f" WHERE last_seen_at < now() - INTERVAL '7 days'" - f" RETURNING global_id" + render_sql( + _SQL_DIR, + "dml/mark_inactive_search_results.sql", + schema=_SCHEMA, + ) ) ) newly_inactive = result.rowcount @@ -202,23 +207,21 @@ def raw_funda_listing_details( engine = postgres.get_engine() with engine.begin() as conn: - conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) + conn.execute( + text(render_sql(_SQL_DIR, "ddl/create_schema.sql", schema=_SCHEMA)) + ) conn.execute( text(render_sql(_SQL_DIR, "ddl/create_listing_details.sql", schema=_SCHEMA)) ) with engine.connect() as conn: if config.fetch_all: - query = text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.search_results") + query = text( + render_sql(_SQL_DIR, "dml/select_all_detail_ids.sql", schema=_SCHEMA) + ) else: query = text( - f""" - SELECT DISTINCT s.global_id - FROM {_SCHEMA}.search_results s - LEFT JOIN {_SCHEMA}.listing_details d ON s.global_id = d.global_id - WHERE s.is_active = TRUE - AND (d.global_id IS NULL OR d.is_stale = TRUE) - """ + render_sql(_SQL_DIR, "dml/select_new_detail_ids.sql", schema=_SCHEMA) ) result = conn.execute(query) ids = [row[0] for row in result if row[0]] @@ -227,7 +230,7 @@ def raw_funda_listing_details( context.log.warning("No search results found – run funda_search_results first.") return MaterializeResult(metadata={"count": 0}) - context.log.info(f"Fetching details for {len(ids)} listings …") + context.log.info(f"Fetching details for {len(ids)} listings...") rows = [] errors = 0 @@ -282,7 +285,9 @@ def raw_funda_listing_details( continue if (i + 1) % 10 == 0: - context.log.info(f" fetched {i + 1}/{len(ids)} …") + context.log.info(f"Fetched {i + 1}/{len(ids)} listings.") + + time.sleep(1) if rows: postgres.execute_many( @@ -293,14 +298,11 @@ def raw_funda_listing_details( with engine.begin() as conn: conn.execute( text( - f""" - UPDATE {_SCHEMA}.listing_details d - SET is_stale = TRUE - FROM {_SCHEMA}.search_results s - WHERE d.global_id = s.global_id - AND s.is_active = FALSE - AND d.is_stale = FALSE - """ + render_sql( + _SQL_DIR, + "dml/mark_stale_listing_details.sql", + schema=_SCHEMA, + ) ) ) @@ -345,78 +347,89 @@ def raw_funda_price_history( engine = postgres.get_engine() with engine.begin() as conn: - conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {_SCHEMA}")) + conn.execute( + text(render_sql(_SQL_DIR, "ddl/create_schema.sql", schema=_SCHEMA)) + ) conn.execute( text(render_sql(_SQL_DIR, "ddl/create_price_history.sql", schema=_SCHEMA)) ) + # Fetch listing metadata (url, title, postcode) from the DB so we can call + # get_price_history without re-fetching each listing from the Funda API. with engine.connect() as conn: if config.fetch_all: - query = text(f"SELECT DISTINCT global_id FROM {_SCHEMA}.listing_details") + query = text( + render_sql( + _SQL_DIR, + "dml/select_all_price_history_listings.sql", + schema=_SCHEMA, + ) + ) else: query = text( - f""" - SELECT DISTINCT d.global_id - FROM {_SCHEMA}.listing_details d - JOIN {_SCHEMA}.search_results s ON d.global_id = s.global_id - WHERE s.is_active = TRUE - UNION - SELECT DISTINCT d.global_id - FROM {_SCHEMA}.listing_details d - LEFT JOIN {_SCHEMA}.price_history p ON d.global_id = p.global_id - WHERE p.global_id IS NULL - """ + render_sql( + _SQL_DIR, + "dml/select_new_price_history_listings.sql", + schema=_SCHEMA, + ) ) result = conn.execute(query) - ids = [row[0] for row in result if row[0]] + listings = [(row[0], row[1], row[2], row[3]) for row in result if row[0]] - if not ids: + if not listings: context.log.warning( "No listing details found – run funda_listing_details first." ) return MaterializeResult(metadata={"count": 0}) - context.log.info(f"Fetching price history for {len(ids)} listings …") + context.log.info(f"Fetching price history for {len(listings)} listings...") - rows = [] + batch_size = 25 + total_rows = 0 errors = 0 - for i, gid in enumerate(ids): + for i, (gid, url, title, postcode) in enumerate(listings): try: - listing = client.get_listing(int(gid)) - history = client.get_price_history(listing) - for entry in history: - rows.append( - { - "global_id": gid, - "price": safe_int(entry.get("price")), - "human_price": entry.get("human_price"), - "date": entry.get("date"), - "timestamp": entry.get("timestamp"), - "source": entry.get("source"), - "status": entry.get("status"), - } + stub = Listing(data={"url": url, "title": title, "postcode": postcode}) + history = client.get_price_history(stub) + rows = [ + { + "global_id": gid, + "price": safe_int(entry.get("price")), + "human_price": entry.get("human_price"), + "date": entry.get("date"), + "timestamp": entry.get("timestamp"), + "source": entry.get("source"), + "status": entry.get("status"), + } + for entry in history + ] + if rows: + postgres.execute_many( + render_sql( + _SQL_DIR, "dml/insert_price_history.sql", schema=_SCHEMA + ), + rows, ) + total_rows += len(rows) except Exception as e: errors += 1 context.log.warning(f"Failed to fetch price history for {gid}: {e}") continue - if (i + 1) % 10 == 0: - context.log.info(f" fetched {i + 1}/{len(ids)} …") + if (i + 1) % batch_size == 0: + context.log.info(f"Fetched {i + 1}/{len(listings)} price histories.") - if rows: - postgres.execute_many( - render_sql(_SQL_DIR, "dml/insert_price_history.sql", schema=_SCHEMA), rows - ) + time.sleep(1) context.log.info( - f"Inserted {len(rows)} price history records ({errors} errors) into {_SCHEMA}.price_history" + f"Inserted {total_rows} price history records ({errors} errors) " + f"into {_SCHEMA}.price_history" ) return MaterializeResult( metadata={ - "count": len(rows), + "count": total_rows, "errors": errors, - "listings_processed": len(ids) - errors, + "listings_processed": len(listings) - errors, } ) diff --git a/data_platform/assets/ingestion/funda/sql/ddl/create_schema.sql b/data_platform/assets/ingestion/funda/sql/ddl/create_schema.sql new file mode 100644 index 0000000..4b3923d --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/ddl/create_schema.sql @@ -0,0 +1 @@ +create schema if not exists {{ schema }}; diff --git a/data_platform/assets/ingestion/funda/sql/dml/mark_inactive_search_results.sql b/data_platform/assets/ingestion/funda/sql/dml/mark_inactive_search_results.sql new file mode 100644 index 0000000..f8e88a2 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/mark_inactive_search_results.sql @@ -0,0 +1,4 @@ +update {{ schema }}.search_results +set is_active = false +where last_seen_at < now() - interval '7 days' +returning global_id diff --git a/data_platform/assets/ingestion/funda/sql/dml/mark_stale_listing_details.sql b/data_platform/assets/ingestion/funda/sql/dml/mark_stale_listing_details.sql new file mode 100644 index 0000000..4717fb3 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/mark_stale_listing_details.sql @@ -0,0 +1,7 @@ +update {{ schema }}.listing_details d +set is_stale = true +from {{ schema }}.search_results as s +where + d.global_id = s.global_id + and s.is_active = false + and d.is_stale = false diff --git a/data_platform/assets/ingestion/funda/sql/dml/select_all_detail_ids.sql b/data_platform/assets/ingestion/funda/sql/dml/select_all_detail_ids.sql new file mode 100644 index 0000000..280cbfa --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/select_all_detail_ids.sql @@ -0,0 +1,2 @@ +select distinct global_id +from {{ schema }}.search_results diff --git a/data_platform/assets/ingestion/funda/sql/dml/select_all_price_history_listings.sql b/data_platform/assets/ingestion/funda/sql/dml/select_all_price_history_listings.sql new file mode 100644 index 0000000..5593de5 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/select_all_price_history_listings.sql @@ -0,0 +1,6 @@ +select distinct + d.global_id, + d.url, + d.title, + d.postcode +from {{ schema }}.listing_details as d diff --git a/data_platform/assets/ingestion/funda/sql/dml/select_new_detail_ids.sql b/data_platform/assets/ingestion/funda/sql/dml/select_new_detail_ids.sql new file mode 100644 index 0000000..bed5a8a --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/select_new_detail_ids.sql @@ -0,0 +1,6 @@ +select distinct s.global_id +from {{ schema }}.search_results as s +left join {{ schema }}.listing_details as d on s.global_id = d.global_id +where + s.is_active = true + and (d.global_id is null or d.is_stale = true) diff --git a/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql b/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql new file mode 100644 index 0000000..275a823 --- /dev/null +++ b/data_platform/assets/ingestion/funda/sql/dml/select_new_price_history_listings.sql @@ -0,0 +1,17 @@ +select distinct + d.global_id, + d.url, + d.title, + d.postcode +from {{ schema }}.listing_details as d +inner join {{ schema }}.search_results as s on d.global_id = s.global_id +where s.is_active = true +union +select distinct + d.global_id, + d.url, + d.title, + d.postcode +from {{ schema }}.listing_details as d +left join {{ schema }}.price_history as p on d.global_id = p.global_id +where p.global_id is null diff --git a/data_platform/definitions.py b/data_platform/definitions.py index 6217e8c..ebd6e0e 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -1,4 +1,8 @@ -from dagster import Definitions +from dagster import ( + AutomationConditionSensorDefinition, + DefaultSensorStatus, + Definitions, +) from dagster_dbt import DbtCliResource from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets @@ -30,6 +34,13 @@ defs = Definitions( ] ), jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job], + sensors=[ + AutomationConditionSensorDefinition( + name="automation_condition_sensor", + target="*", + default_status=DefaultSensorStatus.RUNNING, + ), + ], schedules=[ funda_ingestion_schedule, funda_raw_quality_schedule, diff --git a/data_platform/resources/__init__.py b/data_platform/resources/__init__.py index bdc00b5..b3a07f4 100644 --- a/data_platform/resources/__init__.py +++ b/data_platform/resources/__init__.py @@ -3,6 +3,7 @@ from dagster import ConfigurableResource, EnvVar from funda import Funda from sqlalchemy import create_engine, text +from sqlalchemy.pool import NullPool class FundaResource(ConfigurableResource): @@ -25,7 +26,7 @@ class PostgresResource(ConfigurableResource): def get_engine(self): url = f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.dbname}" - return create_engine(url) + return create_engine(url, poolclass=NullPool) def execute(self, statement: str, params: dict | None = None): engine = self.get_engine() diff --git a/dbt/models/staging/stg_funda_listings.yml b/dbt/models/staging/stg_funda_listings.yml index 803f1d0..5bf3a3a 100644 --- a/dbt/models/staging/stg_funda_listings.yml +++ b/dbt/models/staging/stg_funda_listings.yml @@ -89,7 +89,7 @@ models: data_type: text tests: - accepted_values: - values: ["A3", "A2", "A1", "A", "B", "C", "D", "E", "F", "G"] + values: ["A4", "A3", "A2", "A1", "A", "B", "C", "D", "E", "F", "G"] where: "energy_label is not null" - name: living_area description: Interior floor area in m². diff --git a/tests/test_assets_funda.py b/tests/test_assets_funda.py index 4254abc..a6c3015 100644 --- a/tests/test_assets_funda.py +++ b/tests/test_assets_funda.py @@ -260,9 +260,17 @@ class TestFundaPriceHistory: assert mat[0].metadata["count"].value == 0 def test_price_history_inserted(self): - engine, _, _ = make_mock_engine(select_rows=[("1234567",)]) + engine, _, _ = make_mock_engine( + select_rows=[ + ( + "1234567", + "https://www.funda.nl/detail/koop/amsterdam/app/87654321/", + "Teststraat 1", + "1234AB", + ), + ] + ) client = MagicMock() - client.get_listing.return_value = make_mock_listing(_DETAIL_LISTING_DATA) client.get_price_history.return_value = [ { "price": 350000,