From be8fae969c5bfb3b47a592ab355988d59202fa66 Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Tue, 22 Jul 2025 12:03:03 +0200 Subject: [PATCH] fix paths to storage and logs --- apps/other/src/assets.py | 12 ------------ apps/other/src/definitions.py | 6 ++++-- apps/other/src/mapping.py | 1 - apps/vinyl/src/assets.py | 2 +- apps/vinyl/src/definitions.py | 2 +- compose.code.yaml | 2 ++ compose.system.yaml | 1 + dagster.yaml | 7 +++++++ 8 files changed, 16 insertions(+), 17 deletions(-) diff --git a/apps/other/src/assets.py b/apps/other/src/assets.py index da895fd..3b06b11 100644 --- a/apps/other/src/assets.py +++ b/apps/other/src/assets.py @@ -1,4 +1,3 @@ -import pandas as pd import polars as pl from dagster import ( @@ -23,16 +22,6 @@ partitions_def_multi = MultiPartitionsDefinition( ) -@asset( - # tags={ - # "dagster/executor": "other_executor" - # }, -) -def dummy_asset(): - """A dummy asset to ensure the module is recognized by Dagster.""" - return pd.DataFrame({"dummy": [1, 2, 3]}) - - @asset( io_manager_key="polars_parquet_io_manager", partitions_def=partitions_def_single, @@ -58,7 +47,6 @@ def asset_single_1(context): def asset_multi_1(context): ic() ic(context.partition_key) - return pl.DataFrame( [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] ) diff --git a/apps/other/src/definitions.py b/apps/other/src/definitions.py index 50631b6..46bc5a7 100644 --- a/apps/other/src/definitions.py +++ b/apps/other/src/definitions.py @@ -9,7 +9,7 @@ install() # Define a job that includes both assets daily_job = define_asset_job( "daily_job", - selection=[assets.dummy_asset, assets.asset_multi_1, assets.asset_multi_2], + selection=[assets.asset_multi_1, assets.asset_multi_2], ) definitions = Definitions( @@ -20,6 +20,8 @@ definitions = Definitions( ) for asset in load_assets_from_modules([assets]) ], - resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, + resources={ + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage") + }, jobs=[daily_job], ) diff --git a/apps/other/src/mapping.py b/apps/other/src/mapping.py index ea4f21b..411809f 100644 --- a/apps/other/src/mapping.py +++ b/apps/other/src/mapping.py @@ -11,7 +11,6 @@ from dagster._core.instance import DynamicPartitionsStore from dagster._serdes import whitelist_for_serdes -# @whitelist_for_serdes class LatestTwoPartitionsMapping(PartitionMapping): def get_upstream_mapped_partitions_result_for_partitions( self, diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 260bc21..e74ddff 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -110,7 +110,7 @@ def deals(context): @asset(deps=[deals], io_manager_key="polars_parquet_io_manager") def new_deals(context: OpExecutionContext) -> pl.DataFrame: ic() - storage_dir = context.instance.storage_directory() + storage_dir = context.resources.polars_parquet_io_manager.base_dir asset_key = "deals" with duckdb.connect() as con: diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index aa675b7..386efe1 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -29,7 +29,7 @@ definitions = Definitions( for asset in load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager(), + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"), "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"), }, jobs=[deals_job, check_partititions_job, musicbrainz_lookup_job], diff --git a/compose.code.yaml b/compose.code.yaml index 32fea89..bd57798 100644 --- a/compose.code.yaml +++ b/compose.code.yaml @@ -25,6 +25,7 @@ services: DAGSTER_CURRENT_IMAGE: user_code_vinyl volumes: - /opt/dagster/apps/:/apps/:ro + - /opt/dagster/logs/:/logs:rw - /opt/dagster/storage/import/:/storage/import/:ro - /opt/dagster/storage/deals/:/storage/deals/:rw networks: @@ -45,5 +46,6 @@ services: DAGSTER_CURRENT_IMAGE: user_code_other volumes: - /opt/dagster/apps/:/apps:ro + - /opt/dagster/logs/:/logs:rw networks: - dagster diff --git a/compose.system.yaml b/compose.system.yaml index 9be4019..e66d096 100644 --- a/compose.system.yaml +++ b/compose.system.yaml @@ -19,6 +19,7 @@ x-volumes: &volumes - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml:ro - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro - /opt/dagster/storage/:/storage/:rw + - /opt/dagster/logs/:/logs:rw - /var/run/docker.sock:/var/run/docker.sock:rw services: diff --git a/dagster.yaml b/dagster.yaml index 8767e40..fbe6f05 100644 --- a/dagster.yaml +++ b/dagster.yaml @@ -23,6 +23,7 @@ run_launcher: volumes: - /opt/dagster/apps/:/apps:ro - /opt/dagster/storage/:/storage/:rw + - /opt/dagster/logs/:/logs:rw run_storage: module: dagster_postgres.run_storage @@ -71,3 +72,9 @@ event_log_storage: env: DAGSTER_POSTGRES_PASSWORD db_name: env: DAGSTER_POSTGRES_DB + +compute_logs: + module: dagster.core.storage.local_compute_log_manager + class: LocalComputeLogManager + config: + base_dir: /logs