import structlog from dagster_docker import DockerRunLauncher import dagster._check as check from dagster._core.launcher.base import ( LaunchRunContext, ) from dagster._grpc.types import ExecuteRunArgs logger = structlog.get_logger(__name__) class CustomDockerRunLauncher(DockerRunLauncher): """ Custom Docker Run Launcher. This class is an extension of the DockerRunLauncher, allowing for a custom Docker image to be specified via the run tags. It overrides the `launch_run` method to accommodate the custom image logic. If no custom image is provided in the tags, it defaults to the behavior of the parent class. It also generates command arguments for the execution container and launches the container with these arguments. """ def launch_run(self, context: LaunchRunContext) -> None: """ Launches a run in a Docker container with the specified command and image. This function retrieves the Docker image from the run tags or determines it based on the given job code origin. It then constructs the command to execute the run and launches a Docker container to perform the execution. """ run = context.dagster_run job_code_origin = check.not_none(context.job_code_origin) if not (docker_image := context.dagster_run.tags.get("docker/image")): docker_image = self._get_docker_image(job_code_origin) command = ExecuteRunArgs( job_origin=job_code_origin, run_id=run.run_id, instance_ref=self._instance.get_ref(), ).get_command_args() self._launch_container_with_command(run, docker_image, command)