diff --git a/apps/backup/Dockerfile b/apps/backup/Dockerfile
new file mode 100644
index 0000000..f7268a5
--- /dev/null
+++ b/apps/backup/Dockerfile
@@ -0,0 +1,11 @@
+FROM dagster-code-backup-base
+
+RUN apt-get update \
+    && apt-get install --no-install-recommends --yes \
+        borgbackup openssh-client \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN mkdir -p /root/.ssh && chmod 0700 /root/.ssh/
+COPY --chmod=0600 id_rsa /root/.ssh/
+COPY --chmod=0600 ssh_config /root/.ssh/config
diff --git a/apps/backup/src/assets.py b/apps/backup/src/assets.py
new file mode 100644
index 0000000..ba02078
--- /dev/null
+++ b/apps/backup/src/assets.py
@@ -0,0 +1,130 @@
+import json
+import os
+import subprocess
+import sys
+from datetime import datetime
+from functools import partial
+from zoneinfo import ZoneInfo
+
+import structlog
+from config import APP, BORG_HOST, BORG_ROOT
+from partitions import borg_repo_partitions_def, daily_partitions_def
+from shared.utils import get_partition_keys
+
+import dagster as dg
+
+# `ic` is installed as a builtin by definitions.py (icecream.install()).
+asset = partial(dg.asset, key_prefix=APP)
+logger = structlog.get_logger()
+
+
+@asset(
+    partitions_def=dg.MultiPartitionsDefinition(
+        {
+            "date": daily_partitions_def,
+            "repo": borg_repo_partitions_def,
+        }
+    )
+)
+def borg_archive(context: dg.AssetExecutionContext) -> None:
+    # Placeholder: materializations for this asset are emitted by borg_repo.
+    pass
+
+
+@asset(
+    deps=[borg_archive],
+    partitions_def=dg.MultiPartitionsDefinition(
+        {
+            "date": daily_partitions_def,
+            "repo": borg_repo_partitions_def,
+        }
+    ),
+    automation_condition=dg.AutomationCondition.eager(),
+)
+def borg_archive_info(context: dg.AssetExecutionContext) -> dg.Output[None]:
+    """Run `borg info --json` for one archive and attach its stats as metadata."""
+    partition_keys = get_partition_keys(context)
+    ic(partition_keys)
+
+    location = f"ssh://{BORG_HOST}{BORG_ROOT}{partition_keys['repo']}::{partition_keys['date']}"
+    ic(location)
+    try:
+        result = subprocess.run(
+            ["borg", "info", "--json", location],
+            capture_output=True,
+            text=True,
+            check=True,
+            # Extend the environment rather than replace it, so ssh keeps HOME
+            # and PATH; the variable silences borg's unencrypted-repo prompt.
+            env={**os.environ, "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK": "yes"},
+        )
+    except subprocess.CalledProcessError as e:
+        logger.error("borg info failed", exc_info=e, code=e.returncode)
+        sys.stderr.write("borg info failed\n" + e.stderr)
+        raise
+    data = json.loads(result.stdout)
+    ic(data)
+    archive = data["archives"][0]
+
+    def parse_date(date_str: str, tz: str | None = None) -> dg.TimestampMetadataValue:
+        return dg.MetadataValue.timestamp(
+            datetime.fromisoformat(date_str).replace(
+                tzinfo=ZoneInfo(tz or os.environ.get("TZ", "CET"))
+            )
+        )
+
+    return dg.Output(
+        None,
+        metadata={
+            "start": parse_date(archive["start"]),
+            "end": parse_date(archive["end"]),
+            "duration": dg.MetadataValue.float(archive["duration"]),
+            "compressed_size": dg.MetadataValue.int(archive["stats"]["compressed_size"]),
+            "deduplicated_size": dg.MetadataValue.int(
+                archive["stats"]["deduplicated_size"]
+            ),
+            "nfiles": dg.MetadataValue.int(archive["stats"]["nfiles"]),
+            "original_size": dg.MetadataValue.int(archive["stats"]["original_size"]),
+        },
+    )
+
+
+@asset(
+    partitions_def=borg_repo_partitions_def,
+)
+def borg_repo(context: dg.AssetExecutionContext) -> None:
+    """List a repository's archives and register each one as a
+    materialization of borg_archive."""
+    repo = context.partition_key
+    location = f"ssh://{BORG_HOST}{BORG_ROOT}{repo}"
+    ic(location)
+
+    # Get the Borg archive list for this repository.
+    try:
+        result = subprocess.run(
+            ["borg", "list", "--json", location],
+            capture_output=True,
+            text=True,
+            check=True,
+            env={**os.environ, "BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK": "yes"},
+        )
+    except subprocess.CalledProcessError as e:
+        logger.error("borg list failed", exc_info=e, code=e.returncode)
+        sys.stderr.write("borg list failed\n" + e.stderr)
+        raise
+    data = json.loads(result.stdout)
+    ic(data)
+
+    for entry in data.get("archives", []):
+        # Archive names are dates, so "<date>|<repo>" matches the multi-
+        # partition key format (dimension values joined by "|", sorted by name).
+        partition = f"{entry['archive']}|{repo}"
+        context.log_event(
+            dg.AssetMaterialization(
+                asset_key=borg_archive.key,
+                partition=partition,
+                metadata={
+                    "id": dg.MetadataValue.text(entry["id"]),
+                },
+            )
+        )
diff --git a/apps/backup/src/config.py b/apps/backup/src/config.py
new file mode 100644
index 0000000..ec80036
--- /dev/null
+++ b/apps/backup/src/config.py
@@ -0,0 +1,7 @@
+import os
+from pathlib import Path
+
+APP = os.environ.get("APP", Path(__file__).parent.parent.name)
+
+BORG_HOST: str = "backup"
+BORG_ROOT: str = "/mnt/yotta/xenon/borg/"
diff --git a/apps/backup/src/definitions.py b/apps/backup/src/definitions.py
new file mode 100644
index 0000000..aab1e7f
--- /dev/null
+++ b/apps/backup/src/definitions.py
@@ -0,0 +1,22 @@
+import assets
+import sensors
+from config import APP
+from icecream import install
+
+import dagster as dg
+
+install()  # make `ic` available as a builtin across this code location
+
+definitions = dg.Definitions(
+    assets=[
+        asset.with_attributes(
+            group_names_by_key={asset.key: APP},
+            tags_by_key={asset.key: {"app": APP}},
+        )
+        for asset in dg.load_assets_from_modules([assets])
+    ],
+    resources={},
+    jobs=[],
+    schedules=[],
+    sensors=[sensors.borg_repos],
+)
diff --git a/apps/backup/src/partitions.py b/apps/backup/src/partitions.py
new file mode 100644
index 0000000..e7aa401
--- /dev/null
+++ b/apps/backup/src/partitions.py
@@ -0,0 +1,10 @@
+import os
+
+import dagster as dg
+
+# Repo keys are added at runtime (see sensors.borg_repos).
+borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repo")
+# end_offset=1 makes the current, still-in-progress day a valid partition.
+daily_partitions_def = dg.DailyPartitionsDefinition(
+    start_date="2025-01-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
+)