shared config

This commit is contained in:
2025-07-29 11:38:41 +02:00
parent cc6beeffed
commit f59b30b9ea
6 changed files with 17 additions and 44 deletions

View File

@@ -1,6 +1,7 @@
import assets
from dagster_polars import PolarsParquetIOManager
from icecream import install
from shared.config import APP, STORAGE_DIR
import dagster as dg
@@ -9,12 +10,12 @@ install()
definitions = dg.Definitions(
assets=[
asset.with_attributes(
group_names_by_key={asset.key: "other"},
tags_by_key={asset.key: {"app": "other"}},
group_names_by_key={asset.key: APP},
tags_by_key={asset.key: {"app": APP}},
)
for asset in dg.load_assets_from_modules([assets])
],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage")
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR)
},
)

View File

@@ -1,20 +1,15 @@
import os
import assets
import sensors
from dagster_polars import PolarsParquetIOManager
from icecream import install
from resources import HtmlIOManager
from shared.config import APP, STORAGE_DIR
import dagster as dg
from dagster import load_assets_from_modules
install()
APP = os.environ["APP"]
storage_dir = os.environ.get("STORAGE_DIR", "/storage")
definitions = dg.Definitions(
assets=[
asset.with_attributes(
@@ -23,10 +18,8 @@ definitions = dg.Definitions(
for asset in load_assets_from_modules([assets])
],
resources={
"html_io_manager": HtmlIOManager(base_dir=storage_dir),
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
),
"html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
},
sensors=[sensors.check_update],
)

View File

@@ -1,16 +1,13 @@
import os
import assets
from dagster_polars import PolarsParquetIOManager
from icecream import install
from shared.config import APP, STORAGE_DIR
import dagster as dg
from dagster import load_assets_from_modules
install()
APP = os.environ["APP"]
definitions = dg.Definitions(
assets=[
asset.with_attributes(
@@ -20,8 +17,6 @@ definitions = dg.Definitions(
for asset in load_assets_from_modules([assets])
],
resources={
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
},
)

View File

@@ -12,23 +12,7 @@ import dagster as dg
install()
def deep_merge_dicts(base: dict, override: dict) -> dict:
"""
Recursively merge two dictionaries.
Values from `override` will overwrite or be merged into `base`.
"""
result = base.copy()
for key, override_value in override.items():
base_value = result.get(key)
if isinstance(base_value, dict) and isinstance(override_value, dict):
result[key] = deep_merge_dicts(base_value, override_value)
else:
result[key] = override_value
return result
storage_dir = os.environ.get("STORAGE_DIR", "/storage")
storage_dir = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"
definitions = dg.Definitions(
assets=[

View File

@@ -1,18 +1,14 @@
import os
import assets
import sensors
from config import APP
from dagster_polars import PolarsParquetIOManager
from icecream import install
from resources import JsonIOManager
from shared.config import APP, STORAGE_DIR
import dagster as dg
install()
storage_dir = os.environ.get("STORAGE_DIR", "/storage")
definitions = dg.Definitions(
assets=[
asset.with_attributes(
@@ -22,8 +18,8 @@ definitions = dg.Definitions(
for asset in dg.load_assets_from_modules([assets])
],
resources={
"json_io_manager": JsonIOManager(base_dir=storage_dir),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=storage_dir),
"json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
},
sensors=[sensors.list_locations, sensors.list_latitudes, sensors.retrieve_weather],
schedules=[

View File

@@ -0,0 +1,4 @@
import os
APP = os.environ["APP"]
STORAGE_DIR = os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}"