# Source: dagster/apps/other/src/assets.py (file-viewer header: 98 lines, 2.4 KiB, Python)

import polars as pl
from dagster import (
AssetIn,
DailyPartitionsDefinition,
DimensionPartitionMapping,
IdentityPartitionMapping,
MultiPartitionMapping,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)
# First day covered by every partitioned asset in this module; shared so the
# single- and multi-dimensional schemes cannot drift apart.
_PARTITION_START_DATE = "2024-09-20"

# One partition per calendar day.
partitions_def_single = DailyPartitionsDefinition(start_date=_PARTITION_START_DATE)

# Cross product of a daily "date" axis and a fixed "source" axis.
partitions_def_multi = MultiPartitionsDefinition(
    {
        "date": DailyPartitionsDefinition(start_date=_PARTITION_START_DATE),
        "source": StaticPartitionsDefinition(["plato", "sounds"]),
    }
)
@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=partitions_def_single,
    metadata={"partition_by": ["date"]},
)
def asset_single_1(context):
    """Emit a one-row Polars frame for the requested daily partition.

    The row stores the partition key under ``date`` (the column named in the
    ``partition_by`` metadata) plus a human-readable ``data`` string.
    """
    ic()
    ic(context.partition_key)
    day = context.partition_key
    row = {"date": day, "data": f"Data for {day}"}
    return pl.DataFrame([row])
@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=partitions_def_multi,
    metadata={
        "partition_by": ["date", "source"],
    },
)
def asset_multi_1(context):
    """Emit a one-row Polars frame for the requested (date, source) partition.

    For this asset ``context.partition_key`` is a multi-dimensional key rather
    than a plain date string, so each dimension is pulled out through
    ``keys_by_dimension`` and written to its own column. This keeps the frame
    consistent with the ``partition_by`` metadata, which names both ``date``
    and ``source`` as columns. The ``data`` string still shows the full key.
    """
    ic()
    ic(context.partition_key)
    # Split the composite key into its per-dimension parts.
    keys = context.partition_key.keys_by_dimension
    return pl.DataFrame(
        [
            {
                "date": keys["date"],
                "source": keys["source"],
                "data": f"Data for {context.partition_key}",
            }
        ]
    )
@asset(
    partitions_def=partitions_def_single,
    ins={
        # Upstream window: start one day earlier, end on the same day.
        "asset_single_1": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                end_offset=0,
                start_offset=-1,
            ),
        ),
    },
)
def asset_single_2(context, asset_single_1):
    """Produce a placeholder result for a daily partition.

    ``asset_single_1`` is loaded through a time-window mapping with
    ``start_offset=-1`` and ``end_offset=0``, so the previous and the current
    day of the upstream asset are requested. ``.keys()`` is logged on the
    loaded value. This presumes the loading IO manager returns a mapping keyed
    by upstream partition. NOTE(review): confirm that this also holds for the
    first partition, where no previous day exists.

    Returns a short string naming the partition being materialised.
    """
    ic()
    ic(context.partition_key)
    ic(asset_single_1.keys())
    current_key = context.asset_partition_key_for_output()
    message = f"Processed data for {current_key}"
    return message
# How each dimension of the upstream multi-partitioned asset maps onto the
# downstream dimension of the same name:
#   * "date"   -> previous and current day (time window, start_offset=-1)
#   * "source" -> the identical source value
partition_mapping = MultiPartitionMapping(
    {
        "date": DimensionPartitionMapping(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
            dimension_name="date",
        ),
        "source": DimensionPartitionMapping(
            partition_mapping=IdentityPartitionMapping(),
            dimension_name="source",
        ),
    }
)
@asset(
    partitions_def=partitions_def_multi,
    ins={
        # Same source, previous and current date (see ``partition_mapping``).
        "asset_multi_1": AssetIn(partition_mapping=partition_mapping),
    },
)
def asset_multi_2(context, asset_multi_1):
    """Produce a placeholder result for a (date, source) partition.

    Logs the downstream partition key, its per-dimension breakdown and the
    loaded upstream value. It then returns a short string naming the
    partition being materialised.
    """
    ic()
    key = context.partition_key
    ic(context.partition_key)
    ic(context.partition_key.keys_by_dimension)
    ic(asset_multi_1)
    # The local name is printed by ``ic`` below, so it stays ``partition_key``.
    partition_key = context.asset_partition_key_for_output()
    ic(partition_key)
    return f"Processed data for {partition_key}"