diff --git a/Dockerfile.code b/Dockerfile.code index 2cc8ce7..fbc5e17 100644 --- a/Dockerfile.code +++ b/Dockerfile.code @@ -1,5 +1,7 @@ FROM python:3.12-slim +RUN apt update && apt install --no-install-recommends --yes git + # Checkout and install dagster libraries needed to run the gRPC server # exposing your repository to dagit and dagster-daemon, and to load the DagsterInstance diff --git a/apps/other/requirements.txt b/apps/other/requirements.txt index ad4367b..4b51f16 100644 --- a/apps/other/requirements.txt +++ b/apps/other/requirements.txt @@ -11,6 +11,8 @@ anyio==4.9.0 # gql # starlette # watchfiles +arro3-core==0.5.1 + # via deltalake asttokens==3.0.0 # via icecream backoff==2.2.1 @@ -51,6 +53,7 @@ dagster==1.11.3 # via # dev (pyproject.toml) # dagster-aws + # dagster-delta # dagster-docker # dagster-duckdb # dagster-duckdb-pandas @@ -60,6 +63,8 @@ dagster==1.11.3 # dagster-webserver dagster-aws==0.27.3 # via dev (pyproject.toml) +dagster-delta @ git+https://github.com/ASML-Labs/dagster-delta.git@d28de7a7c13b7071f42231234eb9231269c7c1bf#subdirectory=libraries/dagster-delta + # via dev (pyproject.toml) dagster-docker==0.27.3 # via dev (pyproject.toml) dagster-duckdb==0.27.3 @@ -82,6 +87,10 @@ dagster-shared==1.11.3 # via dagster dagster-webserver==1.11.3 # via dagit +deltalake==1.1.3 + # via dagster-delta +deprecated==1.2.18 + # via deltalake dnspython==2.7.0 # via email-validator docker==7.1.0 @@ -195,6 +204,8 @@ patito==0.8.3 # via # dev (pyproject.toml) # dagster-polars +pendulum==3.1.0 + # via dagster-delta pillow==11.3.0 # via matplotlib polars==1.31.0 @@ -237,6 +248,7 @@ python-dateutil==2.9.0.post0 # graphene # matplotlib # pandas + # pendulum python-dotenv==1.1.1 # via # dagster @@ -310,6 +322,7 @@ typing-extensions==4.14.1 # via # alembic # anyio + # arro3-core # beautifulsoup4 # dagster-polars # dagster-shared @@ -325,7 +338,9 @@ typing-inspection==0.4.1 # pydantic # pydantic-settings tzdata==2025.2 - # via pandas + # via + # pandas + # pendulum universal-pathlib==0.2.6 # via # dagster @@ -345,6 +360,8 @@ watchfiles==1.1.0 # via uvicorn websockets==15.0.1 # via uvicorn +wrapt==1.17.2 + # via deprecated xlsxwriter==3.2.5 # via dev (pyproject.toml) yarl==1.20.1 diff --git a/apps/other/src/assets.py b/apps/other/src/assets.py index 0cc93f6..6edeef6 100644 --- a/apps/other/src/assets.py +++ b/apps/other/src/assets.py @@ -2,11 +2,15 @@ import sys from functools import partial from logging import getLogger +import pandas as pd +import pyarrow as pa from config import APP import dagster as dg -asset = partial(dg.asset, key_prefix=APP) +TAGS = {"app": APP} + +asset = partial(dg.asset, key_prefix=APP, tags=TAGS) @asset() @@ -18,3 +22,27 @@ def logging(context): sys.__stderr__.write("This goes to stderr!\n") getLogger("mylogger").info("This is an info message from mylogger") + + +@asset(io_manager_key="delta_io_manager") +def iris_dataset() -> pa.Table: + df = pd.read_csv( + "https://docs.dagster.io/assets/iris.csv", + names=[ + "sepal_length_cm", + "sepal_width_cm", + "petal_length_cm", + "petal_width_cm", + "species", + ], + ) + return pa.Table.from_pandas(df) + + +@asset( + io_manager_key="delta_io_manager", ins={"table": dg.AssetIn(key=iris_dataset.key)} +) +def iris_cleaned(table: pa.Table) -> pa.Table: + df = table.to_pandas() + result_df = df.dropna().drop_duplicates() + return pa.Table.from_pandas(result_df) diff --git a/apps/other/src/definitions.py b/apps/other/src/definitions.py index b86cd92..9ee7032 100644 --- a/apps/other/src/definitions.py +++ b/apps/other/src/definitions.py @@ -1,4 +1,5 @@ import assets +from dagster_delta import DeltaLakePyarrowIOManager, LocalConfig, WriteMode from dagster_polars import PolarsParquetIOManager from icecream import install from shared.config import APP, STORAGE_DIR @@ -11,11 +12,16 @@ definitions = dg.Definitions( assets=[ asset.with_attributes( group_names_by_key={asset.key: APP}, - tags_by_key={asset.key: {"app": APP}}, ) for asset in dg.load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR) + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), + "delta_io_manager": DeltaLakePyarrowIOManager( + root_uri=STORAGE_DIR, + storage_options=LocalConfig(), + mode=WriteMode.overwrite, + parquet_read_options={"coerce_int96_timestamp_unit": "us"}, + ), }, ) diff --git a/pyproject.toml b/pyproject.toml index cebb6da..79d319c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,12 @@ weather = [ "requests_cache", "retry_requests" ] -other = [] +other = [ + # "deltalake>=1.0.0", + # "dagster-deltalake-pandas", + # "dagster-deltalake-polars", + "dagster-delta @ git+https://github.com/ASML-Labs/dagster-delta.git@dagster_delta-0.5.1#subdirectory=libraries/dagster-delta" +] unknown = [ "fastapi", "geopandas", diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py index 9386b54..07a4cfe 100644 --- a/shared/src/shared/config.py +++ b/shared/src/shared/config.py @@ -1,4 +1,4 @@ import os APP = os.environ["APP"] -STORAGE_DIR = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" +STORAGE_DIR = os.environ.get("STORAGE_DIR", "/storage")