26 lines
591 B
Python
26 lines
591 B
Python
"""dbt source freshness op."""
|
|
|
|
from dagster import Config, OpExecutionContext, op
|
|
from dagster_dbt import DbtCliResource
|
|
|
|
|
|
class SourceFreshnessConfig(Config):
|
|
"""Config for the source freshness op."""
|
|
|
|
source_name: str
|
|
|
|
|
|
@op
|
|
def check_source_freshness(
|
|
context: OpExecutionContext,
|
|
config: SourceFreshnessConfig,
|
|
dbt: DbtCliResource,
|
|
) -> None:
|
|
"""Run dbt source freshness for the configured source."""
|
|
list(
|
|
dbt.cli(
|
|
["source", "freshness", "--select", f"source:{config.source_name}"],
|
|
context=context,
|
|
).stream()
|
|
)
|