Files
dagster/system/run_launcher.py

45 lines
1.7 KiB
Python

import structlog
from dagster_docker import DockerRunLauncher
import dagster._check as check
from dagster._core.launcher.base import (
LaunchRunContext,
)
from dagster._grpc.types import ExecuteRunArgs
# Module-level structlog logger, created with this module's ``__name__``.
logger = structlog.get_logger(__name__)
class CustomDockerRunLauncher(DockerRunLauncher):
    """Docker run launcher that honours a per-run image override.

    Subclasses ``DockerRunLauncher`` and overrides ``launch_run`` so that a
    run carrying a non-empty ``"docker/image"`` tag is executed in that image.
    When the tag is absent or empty, the image is resolved through
    ``self._get_docker_image`` instead. In both cases the execute-run command
    is built from ``ExecuteRunArgs`` and passed to
    ``self._launch_container_with_command``.
    """

    def launch_run(self, context: LaunchRunContext) -> None:
        """Start the run described by *context* in a Docker container.

        Args:
            context: Launch context providing ``dagster_run`` (whose tags are
                inspected for ``"docker/image"``) and ``job_code_origin``
                (passed through ``check.not_none`` before use).
        """
        dagster_run = context.dagster_run
        origin = check.not_none(context.job_code_origin)

        # A truthy "docker/image" tag wins; otherwise fall back to the image
        # resolved from the job code origin.
        image = dagster_run.tags.get("docker/image") or self._get_docker_image(origin)

        run_args = ExecuteRunArgs(
            job_origin=origin,
            run_id=dagster_run.run_id,
            instance_ref=self._instance.get_ref(),
        )
        container_command = run_args.get_command_args()

        self._launch_container_with_command(dagster_run, image, container_command)