70 lines
1.7 KiB
Python
70 lines
1.7 KiB
Python
import sys
|
|
from collections.abc import Iterator
|
|
from functools import partial
|
|
from logging import getLogger
|
|
|
|
import pandas as pd
|
|
import pyarrow as pa
|
|
from config import APP
|
|
from partitions import daily_partitions_def
|
|
|
|
import dagster as dg
|
|
|
|
# Tags stamped onto every asset declared through the ``asset`` helper below.
TAGS = {"app": APP}

# Module-local asset decorator: ``dg.asset`` with this app's key prefix and
# tags pre-bound. Note that functools.partial lets call-site keywords override
# bound ones, so passing ``tags=`` at a call site replaces TAGS wholesale
# rather than merging with it.
asset = partial(
    dg.asset,
    key_prefix=APP,
    tags=TAGS,
)
|
|
|
|
|
|
@asset()
def logging(context):
    """Emit sample output through several channels, to compare where each lands.

    Channels exercised, in order:

    1. ``ic()`` / ``ic(context.partition_key)``. ``ic`` is not imported in
       this module; presumably it is icecream's ``ic`` installed as a builtin
       elsewhere (TODO confirm, otherwise this raises NameError).
    2. Direct writes to ``sys.__stdout__`` / ``sys.__stderr__``, i.e. the
       interpreter's original streams rather than whatever ``sys.stdout`` /
       ``sys.stderr`` currently point to.
    3. An INFO record on the stdlib logger named ``"mylogger"``. Whether it is
       emitted or captured depends on logging/instance configuration not
       visible here.

    NOTE(review): this asset declares no ``partitions_def``; reading
    ``context.partition_key`` outside a partitioned run may raise in
    dagster. Confirm this is intended.

    NOTE: the function name shadows the stdlib ``logging`` module name at
    module level. It is kept because renaming it would change the asset key.
    """
    ic()
    ic(context.partition_key)

    # Bypass any stream replacement by writing to the original std streams.
    raw_writes = (
        (sys.__stdout__, "This goes to stdout!\n"),
        (sys.__stderr__, "This goes to stderr!\n"),
    )
    for stream, message in raw_writes:
        stream.write(message)

    mylogger = getLogger("mylogger")
    mylogger.info("This is an info message from mylogger")
|
|
|
|
|
|
@asset(io_manager_key="delta_io_manager")
def iris_dataset() -> pa.Table:
    """Download the Iris sample CSV and return it as a PyArrow table.

    Column names are supplied explicitly via ``names=``, so pandas treats every
    line of the file as data (assumes the remote CSV has no header row; TODO
    confirm). The returned table is persisted by the resource registered under
    ``"delta_io_manager"``.

    NOTE(review): the file is fetched over HTTP on every materialization, and
    no timeout is configured here.
    """
    source_url = "https://docs.dagster.io/assets/iris.csv"
    column_names = [
        "sepal_length_cm",
        "sepal_width_cm",
        "petal_length_cm",
        "petal_width_cm",
        "species",
    ]
    iris_frame = pd.read_csv(source_url, names=column_names)
    return pa.Table.from_pandas(iris_frame)
|
|
|
|
|
|
@asset(
    io_manager_key="delta_io_manager",
    ins={"table": dg.AssetIn(key=iris_dataset.key)},
)
def iris_cleaned(table: pa.Table) -> pa.Table:
    """Drop incomplete and duplicate rows from the raw Iris table.

    Args:
        table: The upstream ``iris_dataset`` asset, bound to this parameter
            through the ``ins`` mapping.

    Returns:
        A new table keeping only rows with no null values, with exact
        duplicate rows removed (pandas default: first occurrence kept). The
        pandas index is not written into the table.
    """
    df = table.to_pandas()
    result_df = df.dropna().drop_duplicates()
    # dropna()/drop_duplicates() leave gaps in the pandas index, so it is no
    # longer a RangeIndex. With the default preserve_index=None, pyarrow then
    # serialises that index as an extra "__index_level_0__" column, which would
    # leak into the stored table. The index carries no meaning here, so drop it.
    return pa.Table.from_pandas(result_df, preserve_index=False)
|
|
|
|
|
|
@asset(partitions_def=daily_partitions_def)
def one() -> Iterator[dg.Output[int]]:
    """Deliberately emit two outputs for one asset, to probe dagster's response.

    The asset's op has a single output (named "result" per the error recorded
    below), so the second ``Output`` is rejected at run time. This block is an
    experiment, not a working asset.
    """
    # Attempt to materialize multiple times: Output(1), then Output(2).
    for value in (1, 2):
        yield dg.Output(value)


# Error: Compute for op "other__one" returned an output "result" multiple times
|
|
|
|
|
|
@asset()
def two(context: dg.AssetExecutionContext) -> None:
    """Record a materialization event on behalf of a different asset, ``one``.

    The event targets ``one``'s asset key at the hard-coded partition
    "2025-07-27"; this asset itself returns nothing. Per the original note,
    dagster accepts logging an event for another asset this way.
    """
    # Attempt to log materialization of other asset
    event_for_one = dg.AssetMaterialization(
        asset_key=one.key,
        partition="2025-07-27",
    )
    context.log_event(event_for_one)  # this works!
|