diff --git a/data_platform/assets/ingestion/funda/__init__.py b/data_platform/assets/ingestion/funda/__init__.py index 3bd1c76..f48b49a 100644 --- a/data_platform/assets/ingestion/funda/__init__.py +++ b/data_platform/assets/ingestion/funda/__init__.py @@ -1,9 +1,13 @@ """Funda ingestion assets.""" from data_platform.assets.ingestion.funda.funda import ( - funda_listing_details, - funda_price_history, - funda_search_results, + raw_funda_listing_details, + raw_funda_price_history, + raw_funda_search_results, ) -__all__ = ["funda_listing_details", "funda_price_history", "funda_search_results"] +__all__ = [ + "raw_funda_listing_details", + "raw_funda_price_history", + "raw_funda_search_results", +] diff --git a/data_platform/assets/ingestion/funda/funda.py b/data_platform/assets/ingestion/funda/funda.py index a1df7ca..abf46ff 100644 --- a/data_platform/assets/ingestion/funda/funda.py +++ b/data_platform/assets/ingestion/funda/funda.py @@ -60,7 +60,7 @@ class FundaPriceHistoryConfig(Config): kinds={"python", "postgres"}, description="Search Funda listings and store results in Postgres.", ) -def funda_search_results( +def raw_funda_search_results( context: AssetExecutionContext, config: FundaSearchConfig, funda: FundaResource, @@ -189,10 +189,10 @@ def funda_search_results( @asset( group_name="funda", kinds={"python", "postgres"}, - deps=[funda_search_results], + deps=[raw_funda_search_results], description="Fetch full listing details for each search result and store in Postgres.", ) -def funda_listing_details( +def raw_funda_listing_details( context: AssetExecutionContext, config: FundaDetailsConfig, funda: FundaResource, @@ -332,10 +332,10 @@ def funda_listing_details( @asset( group_name="funda", kinds={"python", "postgres"}, - deps=[funda_listing_details], + deps=[raw_funda_listing_details], description="Fetch price history for each detailed listing and store in Postgres.", ) -def funda_price_history( +def raw_funda_price_history( context: AssetExecutionContext, config: FundaPriceHistoryConfig, funda: FundaResource, diff --git a/data_platform/definitions.py b/data_platform/definitions.py index de2e51d..6217e8c 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -3,9 +3,9 @@ from dagster_dbt import DbtCliResource from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets from data_platform.assets.ingestion.funda import ( - funda_listing_details, - funda_price_history, - funda_search_results, + raw_funda_listing_details, + raw_funda_price_history, + raw_funda_search_results, ) from data_platform.helpers import apply_automation from data_platform.jobs import ( @@ -24,9 +24,9 @@ defs = Definitions( assets=apply_automation( [ dbt_project_assets, - funda_search_results, - funda_listing_details, - funda_price_history, + raw_funda_search_results, + raw_funda_listing_details, + raw_funda_price_history, ] ), jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job], diff --git a/data_platform/jobs/funda.py b/data_platform/jobs/funda.py index 7fa2495..df30958 100644 --- a/data_platform/jobs/funda.py +++ b/data_platform/jobs/funda.py @@ -10,9 +10,9 @@ from data_platform.ops.check_source_freshness import ( funda_ingestion_job = define_asset_job( name="funda_ingestion", selection=AssetSelection.assets( - "funda_search_results", - "funda_listing_details", - "funda_price_history", + "raw_funda_search_results", + "raw_funda_listing_details", + "raw_funda_price_history", ), description="Full Funda ingestion pipeline.", ) diff --git a/data_platform/schedules/funda.py b/data_platform/schedules/funda.py index 94e31c0..c30b918 100644 --- a/data_platform/schedules/funda.py +++ b/data_platform/schedules/funda.py @@ -15,9 +15,9 @@ funda_ingestion_schedule = ScheduleDefinition( cron_schedule="0 */4 * * *", run_config=RunConfig( ops={ - "funda_search_results": FundaSearchConfig(), - "funda_listing_details": FundaDetailsConfig(), - "funda_price_history": FundaPriceHistoryConfig(), + "raw_funda_search_results": FundaSearchConfig(), + "raw_funda_listing_details": FundaDetailsConfig(), + "raw_funda_price_history": FundaPriceHistoryConfig(), } ), default_status=DefaultScheduleStatus.RUNNING, diff --git a/dbt/models/marts/funda_city_stats.sql b/dbt/models/marts/funda_city_stats.sql deleted file mode 100644 index cf8acf9..0000000 --- a/dbt/models/marts/funda_city_stats.sql +++ /dev/null @@ -1,29 +0,0 @@ --- Mart: per-city price statistics for available listings. - -with listings as ( - select * from {{ ref('funda_listings') }} - where not is_sold -), - -city_stats as ( - select - city, - province, - offering_type, - object_type, - count(*) as listing_count, - round(avg(current_price), 0) as avg_price, - min(current_price) as min_price, - max(current_price) as max_price, - percentile_cont(0.5) within group ( - order by current_price - ) as median_price, - round(avg(price_per_sqm), 0) as avg_price_per_sqm, - round(avg(living_area), 0) as avg_living_area, - round(avg(bedrooms), 1) as avg_bedrooms - from listings - where current_price is not null - group by city, province, offering_type, object_type -) - -select * from city_stats diff --git a/dbt/models/marts/funda_city_stats.yml b/dbt/models/marts/funda_city_stats.yml deleted file mode 100644 index cb07bbd..0000000 --- a/dbt/models/marts/funda_city_stats.yml +++ /dev/null @@ -1,64 +0,0 @@ -version: 2 - -models: - - name: funda_city_stats - description: > - Aggregated price statistics per city, province, offering type and object type. Only includes - currently available (not sold) listings. - config: - contract: - enforced: true - meta: - dagster: - group: funda - columns: - - name: city - description: City name. - data_type: text - constraints: - - type: not_null - tests: - - not_null - - name: province - description: Province name. - data_type: text - - name: offering_type - description: Buy or rent. - data_type: text - constraints: - - type: not_null - tests: - - not_null - - name: object_type - description: Property type. - data_type: text - - name: listing_count - description: Number of active listings in this group. - data_type: bigint - constraints: - - type: not_null - tests: - - not_null - - dbt_utils.expression_is_true: - expression: "> 0" - - name: avg_price - description: Average asking price. - data_type: numeric - - name: min_price - description: Lowest asking price in this group. - data_type: bigint - - name: max_price - description: Highest asking price in this group. - data_type: bigint - - name: median_price - description: Median asking price. - data_type: double precision - - name: avg_price_per_sqm - description: Average price per square metre. - data_type: numeric - - name: avg_living_area - description: Average living area in m². - data_type: numeric - - name: avg_bedrooms - description: Average number of bedrooms. - data_type: numeric diff --git a/dbt/models/marts/funda_listings.sql b/dbt/models/marts/funda_listings.sql index ef11b6d..2f4134d 100644 --- a/dbt/models/marts/funda_listings.sql +++ b/dbt/models/marts/funda_listings.sql @@ -1,8 +1,20 @@ -- Mart: analysis-ready Funda listings table. --- Selects the most useful fields and adds derived metrics. +-- Incrementally loads enriched listings, updating existing rows on re-ingestion. + +{{ + config( + materialized='incremental', + unique_key='global_id', + on_schema_change='fail' + ) +}} with enriched as ( - select * from {{ ref('int_funda_listings_enriched') }} + select * + from {{ ref('int_funda_listings_enriched') }} + {% if is_incremental() %} + where ingested_at > (select max(ingested_at) from {{ this }}) -- noqa: RF02 + {% endif %} ), final as ( diff --git a/dbt/models/staging/sources.yml b/dbt/models/staging/sources.yml index 8d696ff..97c6c5b 100644 --- a/dbt/models/staging/sources.yml +++ b/dbt/models/staging/sources.yml @@ -10,7 +10,7 @@ sources: description: Funda search results (broad overview of matching listings). meta: dagster: - asset_key: ["funda_search_results"] + asset_key: ["raw_funda_search_results"] loaded_at_field: last_seen_at freshness: warn_after: { count: 12, period: hour } @@ -51,7 +51,7 @@ sources: Full listing details fetched per search result (50+ fields). meta: dagster: - asset_key: ["funda_listing_details"] + asset_key: ["raw_funda_listing_details"] loaded_at_field: last_fetched_at freshness: warn_after: { count: 25, period: hour } @@ -96,7 +96,7 @@ sources: Historical price data per listing (asking prices, WOZ, sales). meta: dagster: - asset_key: ["funda_price_history"] + asset_key: ["raw_funda_price_history"] loaded_at_field: ingested_at freshness: warn_after: { count: 25, period: hour } diff --git a/dbt/package-lock.yml b/dbt/package-lock.yml new file mode 100644 index 0000000..034f423 --- /dev/null +++ b/dbt/package-lock.yml @@ -0,0 +1,8 @@ +packages: + - name: dbt_utils + package: dbt-labs/dbt_utils + version: 1.3.3 + - name: elementary + package: elementary-data/elementary + version: 0.22.1 +sha1_hash: 51fa23005683f751449a38a01754e2399b86bdfd diff --git a/tests/test_assets_funda.py b/tests/test_assets_funda.py index acfa13c..4254abc 100644 --- a/tests/test_assets_funda.py +++ b/tests/test_assets_funda.py @@ -5,9 +5,9 @@ from unittest.mock import MagicMock from dagster import materialize from data_platform.assets.ingestion.funda import ( - funda_listing_details, - funda_price_history, - funda_search_results, + raw_funda_listing_details, + raw_funda_price_history, + raw_funda_search_results, ) from data_platform.assets.ingestion.funda.funda import FundaSearchConfig from tests.conftest import make_mock_engine, make_mock_listing @@ -92,14 +92,14 @@ class TestFundaSearchResults: engine, _, _ = make_mock_engine() rows = inserted_rows if inserted_rows is not None else [] result = materialize( - [funda_search_results], + [raw_funda_search_results], resources={ "funda": MockFundaResource(mock_client), "postgres": MockPostgresResource(engine, rows), }, run_config={ "ops": { - "funda_search_results": { + "raw_funda_search_results": { "config": {"max_pages": 1, **(config or {})} } } @@ -112,7 +112,7 @@ class TestFundaSearchResults: client.search_listing.return_value = [] result = self._run(client) assert result.success - mat = result.asset_materializations_for_node("funda_search_results") + mat = result.asset_materializations_for_node("raw_funda_search_results") assert mat[0].metadata["count"].value == 0 def test_results_are_inserted(self): @@ -133,12 +133,14 @@ class TestFundaSearchResults: ] inserted = [] result = materialize( - [funda_search_results], + [raw_funda_search_results], resources={ "funda": MockFundaResource(client), "postgres": MockPostgresResource(make_mock_engine()[0], inserted), }, - run_config={"ops": {"funda_search_results": {"config": {"max_pages": 3}}}}, + run_config={ + "ops": {"raw_funda_search_results": {"config": {"max_pages": 3}}} + }, ) assert result.success assert client.search_listing.call_count == 2 @@ -195,7 +197,7 @@ class TestFundaListingDetails: def _run(self, mock_client, engine, inserted_rows=None): rows = inserted_rows if inserted_rows is not None else [] return materialize( - [funda_listing_details], + [raw_funda_listing_details], resources={ "funda": MockFundaResource(mock_client), "postgres": MockPostgresResource(engine, rows), @@ -207,7 +209,7 @@ class TestFundaListingDetails: client = MagicMock() result = self._run(client, engine) assert result.success - mat = result.asset_materializations_for_node("funda_listing_details") + mat = result.asset_materializations_for_node("raw_funda_listing_details") assert mat[0].metadata["count"].value == 0 def test_details_fetched_and_inserted(self): @@ -233,7 +235,7 @@ class TestFundaListingDetails: inserted = [] result = self._run(client, engine, inserted) assert result.success - mat = result.asset_materializations_for_node("funda_listing_details") + mat = result.asset_materializations_for_node("raw_funda_listing_details") assert mat[0].metadata["errors"].value == 1 assert len(inserted) == 1 @@ -242,7 +244,7 @@ class TestFundaPriceHistory: def _run(self, mock_client, engine, inserted_rows=None): rows = inserted_rows if inserted_rows is not None else [] return materialize( - [funda_price_history], + [raw_funda_price_history], resources={ "funda": MockFundaResource(mock_client), "postgres": MockPostgresResource(engine, rows), @@ -254,7 +256,7 @@ class TestFundaPriceHistory: client = MagicMock() result = self._run(client, engine) assert result.success - mat = result.asset_materializations_for_node("funda_price_history") + mat = result.asset_materializations_for_node("raw_funda_price_history") assert mat[0].metadata["count"].value == 0 def test_price_history_inserted(self): @@ -285,17 +287,17 @@ class TestFundaPriceHistory: assert len(inserted) == 2 assert inserted[0]["source"] == "Funda" assert inserted[1]["source"] == "WOZ" - mat = result.asset_materializations_for_node("funda_price_history") + mat = result.asset_materializations_for_node("raw_funda_price_history") assert mat[0].metadata["count"].value == 2 class TestFundaSearchConfig: def test_defaults(self): cfg = FundaSearchConfig() - assert cfg.location == "woerden, utrecht, zeist, maarssen, nieuwegein, gouda" + assert cfg.location == "woerden" assert cfg.offering_type == "buy" assert cfg.sort == "newest" - assert cfg.max_pages == 3 + assert cfg.max_pages == 10 assert cfg.price_min == 300000 assert cfg.price_max == 500000