custom run launcher for custom docker image
This commit is contained in:
@@ -109,11 +109,3 @@ def daily_html() -> str: ...
|
|||||||
|
|
||||||
class MyAssetConfig(dg.Config):
|
class MyAssetConfig(dg.Config):
|
||||||
image: str = "bla"
|
image: str = "bla"
|
||||||
|
|
||||||
|
|
||||||
@asset(metadata={"docker_image": "my-other-image:latest"})
|
|
||||||
def my_asset(context: dg.AssetExecutionContext, config: MyAssetConfig) -> None:
|
|
||||||
ic(context.op_config)
|
|
||||||
ic(config)
|
|
||||||
# image = context.op_config.image
|
|
||||||
# context.log.info(f"This asset wants to use Docker image: {image}")
|
|
||||||
|
|||||||
@@ -22,6 +22,6 @@ definitions = dg.Definitions(
|
|||||||
"html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
|
"html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
|
||||||
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
|
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
|
||||||
},
|
},
|
||||||
jobs=[jobs.test_job],
|
jobs=[jobs.raw_html_job],
|
||||||
sensors=[sensors.check_update],
|
sensors=[sensors.check_update],
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,10 +2,8 @@ import assets
|
|||||||
|
|
||||||
import dagster as dg
|
import dagster as dg
|
||||||
|
|
||||||
raw_html_job = dg.define_asset_job("deals_job", selection=[assets.raw_html.key])
|
raw_html_job = dg.define_asset_job(
|
||||||
|
"raw_html_job",
|
||||||
test_job = dg.define_asset_job(
|
selection=[assets.raw_html.key],
|
||||||
"my_asset_job",
|
tags={"docker/image": "dagster-code-stocks-playwright"},
|
||||||
selection=[assets.my_asset.key],
|
|
||||||
tags={"docker/image": "my.registry/image:from-config"},
|
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ services:
|
|||||||
dagster-code-stocks:
|
dagster-code-stocks:
|
||||||
build:
|
build:
|
||||||
context: apps/stocks
|
context: apps/stocks
|
||||||
dockerfile: ../../Dockerfile.code.playwright
|
dockerfile: ../../Dockerfile.code
|
||||||
args:
|
args:
|
||||||
- APP=stocks
|
- APP=stocks
|
||||||
container_name: dagster-code-stocks
|
container_name: dagster-code-stocks
|
||||||
|
|||||||
0
system/__init__.py
Normal file
0
system/__init__.py
Normal file
44
system/run_launcher.py
Normal file
44
system/run_launcher.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import structlog
|
||||||
|
from dagster_docker import DockerRunLauncher
|
||||||
|
|
||||||
|
import dagster._check as check
|
||||||
|
from dagster._core.launcher.base import (
|
||||||
|
LaunchRunContext,
|
||||||
|
)
|
||||||
|
from dagster._grpc.types import ExecuteRunArgs
|
||||||
|
|
||||||
|
logger = structlog.get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class CustomDockerRunLauncher(DockerRunLauncher):
|
||||||
|
"""
|
||||||
|
Custom Docker Run Launcher.
|
||||||
|
|
||||||
|
This class is an extension of the DockerRunLauncher, allowing for a custom Docker image
|
||||||
|
to be specified via the run tags. It overrides the `launch_run` method to accommodate
|
||||||
|
the custom image logic. If no custom image is provided in the tags, it defaults to
|
||||||
|
the behavior of the parent class. It also generates command arguments for the execution
|
||||||
|
container and launches the container with these arguments.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def launch_run(self, context: LaunchRunContext) -> None:
|
||||||
|
"""
|
||||||
|
Launches a run in a Docker container with the specified command and image.
|
||||||
|
|
||||||
|
This function retrieves the Docker image from the run tags or determines it based on the
|
||||||
|
given job code origin. It then constructs the command to execute the run and launches
|
||||||
|
a Docker container to perform the execution.
|
||||||
|
"""
|
||||||
|
|
||||||
|
run = context.dagster_run
|
||||||
|
job_code_origin = check.not_none(context.job_code_origin)
|
||||||
|
if not (docker_image := context.dagster_run.tags.get("docker/image")):
|
||||||
|
docker_image = self._get_docker_image(job_code_origin)
|
||||||
|
|
||||||
|
command = ExecuteRunArgs(
|
||||||
|
job_origin=job_code_origin,
|
||||||
|
run_id=run.run_id,
|
||||||
|
instance_ref=self._instance.get_ref(),
|
||||||
|
).get_command_args()
|
||||||
|
|
||||||
|
self._launch_container_with_command(run, docker_image, command)
|
||||||
Reference in New Issue
Block a user