From 1a3c288001386aec2c5f7e6dd24b07839a9987ee Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Mon, 21 Jul 2025 10:56:46 +0200 Subject: [PATCH] tidy up --- apps/other/__init__.py | 0 apps/other/src/__init__.py | 3 --- apps/other/src/definitions.py | 29 ++++++++++++++++------------- apps/vinyl/__init__.py | 0 apps/vinyl/src/__init__.py | 3 --- apps/vinyl/src/assets.py | 5 ----- apps/vinyl/src/definitions.py | 33 +++++++++++++-------------------- apps/vinyl/src/test.py | 2 +- compose.code.yaml | 2 -- compose.system.yaml | 13 ++++--------- dagster.yaml | 7 ++----- 11 files changed, 36 insertions(+), 61 deletions(-) delete mode 100644 apps/other/__init__.py delete mode 100644 apps/vinyl/__init__.py diff --git a/apps/other/__init__.py b/apps/other/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/apps/other/src/__init__.py b/apps/other/src/__init__.py index ef37bac..e69de29 100644 --- a/apps/other/src/__init__.py +++ b/apps/other/src/__init__.py @@ -1,3 +0,0 @@ -from icecream import install - -install() diff --git a/apps/other/src/definitions.py b/apps/other/src/definitions.py index 1de39ed..50631b6 100644 --- a/apps/other/src/definitions.py +++ b/apps/other/src/definitions.py @@ -1,22 +1,25 @@ import assets from dagster_polars import PolarsParquetIOManager +from icecream import install -from dagster import Definitions, load_assets_from_modules +from dagster import Definitions, define_asset_job, load_assets_from_modules + +install() # Define a job that includes both assets -# daily_job = define_asset_job("daily_job", selection=[dummy_asset, asset_multi_1, asset_multi_2]) +daily_job = define_asset_job( + "daily_job", + selection=[assets.dummy_asset, assets.asset_multi_1, assets.asset_multi_2], +) definitions = Definitions( - assets=load_assets_from_modules([assets]), - # [dummy_asset], # , asset_single_1, asset_multi_1, asset_single_2, asset_multi_2], + assets=[ + asset.with_attributes( + group_names_by_key={asset.key: "other"}, + tags_by_key={asset.key: {"app": "other"}}, + ) + for asset in load_assets_from_modules([assets]) + ], resources={"polars_parquet_io_manager": PolarsParquetIOManager()}, - # jobs=[daily_job], - # executor=docker_executor.configured({"container_kwargs": { - # "volumes": [ - # "/opt/dagster/apps/other/src/:/opt/dagster/home/app/", - # "/opt/dagster/storage/:/storage/" - # ] - # } - # } - # ) + jobs=[daily_job], ) diff --git a/apps/vinyl/__init__.py b/apps/vinyl/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/apps/vinyl/src/__init__.py b/apps/vinyl/src/__init__.py index ef37bac..e69de29 100644 --- a/apps/vinyl/src/__init__.py +++ b/apps/vinyl/src/__init__.py @@ -1,3 +0,0 @@ -from icecream import install - -install() diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 9ccda83..260bc21 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -55,11 +55,6 @@ partition_mapping = MultiPartitionMapping( "partition_by": ["date", "source"], }, config_schema={"import_dir": Field(str, default_value="/storage/import")}, - # tags={ - # "dagster/executor": "vinyl_executor", - # "app": "vinyl" - # }, - # group_name="vinylllll" ) def deals(context): ic() diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index 7655713..3925f5d 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -1,15 +1,16 @@ from collections.abc import Sequence -from assets import deals, new_deals, works +import assets from dagster_duckdb import DuckDBIOManager from dagster_duckdb.io_manager import DbTypeHandler from dagster_duckdb_pandas import DuckDBPandasTypeHandler from dagster_polars import PolarsParquetIOManager +from icecream import install from jobs import check_partititions_job, deals_job, musicbrainz_lookup_job from schedules import deals_schedule from sensors import musicbrainz_lookup_sensor -from dagster import Definitions +from dagster import Definitions, load_assets_from_modules class PandasDuckDBIOManager(DuckDBIOManager): @@ -18,25 +19,15 @@ class PandasDuckDBIOManager(DuckDBIOManager): return [DuckDBPandasTypeHandler()] -deals.with_attributes() -assets = [] -for asset in [deals, new_deals, works]: - print(asset.tags_by_key) - # for k, v in { - # "dagster/executor": "vinyl_executor", - # "app": "vinyl" - # }.items(): - # pass - # asset._specs_by_key[asset.key][k] = v - assets.append( - asset.with_attributes( - tags_by_key={ - asset.key: {"dagster/executor": "vinyl_executor", "app": "vinyl"} - } - ) - ) +install() definitions = Definitions( - assets=assets, + assets=[ + asset.with_attributes( + group_names_by_key={asset.key: "vinyl"}, + tags_by_key={asset.key: {"app": "vinyl"}}, + ) + for asset in load_assets_from_modules([assets]) + ], resources={ "polars_parquet_io_manager": PolarsParquetIOManager(), "duckdb_io_manager": PandasDuckDBIOManager(database="vinyl"), @@ -45,3 +36,5 @@ definitions = Definitions( schedules=[deals_schedule], sensors=[musicbrainz_lookup_sensor], ) + +ic("jo") diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py index 0d118f4..fca8389 100644 --- a/apps/vinyl/src/test.py +++ b/apps/vinyl/src/test.py @@ -28,7 +28,7 @@ def test_deals(source="sounds", date: str = None): resources=resources, run_config={ "loggers": {"console": {"config": {"log_level": "ERROR"}}}, - "ops": {"deals": {"config": {"import_dir": "/opt/dagster/storage/import"}}}, + "ops": {"deals": {"config": {"import_dir": "/storage/import"}}}, }, ) assert result.success diff --git a/compose.code.yaml b/compose.code.yaml index f643419..32fea89 100644 --- a/compose.code.yaml +++ b/compose.code.yaml @@ -27,7 +27,6 @@ services: - /opt/dagster/apps/:/apps/:ro - /opt/dagster/storage/import/:/storage/import/:ro - /opt/dagster/storage/deals/:/storage/deals/:rw - # - /opt/dagster/apps/vinyl/src/:/opt/dagster/home/app/ networks: - dagster @@ -45,7 +44,6 @@ services: <<: *dagster_env DAGSTER_CURRENT_IMAGE: user_code_other volumes: - # - /opt/dagster/apps/other/src/:/opt/dagster/home/app/ - /opt/dagster/apps/:/apps:ro networks: - dagster diff --git a/compose.system.yaml b/compose.system.yaml index cf8c514..9be4019 100644 --- a/compose.system.yaml +++ b/compose.system.yaml @@ -16,15 +16,10 @@ x-dagster-env: &dagster_env x-volumes: &volumes volumes: - - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml - - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml - - /var/run/docker.sock:/var/run/docker.sock - - - /opt/dagster/storage/import/:/storage/import/ - - /opt/dagster/storage/deals/:/storage/deals/ - - - /opt/dagster/src/app/:/opt/dagster/home/app/ - - /opt/dagster/src/repo.py:/opt/dagster/home/repo.py + - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml:ro + - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro + - /opt/dagster/storage/:/storage/:rw + - /var/run/docker.sock:/var/run/docker.sock:rw services: # This service runs the postgres DB used by dagster for run storage, schedule storage, diff --git a/dagster.yaml b/dagster.yaml index 5010d4a..8767e40 100644 --- a/dagster.yaml +++ b/dagster.yaml @@ -18,14 +18,11 @@ run_launcher: - DAGSTER_POSTGRES_USER - DAGSTER_POSTGRES_PASSWORD - DAGSTER_POSTGRES_DB - - PYTHONPATH=app network: dagster container_kwargs: volumes: - - /opt/dagster/storage/import/:/opt/dagster/home/storage/import/ - - /opt/dagster/storage/deals/:/opt/dagster/home/storage/deals/ - - /opt/dagster/apps/vinyl/src/:/opt/dagster/home/app/ - - /opt/dagster/storage/:/opt/dagster/home/storage/ + - /opt/dagster/apps/:/apps:ro + - /opt/dagster/storage/:/storage/:rw run_storage: module: dagster_postgres.run_storage