delete unused partition mapping helper
This commit is contained in:
@@ -68,6 +68,7 @@ def asset_single_2(context, asset_single_1):
|
||||
return f"Processed data for {partition_key}"
|
||||
|
||||
|
||||
# Special partition mapping that should give the last two partitions for "date" and all partitions for "source"
|
||||
partition_mapping = MultiPartitionMapping(
|
||||
{
|
||||
"date": DimensionPartitionMapping(
|
||||
@@ -87,6 +88,7 @@ partition_mapping = MultiPartitionMapping(
|
||||
ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
|
||||
)
|
||||
def asset_multi_2(context, asset_multi_1):
|
||||
# Towards some delta mechanism between two partitions
|
||||
ic()
|
||||
ic(context.partition_key)
|
||||
ic(context.partition_key.keys_by_dimension)
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
from dagster import MultiPartitionKey, PartitionMapping, PartitionsDefinition
|
||||
from dagster._core.definitions.partition import PartitionsSubset
|
||||
from dagster._core.definitions.partition_mapping import (
|
||||
MultiPartitionMapping,
|
||||
UpstreamPartitionsResult,
|
||||
)
|
||||
from dagster._core.instance import DynamicPartitionsStore
|
||||
from dagster._serdes import whitelist_for_serdes
|
||||
|
||||
|
||||
class LatestTwoPartitionsMapping(PartitionMapping):
|
||||
def get_upstream_mapped_partitions_result_for_partitions(
|
||||
self,
|
||||
downstream_partitions_subset: Optional[PartitionsSubset],
|
||||
downstream_partitions_def: Optional[PartitionsDefinition],
|
||||
upstream_partitions_def: PartitionsDefinition,
|
||||
current_time: Optional[datetime] = None,
|
||||
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
|
||||
) -> UpstreamPartitionsResult:
|
||||
ic()
|
||||
|
||||
# Get upstream partitions from the subset
|
||||
all_partitions = upstream_partitions_def.get_partition_keys()
|
||||
ic(all_partitions)
|
||||
|
||||
if len(all_partitions) < 2:
|
||||
raise ValueError("Not enough partitions to proceed.")
|
||||
|
||||
# Select the last two partitions
|
||||
partition_keys = [all_partitions[-2], all_partitions[-1]]
|
||||
return UpstreamPartitionsResult(
|
||||
upstream_partitions_def.subset_with_partition_keys(partition_keys), []
|
||||
)
|
||||
|
||||
def get_downstream_partitions_for_partitions(
|
||||
self,
|
||||
upstream_partitions_subset: PartitionsSubset,
|
||||
downstream_partitions_def,
|
||||
upstream_partitions_def,
|
||||
) -> PartitionsSubset:
|
||||
ic()
|
||||
# Get the downstream partition that corresponds to the latest upstream partition
|
||||
downstream_partition_key = upstream_partitions_subset.get_partition_keys()[-1]
|
||||
return downstream_partitions_def.subset_with_partition_keys(
|
||||
[downstream_partition_key]
|
||||
)
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "Maps to the latest two upstream partitions."
|
||||
|
||||
|
||||
@whitelist_for_serdes
|
||||
class X(MultiPartitionMapping):
|
||||
def get_upstream_partitions_for_partition_range(
|
||||
self,
|
||||
downstream_partition_range,
|
||||
upstream_partitions_def,
|
||||
downstream_partitions_def,
|
||||
) -> UpstreamPartitionsResult:
|
||||
ic()
|
||||
|
||||
# Extract downstream partition range keys
|
||||
downstream_keys = downstream_partition_range.get_partition_keys()
|
||||
|
||||
# Initialize a list to hold the upstream partition keys
|
||||
upstream_keys = []
|
||||
|
||||
# Iterate over each downstream partition key
|
||||
for downstream_key in downstream_keys:
|
||||
# Parse the MultiPartitionKey
|
||||
downstream_mpk = MultiPartitionKey.from_str(downstream_key)
|
||||
|
||||
for i in [1, 2]:
|
||||
# Shift the daily partition by one day
|
||||
shifted_date = datetime.strptime(
|
||||
downstream_mpk.keys_by_dimension["date"], "%Y-%m-%d"
|
||||
) - timedelta(days=i)
|
||||
|
||||
# Recreate the MultiPartitionKey with the shifted daily partition
|
||||
upstream_mpk = MultiPartitionKey(
|
||||
{
|
||||
"source": downstream_mpk.keys_by_dimension["source"],
|
||||
"date": shifted_date.strftime("%Y-%m-%d"),
|
||||
}
|
||||
)
|
||||
|
||||
# Add the upstream partition key
|
||||
upstream_keys.append(upstream_mpk.to_string())
|
||||
|
||||
return UpstreamPartitionsResult(
|
||||
upstream_partitions_def.subset_with_partition_keys(upstream_keys), []
|
||||
)
|
||||
|
||||
def get_downstream_partitions_for_partition_range(
|
||||
self,
|
||||
upstream_partition_range,
|
||||
downstream_partitions_def,
|
||||
upstream_partitions_def,
|
||||
) -> PartitionsSubset:
|
||||
# This method would map upstream partitions back to downstream, but for simplicity, let's assume it's symmetric.
|
||||
return self.get_upstream_partitions_for_partition_range(
|
||||
upstream_partition_range, upstream_partitions_def, downstream_partitions_def
|
||||
)
|
||||
Reference in New Issue
Block a user