From cc51ae2b6a3fee24b4f37d890f8f3ef8de24f4a7 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Tue, 22 Jul 2025 16:53:14 +0200 Subject: [PATCH] delete unused partition mapping helper --- apps/other/src/assets.py | 2 + apps/other/src/mapping.py | 107 -------------------------------------- 2 files changed, 2 insertions(+), 107 deletions(-) delete mode 100644 apps/other/src/mapping.py diff --git a/apps/other/src/assets.py b/apps/other/src/assets.py index 3b06b11..3b2ac53 100644 --- a/apps/other/src/assets.py +++ b/apps/other/src/assets.py @@ -68,6 +68,7 @@ def asset_single_2(context, asset_single_1): return f"Processed data for {partition_key}" +# Special partition mapping that should give the last two partitions for "date" and all partitions for "source" partition_mapping = MultiPartitionMapping( { "date": DimensionPartitionMapping( @@ -87,6 +88,7 @@ partition_mapping = MultiPartitionMapping( ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)}, ) def asset_multi_2(context, asset_multi_1): + # Towards some delta mechanism between two partitions ic() ic(context.partition_key) ic(context.partition_key.keys_by_dimension) diff --git a/apps/other/src/mapping.py b/apps/other/src/mapping.py deleted file mode 100644 index 411809f..0000000 --- a/apps/other/src/mapping.py +++ /dev/null @@ -1,107 +0,0 @@ -from datetime import datetime, timedelta -from typing import Optional - -from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition -from dagster._core.definitions.partition import PartitionsSubset -from dagster._core.definitions.partition_mapping import ( - MultiPartitionMapping, - UpstreamPartitionsResult, -) -from dagster._core.instance import DynamicPartitionsStore -from dagster._serdes import whitelist_for_serdes - - -class LatestTwoPartitionsMapping(PartitionMapping): - def get_upstream_mapped_partitions_result_for_partitions( - self, - downstream_partitions_subset: Optional[PartitionsSubset], - downstream_partitions_def: Optional[PartitionsDefinition], - upstream_partitions_def: PartitionsDefinition, - current_time: Optional[datetime] = None, - dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, - ) -> UpstreamPartitionsResult: - ic() - - # Get upstream partitions from the subset - all_partitions = upstream_partitions_def.get_partition_keys() - ic(all_partitions) - - if len(all_partitions) < 2: - raise ValueError("Not enough partitions to proceed.") - - # Select the last two partitions - partition_keys = [all_partitions[-2], all_partitions[-1]] - return UpstreamPartitionsResult( - upstream_partitions_def.subset_with_partition_keys(partition_keys), [] - ) - - def get_downstream_partitions_for_partitions( - self, - upstream_partitions_subset: PartitionsSubset, - downstream_partitions_def, - upstream_partitions_def, - ) -> PartitionsSubset: - ic() - # Get the downstream partition that corresponds to the latest upstream partition - downstream_partition_key = upstream_partitions_subset.get_partition_keys()[-1] - return downstream_partitions_def.subset_with_partition_keys( - [downstream_partition_key] - ) - - @property - def description(self): - return "Maps to the latest two upstream partitions." - - -@whitelist_for_serdes -class X(MultiPartitionMapping): - def get_upstream_partitions_for_partition_range( - self, - downstream_partition_range, - upstream_partitions_def, - downstream_partitions_def, - ) -> UpstreamPartitionsResult: - ic() - - # Extract downstream partition range keys - downstream_keys = downstream_partition_range.get_partition_keys() - - # Initialize a list to hold the upstream partition keys - upstream_keys = [] - - # Iterate over each downstream partition key - for downstream_key in downstream_keys: - # Parse the MultiPartitionKey - downstream_mpk = MultiPartitionKey.from_str(downstream_key) - - for i in [1, 2]: - # Shift the daily partition by one day - shifted_date = datetime.strptime( - downstream_mpk.keys_by_dimension["date"], "%Y-%m-%d" - ) - timedelta(days=i) - - # Recreate the MultiPartitionKey with the shifted daily partition - upstream_mpk = MultiPartitionKey( - { - "source": downstream_mpk.keys_by_dimension["source"], - "date": shifted_date.strftime("%Y-%m-%d"), - } - ) - - # Add the upstream partition key - upstream_keys.append(upstream_mpk.to_string()) - - return UpstreamPartitionsResult( - upstream_partitions_def.subset_with_partition_keys(upstream_keys), [] - ) - - def get_downstream_partitions_for_partition_range( - self, - upstream_partition_range, - downstream_partitions_def, - upstream_partitions_def, - ) -> PartitionsSubset: - # This method would map upstream partitions back to downstream, but for simplicity, let's assume it's symmetric. - return self.get_upstream_partitions_for_partition_range( - upstream_partition_range, upstream_partitions_def, downstream_partitions_def - )