dynamic partition names

This commit is contained in:
2025-08-02 10:49:56 +02:00
parent 2a032cf25d
commit b5f7952c07
3 changed files with 7 additions and 4 deletions

View File

@@ -179,7 +179,7 @@ def new_deals(
ic(partitions.keys()) ic(partitions.keys())
partition_keys = get_partition_keys(context) partition_keys = get_partition_keys(context)
ic(partition_keys) ic(partition_keys.keys())
parsed_partition_keys = parse_partition_keys(context, "partitions") parsed_partition_keys = parse_partition_keys(context, "partitions")
ic(parsed_partition_keys) ic(parsed_partition_keys)

View File

@@ -10,7 +10,7 @@ kwargs = dict(tags={"app": APP})
job = partial(dg.job, **kwargs) job = partial(dg.job, **kwargs)
define_asset_job = partial(dg.define_asset_job, **kwargs) define_asset_job = partial(dg.define_asset_job, **kwargs)
deals_job = define_asset_job( deals_job = dg.define_asset_job(
"deals_job", "deals_job",
selection=[assets.deals.key], selection=[assets.deals.key],
partitions_def=assets.deals.partitions_def, partitions_def=assets.deals.partitions_def,
@@ -32,12 +32,15 @@ def check_partitions(context: dg.OpExecutionContext) -> None:
asset_path = "/".join(asset_key.path) asset_path = "/".join(asset_key.path)
ic(storage_dir, asset_key, asset_path) ic(storage_dir, asset_key, asset_path)
dimension_names = asset.partitions_def.partition_dimension_names
ic(dimension_names)
partitions = [] partitions = []
for row in ( for row in (
pl.scan_parquet( pl.scan_parquet(
f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore" f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore"
) )
.select(["date", "source"]) # asset.partitions_def.names ? .select(dimension_names)
.unique() .unique()
.with_columns(pl.exclude(pl.String).cast(str)) .with_columns(pl.exclude(pl.String).cast(str))
.collect() .collect()

View File

@@ -21,7 +21,7 @@ def get_dimension_names(
""" """
partition_definition = context.asset_partitions_def_for_input(input_name) partition_definition = context.asset_partitions_def_for_input(input_name)
if isinstance(partition_definition, dg.MultiPartitionsDefinition): if isinstance(partition_definition, dg.MultiPartitionsDefinition):
return [x.name for x in partition_definition.partitions_defs] return partition_definition.partition_dimension_names
raise NotImplementedError("Only MultiPartitionsDefinition is supported.") raise NotImplementedError("Only MultiPartitionsDefinition is supported.")