attempt to overwrite run launch container kwargs
This commit is contained in:
@@ -1,6 +1,9 @@
|
|||||||
import os
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import assets
|
import assets
|
||||||
|
import yaml
|
||||||
|
from dagster_docker import docker_executor
|
||||||
from dagster_polars import PolarsParquetIOManager
|
from dagster_polars import PolarsParquetIOManager
|
||||||
from icecream import install
|
from icecream import install
|
||||||
from jobs import check_partitions_job, deals_job
|
from jobs import check_partitions_job, deals_job
|
||||||
@@ -12,6 +15,50 @@ import dagster as dg
|
|||||||
APP = os.environ["APP"]
|
APP = os.environ["APP"]
|
||||||
|
|
||||||
install()
|
install()
|
||||||
|
|
||||||
|
|
||||||
|
def deep_merge_dicts(base: dict, override: dict) -> dict:
|
||||||
|
"""
|
||||||
|
Recursively merge two dictionaries.
|
||||||
|
Values from `override` will overwrite or be merged into `base`.
|
||||||
|
"""
|
||||||
|
result = base.copy()
|
||||||
|
for key, override_value in override.items():
|
||||||
|
base_value = result.get(key)
|
||||||
|
if isinstance(base_value, dict) and isinstance(override_value, dict):
|
||||||
|
result[key] = deep_merge_dicts(base_value, override_value)
|
||||||
|
else:
|
||||||
|
result[key] = override_value
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
path = Path("dagster.yaml")
|
||||||
|
if path.exists():
|
||||||
|
with path.open() as fp:
|
||||||
|
tmp = yaml.safe_load(fp)
|
||||||
|
print(tmp)
|
||||||
|
else:
|
||||||
|
tmp = {}
|
||||||
|
|
||||||
|
tmp = tmp.get("run_launcher", {}).get("config", {})
|
||||||
|
res = deep_merge_dicts(
|
||||||
|
tmp,
|
||||||
|
{
|
||||||
|
"container_kwargs": {
|
||||||
|
"volumes": [
|
||||||
|
"/opt/dagster/shared/:/shared/:ro",
|
||||||
|
"/opt/dagster/apps/:/apps:ro",
|
||||||
|
"/opt/dagster/storage/:/storage/:rw",
|
||||||
|
"/opt/dagster/logs/:/logs:rw",
|
||||||
|
"/opt/dagster/cache/:/cache:rw",
|
||||||
|
"/tmp/something:/something:rw",
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
print(res)
|
||||||
|
|
||||||
definitions = dg.Definitions(
|
definitions = dg.Definitions(
|
||||||
assets=[
|
assets=[
|
||||||
asset.with_attributes(
|
asset.with_attributes(
|
||||||
@@ -35,4 +82,5 @@ definitions = dg.Definitions(
|
|||||||
},
|
},
|
||||||
jobs=[deals_job, check_partitions_job],
|
jobs=[deals_job, check_partitions_job],
|
||||||
schedules=[deals_schedule],
|
schedules=[deals_schedule],
|
||||||
|
executor=docker_executor.configured(res),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ services:
|
|||||||
<<: [ *dagster_env, *email_env ]
|
<<: [ *dagster_env, *email_env ]
|
||||||
DAGSTER_CURRENT_IMAGE: dagster-code-vinyl
|
DAGSTER_CURRENT_IMAGE: dagster-code-vinyl
|
||||||
volumes:
|
volumes:
|
||||||
|
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml:ro
|
||||||
- /opt/dagster/apps/:/apps/:ro
|
- /opt/dagster/apps/:/apps/:ro
|
||||||
- /opt/dagster/shared/:/shared/:ro
|
- /opt/dagster/shared/:/shared/:ro
|
||||||
- /opt/dagster/logs/:/logs:rw
|
- /opt/dagster/logs/:/logs:rw
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ run_launcher:
|
|||||||
- /opt/dagster/apps/:/apps:ro
|
- /opt/dagster/apps/:/apps:ro
|
||||||
- /opt/dagster/storage/:/storage/:rw
|
- /opt/dagster/storage/:/storage/:rw
|
||||||
- /opt/dagster/logs/:/logs:rw
|
- /opt/dagster/logs/:/logs:rw
|
||||||
|
- /opt/dagster/cache/:/cache:rw
|
||||||
|
|
||||||
run_storage:
|
run_storage:
|
||||||
module: dagster_postgres.run_storage
|
module: dagster_postgres.run_storage
|
||||||
|
|||||||
Reference in New Issue
Block a user