From b5f7952c07dcd644e54dff34e5ac47fe9d88b383 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Sat, 2 Aug 2025 10:49:56 +0200 Subject: [PATCH] dynamic partition names --- apps/vinyl/src/assets.py | 2 +- apps/vinyl/src/jobs.py | 7 +++++-- shared/src/shared/utils.py | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 7461cf7..c54aea8 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -179,7 +179,7 @@ def new_deals( ic(partitions.keys()) partition_keys = get_partition_keys(context) - ic(partition_keys) + ic(partition_keys.keys()) parsed_partition_keys = parse_partition_keys(context, "partitions") ic(parsed_partition_keys) diff --git a/apps/vinyl/src/jobs.py b/apps/vinyl/src/jobs.py index 5ac0d99..46c4d4e 100644 --- a/apps/vinyl/src/jobs.py +++ b/apps/vinyl/src/jobs.py @@ -10,7 +10,7 @@ kwargs = dict(tags={"app": APP}) job = partial(dg.job, **kwargs) define_asset_job = partial(dg.define_asset_job, **kwargs) -deals_job = define_asset_job( +deals_job = dg.define_asset_job( "deals_job", selection=[assets.deals.key], partitions_def=assets.deals.partitions_def, @@ -32,12 +32,15 @@ def check_partitions(context: dg.OpExecutionContext) -> None: asset_path = "/".join(asset_key.path) ic(storage_dir, asset_key, asset_path) + dimension_names = asset.partitions_def.partition_dimension_names + ic(dimension_names) + partitions = [] for row in ( pl.scan_parquet( f"{storage_dir}/{asset_path}/*/*.parquet", extra_columns="ignore" ) - .select(["date", "source"]) # asset.partitions_def.names ? + .select(dimension_names) .unique() .with_columns(pl.exclude(pl.String).cast(str)) .collect() diff --git a/shared/src/shared/utils.py b/shared/src/shared/utils.py index 34539d8..9f94f00 100644 --- a/shared/src/shared/utils.py +++ b/shared/src/shared/utils.py @@ -21,7 +21,7 @@ def get_dimension_names( """ partition_definition = context.asset_partitions_def_for_input(input_name) if isinstance(partition_definition, dg.MultiPartitionsDefinition): - return [x.name for x in partition_definition.partitions_defs] + return partition_definition.partition_dimension_names raise NotImplementedError("Only MultiPartitionsDefinition is supported.")