import structlog
from partitions import borg_repo_partitions_def
from utils.borg import get_ssh_client, list_repos

import dagster as dg

logger = structlog.get_logger()

# Directory passed to ``list_repos`` as the parent location to scan.
_BORG_PARENT_DIR = "/mnt/yotta/xenon/borg/"


@dg.sensor()
def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    """Register repos that are not yet dynamic partitions.

    Compares the partition keys already stored for
    ``borg_repo_partitions_def`` with the entries ``list_repos`` reports
    for ``_BORG_PARENT_DIR`` and requests the missing ones be added.

    Args:
        context: Sensor evaluation context; its ``instance`` is queried for
            the partition keys that already exist.

    Returns:
        A ``SensorResult`` carrying one add-partitions request with the new
        keys in sorted order, or a skip reason when nothing new was found.
    """
    known_repos = set(
        context.instance.get_dynamic_partitions(borg_repo_partitions_def.name)
    )

    # Keep the SSH client open only for the remote listing itself.
    with get_ssh_client() as client:
        # presumably list_repos yields one string key per repo — TODO confirm
        found_repos = set(list_repos(client, _BORG_PARENT_DIR))

    # Sort so the requested keys have a stable order across evaluations
    # (set difference order is arbitrary).
    new_repos = sorted(found_repos - known_repos)

    if not new_repos:
        # Nothing to register: record why instead of sending an empty request.
        return dg.SensorResult(
            skip_reason=dg.SkipReason("No new borg repos found."),
        )

    # NOTE(review): run requests for the new repos are not emitted here; the
    # original had that line commented out. Only the partitions are added.
    return dg.SensorResult(
        dynamic_partitions_requests=[
            borg_repo_partitions_def.build_add_request(new_repos),
        ],
    )