diff --git a/apps/stocks/src/assets.py b/apps/stocks/src/assets.py index d0afd1b..6c4f984 100644 --- a/apps/stocks/src/assets.py +++ b/apps/stocks/src/assets.py @@ -109,11 +109,3 @@ def daily_html() -> str: ... class MyAssetConfig(dg.Config): image: str = "bla" - - -@asset(metadata={"docker_image": "my-other-image:latest"}) -def my_asset(context: dg.AssetExecutionContext, config: MyAssetConfig) -> None: - ic(context.op_config) - ic(config) - # image = context.op_config.image - # context.log.info(f"This asset wants to use Docker image: {image}") diff --git a/apps/stocks/src/definitions.py b/apps/stocks/src/definitions.py index ddbe2e2..9041cd3 100644 --- a/apps/stocks/src/definitions.py +++ b/apps/stocks/src/definitions.py @@ -22,6 +22,6 @@ definitions = dg.Definitions( "html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR), "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR), }, - jobs=[jobs.test_job], + jobs=[jobs.raw_html_job], sensors=[sensors.check_update], ) diff --git a/apps/stocks/src/jobs.py b/apps/stocks/src/jobs.py index 593159a..1c00465 100644 --- a/apps/stocks/src/jobs.py +++ b/apps/stocks/src/jobs.py @@ -2,10 +2,8 @@ import assets import dagster as dg -raw_html_job = dg.define_asset_job("deals_job", selection=[assets.raw_html.key]) - -test_job = dg.define_asset_job( - "my_asset_job", - selection=[assets.my_asset.key], - tags={"docker/image": "my.registry/image:from-config"}, +raw_html_job = dg.define_asset_job( + "raw_html_job", + selection=[assets.raw_html.key], + tags={"docker/image": "dagster-code-stocks-playwright"}, ) diff --git a/compose.code.yaml b/compose.code.yaml index 6b73ec8..8d3b19f 100644 --- a/compose.code.yaml +++ b/compose.code.yaml @@ -43,7 +43,7 @@ services: dagster-code-stocks: build: context: apps/stocks - dockerfile: ../../Dockerfile.code.playwright + dockerfile: ../../Dockerfile.code args: - APP=stocks container_name: dagster-code-stocks diff --git a/system/__init__.py b/system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/system/run_launcher.py b/system/run_launcher.py new file mode 100644 index 0000000..bbb8825 --- /dev/null +++ b/system/run_launcher.py @@ -0,0 +1,44 @@ +import structlog +from dagster_docker import DockerRunLauncher + +import dagster._check as check +from dagster._core.launcher.base import ( + LaunchRunContext, +) +from dagster._grpc.types import ExecuteRunArgs + +logger = structlog.get_logger(__name__) + + +class CustomDockerRunLauncher(DockerRunLauncher): + """ + Custom Docker Run Launcher. + + This class is an extension of the DockerRunLauncher, allowing for a custom Docker image + to be specified via the run tags. It overrides the `launch_run` method to accommodate + the custom image logic. If no custom image is provided in the tags, it defaults to + the behavior of the parent class. It also generates command arguments for the execution + container and launches the container with these arguments. + """ + + def launch_run(self, context: LaunchRunContext) -> None: + """ + Launches a run in a Docker container with the specified command and image. + + This function retrieves the Docker image from the run tags or determines it based on the + given job code origin. It then constructs the command to execute the run and launches + a Docker container to perform the execution. + """ + + run = context.dagster_run + job_code_origin = check.not_none(context.job_code_origin) + if not (docker_image := context.dagster_run.tags.get("docker/image")): + docker_image = self._get_docker_image(job_code_origin) + + command = ExecuteRunArgs( + job_origin=job_code_origin, + run_id=run.run_id, + instance_ref=self._instance.get_ref(), + ).get_command_args() + + self._launch_container_with_command(run, docker_image, command)