demo of delta lake with forked io manager

This commit is contained in:
2025-07-29 15:37:37 +02:00
parent f59b30b9ea
commit 4c59dcb8ac
6 changed files with 64 additions and 6 deletions

View File

@@ -2,11 +2,15 @@ import sys
from functools import partial
from logging import getLogger
import pandas as pd
import pyarrow as pa
from config import APP
import dagster as dg
asset = partial(dg.asset, key_prefix=APP)
TAGS = {"app": APP}
asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
@asset()
@@ -18,3 +22,27 @@ def logging(context):
sys.__stderr__.write("This goes to stderr!\n")
getLogger("mylogger").info("This is an info message from mylogger")
@asset(io_manager_key="delta_io_manager")
def iris_dataset() -> pa.Table:
df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
names=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
"species",
],
)
return pa.Table.from_pandas(df)
@asset(
io_manager_key="delta_io_manager", ins={"table": dg.AssetIn(key=iris_dataset.key)}
)
def iris_cleaned(table: pa.Table) -> pa.Table:
df = table.to_pandas()
result_df = df.dropna().drop_duplicates()
return pa.Table.from_pandas(result_df)

View File

@@ -1,4 +1,5 @@
import assets
from dagster_delta import DeltaLakePyarrowIOManager, LocalConfig, WriteMode
from dagster_polars import PolarsParquetIOManager
from icecream import install
from shared.config import APP, STORAGE_DIR
@@ -11,11 +12,16 @@ definitions = dg.Definitions(
assets=[
asset.with_attributes(
group_names_by_key={asset.key: APP},
tags_by_key={asset.key: {"app": APP}},
)
for asset in dg.load_assets_from_modules([assets])
],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR)
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
"delta_io_manager": DeltaLakePyarrowIOManager(
root_uri=STORAGE_DIR,
storage_options=LocalConfig(),
mode=WriteMode.overwrite,
parquet_read_options={"coerce_int96_timestamp_unit": "us"},
),
},
)