feat: revise funda data process

This commit is contained in:
Stijnvandenbroek
2026-03-05 19:12:33 +00:00
parent ef0cddaa22
commit b959049fe8
11 changed files with 68 additions and 135 deletions

View File

@@ -1,9 +1,13 @@
"""Funda ingestion assets."""
from data_platform.assets.ingestion.funda.funda import (
funda_listing_details,
funda_price_history,
funda_search_results,
raw_funda_listing_details,
raw_funda_price_history,
raw_funda_search_results,
)
__all__ = ["funda_listing_details", "funda_price_history", "funda_search_results"]
__all__ = [
"raw_funda_listing_details",
"raw_funda_price_history",
"raw_funda_search_results",
]

View File

@@ -60,7 +60,7 @@ class FundaPriceHistoryConfig(Config):
kinds={"python", "postgres"},
description="Search Funda listings and store results in Postgres.",
)
def funda_search_results(
def raw_funda_search_results(
context: AssetExecutionContext,
config: FundaSearchConfig,
funda: FundaResource,
@@ -189,10 +189,10 @@ def funda_search_results(
@asset(
group_name="funda",
kinds={"python", "postgres"},
deps=[funda_search_results],
deps=[raw_funda_search_results],
description="Fetch full listing details for each search result and store in Postgres.",
)
def funda_listing_details(
def raw_funda_listing_details(
context: AssetExecutionContext,
config: FundaDetailsConfig,
funda: FundaResource,
@@ -332,10 +332,10 @@ def funda_listing_details(
@asset(
group_name="funda",
kinds={"python", "postgres"},
deps=[funda_listing_details],
deps=[raw_funda_listing_details],
description="Fetch price history for each detailed listing and store in Postgres.",
)
def funda_price_history(
def raw_funda_price_history(
context: AssetExecutionContext,
config: FundaPriceHistoryConfig,
funda: FundaResource,

View File

@@ -3,9 +3,9 @@ from dagster_dbt import DbtCliResource
from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets
from data_platform.assets.ingestion.funda import (
funda_listing_details,
funda_price_history,
funda_search_results,
raw_funda_listing_details,
raw_funda_price_history,
raw_funda_search_results,
)
from data_platform.helpers import apply_automation
from data_platform.jobs import (
@@ -24,9 +24,9 @@ defs = Definitions(
assets=apply_automation(
[
dbt_project_assets,
funda_search_results,
funda_listing_details,
funda_price_history,
raw_funda_search_results,
raw_funda_listing_details,
raw_funda_price_history,
]
),
jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job],

View File

@@ -10,9 +10,9 @@ from data_platform.ops.check_source_freshness import (
funda_ingestion_job = define_asset_job(
name="funda_ingestion",
selection=AssetSelection.assets(
"funda_search_results",
"funda_listing_details",
"funda_price_history",
"raw_funda_search_results",
"raw_funda_listing_details",
"raw_funda_price_history",
),
description="Full Funda ingestion pipeline.",
)

View File

@@ -15,9 +15,9 @@ funda_ingestion_schedule = ScheduleDefinition(
cron_schedule="0 */4 * * *",
run_config=RunConfig(
ops={
"funda_search_results": FundaSearchConfig(),
"funda_listing_details": FundaDetailsConfig(),
"funda_price_history": FundaPriceHistoryConfig(),
"raw_funda_search_results": FundaSearchConfig(),
"raw_funda_listing_details": FundaDetailsConfig(),
"raw_funda_price_history": FundaPriceHistoryConfig(),
}
),
default_status=DefaultScheduleStatus.RUNNING,

View File

@@ -1,29 +0,0 @@
-- Mart: per-city price statistics for available listings.
with listings as (
select * from {{ ref('funda_listings') }}
where not is_sold
),
city_stats as (
select
city,
province,
offering_type,
object_type,
count(*) as listing_count,
round(avg(current_price), 0) as avg_price,
min(current_price) as min_price,
max(current_price) as max_price,
percentile_cont(0.5) within group (
order by current_price
) as median_price,
round(avg(price_per_sqm), 0) as avg_price_per_sqm,
round(avg(living_area), 0) as avg_living_area,
round(avg(bedrooms), 1) as avg_bedrooms
from listings
where current_price is not null
group by city, province, offering_type, object_type
)
select * from city_stats

View File

@@ -1,64 +0,0 @@
version: 2
models:
- name: funda_city_stats
description: >
Aggregated price statistics per city, province, offering type and object type. Only includes
currently available (not sold) listings.
config:
contract:
enforced: true
meta:
dagster:
group: funda
columns:
- name: city
description: City name.
data_type: text
constraints:
- type: not_null
tests:
- not_null
- name: province
description: Province name.
data_type: text
- name: offering_type
description: Buy or rent.
data_type: text
constraints:
- type: not_null
tests:
- not_null
- name: object_type
description: Property type.
data_type: text
- name: listing_count
description: Number of active listings in this group.
data_type: bigint
constraints:
- type: not_null
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "> 0"
- name: avg_price
description: Average asking price.
data_type: numeric
- name: min_price
description: Lowest asking price in this group.
data_type: bigint
- name: max_price
description: Highest asking price in this group.
data_type: bigint
- name: median_price
description: Median asking price.
data_type: double precision
- name: avg_price_per_sqm
description: Average price per square metre.
data_type: numeric
- name: avg_living_area
description: Average living area in m².
data_type: numeric
- name: avg_bedrooms
description: Average number of bedrooms.
data_type: numeric

View File

@@ -1,8 +1,20 @@
-- Mart: analysis-ready Funda listings table.
-- Selects the most useful fields and adds derived metrics.
-- Incrementally loads enriched listings, updating existing rows on re-ingestion.
{{
config(
materialized='incremental',
unique_key='global_id',
on_schema_change='fail'
)
}}
with enriched as (
select * from {{ ref('int_funda_listings_enriched') }}
select *
from {{ ref('int_funda_listings_enriched') }}
{% if is_incremental() %}
where ingested_at > (select max(ingested_at) from {{ this }}) -- noqa: RF02
{% endif %}
),
final as (

View File

@@ -10,7 +10,7 @@ sources:
description: Funda search results (broad overview of matching listings).
meta:
dagster:
asset_key: ["funda_search_results"]
asset_key: ["raw_funda_search_results"]
loaded_at_field: last_seen_at
freshness:
warn_after: { count: 12, period: hour }
@@ -51,7 +51,7 @@ sources:
Full listing details fetched per search result (50+ fields).
meta:
dagster:
asset_key: ["funda_listing_details"]
asset_key: ["raw_funda_listing_details"]
loaded_at_field: last_fetched_at
freshness:
warn_after: { count: 25, period: hour }
@@ -96,7 +96,7 @@ sources:
Historical price data per listing (asking prices, WOZ, sales).
meta:
dagster:
asset_key: ["funda_price_history"]
asset_key: ["raw_funda_price_history"]
loaded_at_field: ingested_at
freshness:
warn_after: { count: 25, period: hour }

8
dbt/package-lock.yml Normal file
View File

@@ -0,0 +1,8 @@
packages:
- name: dbt_utils
package: dbt-labs/dbt_utils
version: 1.3.3
- name: elementary
package: elementary-data/elementary
version: 0.22.1
sha1_hash: 51fa23005683f751449a38a01754e2399b86bdfd

View File

@@ -5,9 +5,9 @@ from unittest.mock import MagicMock
from dagster import materialize
from data_platform.assets.ingestion.funda import (
funda_listing_details,
funda_price_history,
funda_search_results,
raw_funda_listing_details,
raw_funda_price_history,
raw_funda_search_results,
)
from data_platform.assets.ingestion.funda.funda import FundaSearchConfig
from tests.conftest import make_mock_engine, make_mock_listing
@@ -92,14 +92,14 @@ class TestFundaSearchResults:
engine, _, _ = make_mock_engine()
rows = inserted_rows if inserted_rows is not None else []
result = materialize(
[funda_search_results],
[raw_funda_search_results],
resources={
"funda": MockFundaResource(mock_client),
"postgres": MockPostgresResource(engine, rows),
},
run_config={
"ops": {
"funda_search_results": {
"raw_funda_search_results": {
"config": {"max_pages": 1, **(config or {})}
}
}
@@ -112,7 +112,7 @@ class TestFundaSearchResults:
client.search_listing.return_value = []
result = self._run(client)
assert result.success
mat = result.asset_materializations_for_node("funda_search_results")
mat = result.asset_materializations_for_node("raw_funda_search_results")
assert mat[0].metadata["count"].value == 0
def test_results_are_inserted(self):
@@ -133,12 +133,14 @@ class TestFundaSearchResults:
]
inserted = []
result = materialize(
[funda_search_results],
[raw_funda_search_results],
resources={
"funda": MockFundaResource(client),
"postgres": MockPostgresResource(make_mock_engine()[0], inserted),
},
run_config={"ops": {"funda_search_results": {"config": {"max_pages": 3}}}},
run_config={
"ops": {"raw_funda_search_results": {"config": {"max_pages": 3}}}
},
)
assert result.success
assert client.search_listing.call_count == 2
@@ -195,7 +197,7 @@ class TestFundaListingDetails:
def _run(self, mock_client, engine, inserted_rows=None):
rows = inserted_rows if inserted_rows is not None else []
return materialize(
[funda_listing_details],
[raw_funda_listing_details],
resources={
"funda": MockFundaResource(mock_client),
"postgres": MockPostgresResource(engine, rows),
@@ -207,7 +209,7 @@ class TestFundaListingDetails:
client = MagicMock()
result = self._run(client, engine)
assert result.success
mat = result.asset_materializations_for_node("funda_listing_details")
mat = result.asset_materializations_for_node("raw_funda_listing_details")
assert mat[0].metadata["count"].value == 0
def test_details_fetched_and_inserted(self):
@@ -233,7 +235,7 @@ class TestFundaListingDetails:
inserted = []
result = self._run(client, engine, inserted)
assert result.success
mat = result.asset_materializations_for_node("funda_listing_details")
mat = result.asset_materializations_for_node("raw_funda_listing_details")
assert mat[0].metadata["errors"].value == 1
assert len(inserted) == 1
@@ -242,7 +244,7 @@ class TestFundaPriceHistory:
def _run(self, mock_client, engine, inserted_rows=None):
rows = inserted_rows if inserted_rows is not None else []
return materialize(
[funda_price_history],
[raw_funda_price_history],
resources={
"funda": MockFundaResource(mock_client),
"postgres": MockPostgresResource(engine, rows),
@@ -254,7 +256,7 @@ class TestFundaPriceHistory:
client = MagicMock()
result = self._run(client, engine)
assert result.success
mat = result.asset_materializations_for_node("funda_price_history")
mat = result.asset_materializations_for_node("raw_funda_price_history")
assert mat[0].metadata["count"].value == 0
def test_price_history_inserted(self):
@@ -285,17 +287,17 @@ class TestFundaPriceHistory:
assert len(inserted) == 2
assert inserted[0]["source"] == "Funda"
assert inserted[1]["source"] == "WOZ"
mat = result.asset_materializations_for_node("funda_price_history")
mat = result.asset_materializations_for_node("raw_funda_price_history")
assert mat[0].metadata["count"].value == 2
class TestFundaSearchConfig:
def test_defaults(self):
cfg = FundaSearchConfig()
assert cfg.location == "woerden, utrecht, zeist, maarssen, nieuwegein, gouda"
assert cfg.location == "woerden"
assert cfg.offering_type == "buy"
assert cfg.sort == "newest"
assert cfg.max_pages == 3
assert cfg.max_pages == 10
assert cfg.price_min == 300000
assert cfg.price_max == 500000