warn for missing partitions
This commit is contained in:
@@ -232,7 +232,9 @@ def new_deals(
|
|||||||
},
|
},
|
||||||
ins={"partitions": dg.AssetIn(key=new_deals.key)},
|
ins={"partitions": dg.AssetIn(key=new_deals.key)},
|
||||||
output_required=False,
|
output_required=False,
|
||||||
automation_condition=dg.AutomationCondition.eager().without(~dg.AutomationCondition.any_deps_missing())
|
automation_condition=dg.AutomationCondition.eager().without(
|
||||||
|
~dg.AutomationCondition.any_deps_missing()
|
||||||
|
),
|
||||||
)
|
)
|
||||||
def good_deals(
|
def good_deals(
|
||||||
context: dg.AssetExecutionContext,
|
context: dg.AssetExecutionContext,
|
||||||
@@ -242,6 +244,9 @@ def good_deals(
|
|||||||
parsed_partition_keys = parse_partition_keys(context, "partitions")
|
parsed_partition_keys = parse_partition_keys(context, "partitions")
|
||||||
ic(parsed_partition_keys)
|
ic(parsed_partition_keys)
|
||||||
|
|
||||||
|
if not partitions:
|
||||||
|
logger.warning("Partitions are empty!")
|
||||||
|
return
|
||||||
df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
|
df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
|
||||||
|
|
||||||
counts = dict(df.group_by("source").len().iter_rows())
|
counts = dict(df.group_by("source").len().iter_rows())
|
||||||
|
|||||||
Reference in New Issue
Block a user