diff --git a/apps/other/src/definitions.py b/apps/other/src/definitions.py index 003eb58..b86cd92 100644 --- a/apps/other/src/definitions.py +++ b/apps/other/src/definitions.py @@ -1,6 +1,7 @@ import assets from dagster_polars import PolarsParquetIOManager from icecream import install +from shared.config import APP, STORAGE_DIR import dagster as dg @@ -9,12 +10,12 @@ install() definitions = dg.Definitions( assets=[ asset.with_attributes( - group_names_by_key={asset.key: "other"}, - tags_by_key={asset.key: {"app": "other"}}, + group_names_by_key={asset.key: APP}, + tags_by_key={asset.key: {"app": APP}}, ) for asset in dg.load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage") + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR) }, ) diff --git a/apps/stocks/src/definitions.py b/apps/stocks/src/definitions.py index 9a57ae3..5d12b11 100644 --- a/apps/stocks/src/definitions.py +++ b/apps/stocks/src/definitions.py @@ -1,20 +1,15 @@ -import os - import assets import sensors from dagster_polars import PolarsParquetIOManager from icecream import install from resources import HtmlIOManager +from shared.config import APP, STORAGE_DIR import dagster as dg from dagster import load_assets_from_modules install() -APP = os.environ["APP"] - -storage_dir = os.environ.get("STORAGE_DIR", "/storage") - definitions = dg.Definitions( assets=[ asset.with_attributes( @@ -23,10 +18,8 @@ definitions = dg.Definitions( for asset in load_assets_from_modules([assets]) ], resources={ - "html_io_manager": HtmlIOManager(base_dir=storage_dir), - "polars_parquet_io_manager": PolarsParquetIOManager( - base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" - ), + "html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR), + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), }, sensors=[sensors.check_update], ) diff --git a/apps/tesla/src/definitions.py b/apps/tesla/src/definitions.py index 7e0e20b..eb0be9b 100644 --- a/apps/tesla/src/definitions.py +++ b/apps/tesla/src/definitions.py @@ -1,16 +1,13 @@ -import os - import assets from dagster_polars import PolarsParquetIOManager from icecream import install +from shared.config import APP, STORAGE_DIR import dagster as dg from dagster import load_assets_from_modules install() -APP = os.environ["APP"] - definitions = dg.Definitions( assets=[ asset.with_attributes( @@ -20,8 +17,6 @@ definitions = dg.Definitions( for asset in load_assets_from_modules([assets]) ], resources={ - "polars_parquet_io_manager": PolarsParquetIOManager( - base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" - ), + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), }, ) diff --git a/apps/vinyl/src/definitions.py b/apps/vinyl/src/definitions.py index 30a5396..1c1f71c 100644 --- a/apps/vinyl/src/definitions.py +++ b/apps/vinyl/src/definitions.py @@ -12,23 +12,7 @@ import dagster as dg install() - -def deep_merge_dicts(base: dict, override: dict) -> dict: - """ - Recursively merge two dictionaries. - Values from `override` will overwrite or be merged into `base`. - """ - result = base.copy() - for key, override_value in override.items(): - base_value = result.get(key) - if isinstance(base_value, dict) and isinstance(override_value, dict): - result[key] = deep_merge_dicts(base_value, override_value) - else: - result[key] = override_value - return result - - -storage_dir = os.environ.get("STORAGE_DIR", "/storage") +storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" definitions = dg.Definitions( assets=[ diff --git a/apps/weather/src/definitions.py b/apps/weather/src/definitions.py index 312aec3..c5247cf 100644 --- a/apps/weather/src/definitions.py +++ b/apps/weather/src/definitions.py @@ -1,18 +1,14 @@ -import os - import assets import sensors -from config import APP from dagster_polars import PolarsParquetIOManager from icecream import install from resources import JsonIOManager +from shared.config import APP, STORAGE_DIR import dagster as dg install() -storage_dir = os.environ.get("STORAGE_DIR", "/storage") - definitions = dg.Definitions( assets=[ asset.with_attributes( @@ -22,8 +18,8 @@ definitions = dg.Definitions( for asset in dg.load_assets_from_modules([assets]) ], resources={ - "json_io_manager": JsonIOManager(base_dir=storage_dir), - "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir), + "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR), + "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), }, sensors=[sensors.list_locations, sensors.list_latitudes, sensors.retrieve_weather], schedules=[ diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py new file mode 100644 index 0000000..9386b54 --- /dev/null +++ b/shared/src/shared/config.py @@ -0,0 +1,4 @@ +import os + +APP = os.environ["APP"] +STORAGE_DIR = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"