experiments with materialization
This commit is contained in:
@@ -1,4 +1,6 @@
|
|||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
from collections.abc import Iterator
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
|
|
||||||
@@ -46,3 +48,27 @@ def iris_cleaned(table: pa.Table) -> pa.Table:
|
|||||||
df = table.to_pandas()
|
df = table.to_pandas()
|
||||||
result_df = df.dropna().drop_duplicates()
|
result_df = df.dropna().drop_duplicates()
|
||||||
return pa.Table.from_pandas(result_df)
|
return pa.Table.from_pandas(result_df)
|
||||||
|
|
||||||
|
|
||||||
|
daily_partitions_def = dg.DailyPartitionsDefinition(
|
||||||
|
start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@asset(partitions_def=daily_partitions_def)
|
||||||
|
def one() -> Iterator[dg.Output[int]]:
|
||||||
|
# Attempt to materialize multiple times
|
||||||
|
yield dg.Output(1)
|
||||||
|
yield dg.Output(2)
|
||||||
|
|
||||||
|
# Error: Compute for op "other__one" returned an output "result" multiple times
|
||||||
|
|
||||||
|
|
||||||
|
@asset()
|
||||||
|
def two(context: dg.AssetExecutionContext) -> None:
|
||||||
|
# Attempt to log materialization of other asset
|
||||||
|
context.log_event(
|
||||||
|
dg.AssetMaterialization(
|
||||||
|
asset_key=one.key, partition="2025-07-27"
|
||||||
|
) # this works!
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user