feat: expand testing

This commit is contained in:
Stijnvandenbroek
2026-03-04 22:18:30 +00:00
parent 0d2706a93e
commit 0b9b408714
22 changed files with 1266 additions and 54 deletions

View File

@@ -7,8 +7,17 @@ from data_platform.assets.ingestion.funda import (
funda_price_history,
funda_search_results,
)
from data_platform.jobs import (
elementary_refresh_job,
funda_ingestion_job,
funda_raw_quality_job,
)
from data_platform.resources import FundaResource, PostgresResource
from data_platform.schedules import funda_ingestion_job, funda_ingestion_schedule
from data_platform.schedules import (
elementary_refresh_schedule,
funda_ingestion_schedule,
funda_raw_quality_schedule,
)
defs = Definitions(
assets=[
@@ -17,8 +26,12 @@ defs = Definitions(
funda_listing_details,
funda_price_history,
],
jobs=[funda_ingestion_job],
schedules=[funda_ingestion_schedule],
jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job],
schedules=[
funda_ingestion_schedule,
funda_raw_quality_schedule,
elementary_refresh_schedule,
],
resources={
"dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR)),
"funda": FundaResource(),

View File

@@ -0,0 +1,8 @@
from data_platform.jobs.elementary import elementary_refresh_job
from data_platform.jobs.funda import funda_ingestion_job, funda_raw_quality_job
__all__ = [
"funda_ingestion_job",
"funda_raw_quality_job",
"elementary_refresh_job",
]

View File

@@ -0,0 +1,10 @@
"""Elementary jobs."""
from dagster import job
from data_platform.ops.elementary import elementary_generate_report
@job(description="Regenerate the Elementary data observability report.")
def elementary_refresh_job():
elementary_generate_report()

View File

@@ -0,0 +1,28 @@
"""Funda jobs."""
from dagster import AssetSelection, RunConfig, define_asset_job, job
from data_platform.ops.check_source_freshness import (
SourceFreshnessConfig,
check_source_freshness,
)
funda_ingestion_job = define_asset_job(
name="funda_ingestion",
selection=AssetSelection.assets(
"funda_search_results",
"funda_listing_details",
"funda_price_history",
),
description="Full Funda ingestion pipeline.",
)
@job(
description="dbt source freshness checks for all raw_funda sources.",
config=RunConfig(
ops={"check_source_freshness": SourceFreshnessConfig(source_name="raw_funda")}
),
)
def funda_raw_quality_job():
check_source_freshness()

View File

@@ -0,0 +1,11 @@
from data_platform.ops.check_source_freshness import (
SourceFreshnessConfig,
check_source_freshness,
)
from data_platform.ops.elementary import elementary_generate_report
__all__ = [
"check_source_freshness",
"SourceFreshnessConfig",
"elementary_generate_report",
]

View File

@@ -0,0 +1,25 @@
"""dbt source freshness op."""
from dagster import Config, OpExecutionContext, op
from dagster_dbt import DbtCliResource
class SourceFreshnessConfig(Config):
"""Config for the source freshness op."""
source_name: str
@op
def check_source_freshness(
context: OpExecutionContext,
config: SourceFreshnessConfig,
dbt: DbtCliResource,
) -> None:
"""Run dbt source freshness for the configured source."""
list(
dbt.cli(
["source", "freshness", "--select", f"source:{config.source_name}"],
context=context,
).stream()
)

View File

@@ -0,0 +1,31 @@
"""Elementary report generation op."""
import subprocess
from pathlib import Path
from dagster import OpExecutionContext, op
_DBT_DIR = Path(__file__).parents[2] / "dbt"
@op
def elementary_generate_report(context: OpExecutionContext) -> None:
"""Run edr report to regenerate the Elementary HTML report."""
cmd = [
"edr",
"report",
"--profiles-dir",
str(_DBT_DIR),
"--project-dir",
str(_DBT_DIR),
"--disable-open-browser",
]
context.log.info(f"Running: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout:
context.log.info(result.stdout)
if result.stderr:
context.log.warning(result.stderr)
if result.returncode != 0:
raise Exception(f"edr report failed with exit code {result.returncode}")
context.log.info("Elementary report generated successfully.")

View File

@@ -0,0 +1,11 @@
from data_platform.schedules.elementary import elementary_refresh_schedule
from data_platform.schedules.funda import (
funda_ingestion_schedule,
funda_raw_quality_schedule,
)
__all__ = [
"funda_ingestion_schedule",
"funda_raw_quality_schedule",
"elementary_refresh_schedule",
]

View File

@@ -0,0 +1,13 @@
"""Elementary schedules."""
from dagster import DefaultScheduleStatus, ScheduleDefinition
from data_platform.jobs.elementary import elementary_refresh_job
elementary_refresh_schedule = ScheduleDefinition(
name="elementary_refresh_schedule",
job=elementary_refresh_job,
cron_schedule="0 9 * * *",
description="Regenerate the Elementary report daily at 09:00 UTC.",
default_status=DefaultScheduleStatus.RUNNING,
)

View File

@@ -1,28 +1,13 @@
"""Dagster jobs and schedules."""
"""Funda schedules."""
from dagster import (
AssetSelection,
DefaultScheduleStatus,
RunConfig,
ScheduleDefinition,
define_asset_job,
)
from dagster import DefaultScheduleStatus, RunConfig, ScheduleDefinition
from data_platform.assets.ingestion.funda.funda import (
FundaDetailsConfig,
FundaPriceHistoryConfig,
FundaSearchConfig,
)
funda_ingestion_job = define_asset_job(
name="funda_ingestion",
selection=AssetSelection.assets(
"funda_search_results",
"funda_listing_details",
"funda_price_history",
),
description="Full Funda ingestion pipeline.",
)
from data_platform.jobs.funda import funda_ingestion_job, funda_raw_quality_job
funda_ingestion_schedule = ScheduleDefinition(
name="funda_ingestion_schedule",
@@ -37,3 +22,11 @@ funda_ingestion_schedule = ScheduleDefinition(
),
default_status=DefaultScheduleStatus.RUNNING,
)
funda_raw_quality_schedule = ScheduleDefinition(
name="funda_raw_quality_schedule",
job=funda_raw_quality_job,
cron_schedule="0 8 * * *",
description="Daily quality checks on all raw Funda tables.",
default_status=DefaultScheduleStatus.RUNNING,
)