add tag to job
This commit is contained in:
@@ -1,28 +1,34 @@
|
|||||||
|
from functools import partial
|
||||||
|
|
||||||
import polars as pl
|
import polars as pl
|
||||||
from assets import deals
|
from assets import deals
|
||||||
|
from config import APP
|
||||||
|
|
||||||
import dagster as dg
|
import dagster as dg
|
||||||
|
|
||||||
deals_job = dg.define_asset_job(
|
kwargs = dict(tags={"app": APP})
|
||||||
|
job = partial(dg.job, **kwargs)
|
||||||
|
define_asset_job = partial(dg.define_asset_job, **kwargs)
|
||||||
|
|
||||||
|
deals_job = define_asset_job(
|
||||||
"deals_job", selection=[deals.key], partitions_def=deals.partitions_def
|
"deals_job", selection=[deals.key], partitions_def=deals.partitions_def
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@dg.op(required_resource_keys={"polars_parquet_io_manager"})
|
@dg.op(required_resource_keys={"polars_parquet_io_manager"})
|
||||||
def check_partitions(context: dg.OpExecutionContext):
|
def check_partitions(context: dg.OpExecutionContext) -> None:
|
||||||
asset_key = "deals"
|
asset_key = deals.key
|
||||||
|
|
||||||
# Fetch the materialized partitions for the asset key
|
# Fetch the materialized partitions for the asset key
|
||||||
materialized_partitions = context.instance.get_materialized_partitions(
|
materialized_partitions = context.instance.get_materialized_partitions(asset_key)
|
||||||
asset_key=dg.AssetKey(asset_key)
|
|
||||||
)
|
|
||||||
ic(materialized_partitions)
|
ic(materialized_partitions)
|
||||||
|
|
||||||
storage_dir = context.resources.polars_parquet_io_manager.base_dir
|
storage_dir = context.resources.polars_parquet_io_manager.base_dir
|
||||||
ic(storage_dir)
|
asset_path = "/".join(asset_key.path)
|
||||||
|
ic(storage_dir, asset_key, asset_path)
|
||||||
for row in (
|
for row in (
|
||||||
pl.scan_parquet(
|
pl.scan_parquet(
|
||||||
f"{storage_dir}/{asset_key}/*/*.parquet", # extra_columns="ignore"
|
f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore"
|
||||||
)
|
)
|
||||||
.select(["date", "source"])
|
.select(["date", "source"])
|
||||||
.unique()
|
.unique()
|
||||||
@@ -37,6 +43,6 @@ def check_partitions(context: dg.OpExecutionContext):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@dg.job
|
@job
|
||||||
def check_partitions_job():
|
def check_partitions_job():
|
||||||
check_partitions()
|
check_partitions()
|
||||||
|
|||||||
Reference in New Issue
Block a user