Compare commits

...

4 Commits

SHA1        Message                      Date
aa4a2fa5b1  add all new locations        2025-10-29 11:11:26 +01:00
af913e258a  test ssh access              2025-10-29 10:15:57 +01:00
7a8f15b1d6  remove sample code           2025-10-29 10:15:35 +01:00
e9ad1677ef  warn for missing partitions  2025-10-29 10:15:10 +01:00
4 changed files with 20 additions and 82 deletions


@@ -19,85 +19,8 @@ def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     new_repos = list(set(repos) - existing_repos)
     return dg.SensorResult(
         run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
-        # run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
         dynamic_partitions_requests=[
             borg_repo_partitions_def.build_add_request(new_repos),
         ],
     )
-
-
-# @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
-# def list_archives(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
-#     ic(context.cursor)
-#
-#     response = requests.get(URL)
-#     response.raise_for_status()
-#
-#     try:
-#         date_obj = next(extract_date(response.text))
-#         date_str = date_obj.strftime("%Y-%m-%d")
-#         context.log.info(f"Found date: {date_str}")
-#         if date_str > context.cursor:
-#             context.update_cursor(date_str)
-#             yield dg.RunRequest()
-#             return
-#     except Exception as e:
-#         context.log.error(f"Parsing error: {e}")
-#
-#     now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-#     file = f"{now_str} stocks.html"
-#     context.log.info(f"Saving file: {file}")
-#     with open(f"/cache/{file}", "w") as fp:
-#         fp.write(response.text)
-#
-#
-# @dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
-# def parse_raw(context: dg.SensorEvaluationContext):
-#     # TODO: use cursor from sensor to filter materialization events
-#
-#     # Get the known materialized partitions of daily_tables
-#     daily_partitions = context.instance.get_materialized_partitions(
-#         assets.daily_table.key
-#     )
-#     dates = [x.split("|")[0] for x in daily_partitions]
-#     ic(daily_partitions, dates)
-#
-#     # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
-#     events = list(
-#         context.instance.get_event_records(
-#             event_records_filter=dg.EventRecordsFilter(
-#                 event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
-#                 asset_key=assets.raw_html.key,
-#             ),
-#             limit=100,
-#         )
-#     )
-#
-#     # Track unique dates found in raw that are not materialized in daily_tables
-#     unknown_dates = set()
-#     for event in events:
-#         metadata = event.event_log_entry.asset_materialization.metadata
-#         date_str = None
-#         ic(metadata)
-#         for key, entry in metadata.items():
-#             # TODO: move this general logic
-#             if key.lower() in {"date", "partition", "partition_date"}:
-#                 date_str = entry.value
-#                 break
-#         if not date_str:
-#             continue
-#
-#         # Normalize and validate the date
-#         try:
-#             dt = pendulum.from_timestamp(int(date_str))
-#             date_str = dt.strftime("%Y-%m-%d")
-#         except Exception as e:
-#             logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
-#             continue
-#
-#         if date_str not in dates:
-#             unknown_dates.add(date_str)
-#
-#     ic(unknown_dates)
-#     for date_str in sorted(unknown_dates):
-#         yield dg.RunRequest(partition_key=date_str)
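Editor's note: the surviving borg_repos sensor follows Dagster's dynamic-partitions pattern — discover keys, diff them against the already-registered partitions, then return both the run requests and the add-partitions request in a single SensorResult. A minimal, self-contained sketch of that pattern; the asset, job, and repo names below are hypothetical stand-ins for the project's own:

import dagster as dg

borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repos")

@dg.asset(partitions_def=borg_repo_partitions_def)
def backup(context: dg.AssetExecutionContext) -> None:
    context.log.info(f"Backing up {context.partition_key}")

backup_job = dg.define_asset_job("backup_job", selection=[backup])

@dg.sensor(job=backup_job, minimum_interval_seconds=60 * 60)
def example_repo_sensor(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    repos = ["repo-a", "repo-b"]  # stand-in for the real SSH discovery step
    existing = set(context.instance.get_dynamic_partitions("borg_repos"))
    new_repos = sorted(set(repos) - existing)
    return dg.SensorResult(
        # One run per newly discovered partition...
        run_requests=[dg.RunRequest(partition_key=r) for r in new_repos],
        # ...and the partitions themselves are registered in the same result.
        dynamic_partitions_requests=[
            borg_repo_partitions_def.build_add_request(new_repos),
        ],
    )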

apps/backup/src/test.py (new file)

@@ -0,0 +1,11 @@
+import structlog
+
+from utils.borg import get_ssh_client, list_repos
+
+logger = structlog.get_logger()
+
+if __name__ == "__main__":
+    with get_ssh_client() as client:
+        parent = "/mnt/yotta/xenon/borg/"
+        repos = set(list_repos(client, parent))
+        print(repos)
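Editor's note: utils.borg is not part of this diff, so its API is visible only through this call site. One plausible sketch of those two helpers, assuming a paramiko-based client and borg's convention that every repository directory contains a config file; the host and user defaults here are made up:

from contextlib import contextmanager
from typing import Iterator

import paramiko

@contextmanager
def get_ssh_client(host: str = "xenon", user: str = "borg") -> Iterator[paramiko.SSHClient]:
    # Hypothetical defaults; the real helper presumably reads these from config.
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.connect(host, username=user)
    try:
        yield client
    finally:
        client.close()

def list_repos(client: paramiko.SSHClient, parent: str) -> list[str]:
    # A borg repo is a directory holding a `config` file; look one level deep
    # under `parent` and strip the trailing "/config" from each match.
    _, stdout, _ = client.exec_command(f"find {parent} -maxdepth 2 -name config")
    return [line.rsplit("/", 1)[0] for line in stdout.read().decode().splitlines()]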


@@ -232,7 +232,9 @@ def new_deals(
     },
     ins={"partitions": dg.AssetIn(key=new_deals.key)},
     output_required=False,
-    automation_condition=dg.AutomationCondition.eager().without(~dg.AutomationCondition.any_deps_missing())
+    automation_condition=dg.AutomationCondition.eager().without(
+        ~dg.AutomationCondition.any_deps_missing()
+    ),
 )
 def good_deals(
     context: dg.AssetExecutionContext,
@@ -242,6 +244,9 @@ def good_deals(
     parsed_partition_keys = parse_partition_keys(context, "partitions")
     ic(parsed_partition_keys)
+    if not partitions:
+        logger.warning("Partitions are empty!")
+        return
     df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
     counts = dict(df.group_by("source").len().iter_rows())
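Editor's note: two things interact in this hunk. AutomationCondition.eager() is a composite condition that includes ~any_deps_missing() as a guard, so an eager asset normally refuses to fire while any upstream partition is missing; .without(...) strips that guard so good_deals can materialize from partial upstream data. That is what makes the new empty-input warning necessary: the body can now legitimately see no partitions at all. A condensed sketch of the condition algebra:

import dagster as dg

# eager() is a composite; .without() removes one sub-condition from it.
# Dropping "~any_deps_missing()" means: still react eagerly to upstream
# updates, but do not wait for every dependency partition to exist.
relaxed = dg.AutomationCondition.eager().without(
    ~dg.AutomationCondition.any_deps_missing()
)

With the guard removed, the early `if not partitions: ... return` branch, together with output_required=False on the asset, is what lets a run skip cleanly instead of failing on an empty concat.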


@@ -40,11 +40,10 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     context.log.info(f"Discovered {len(new_locations)} new locations.")
-    # Limit to 3 new locations
-    selected = new_locations[:3]
     return dg.SensorResult(
-        run_requests=[],  # dg.RunRequest(partition_key=location) for location in locations],
+        run_requests=[],
         dynamic_partitions_requests=[
-            location_partitions_def.build_add_request(selected),
+            location_partitions_def.build_add_request(new_locations),
             latitude_partitions_def.build_add_request(new_latitudes),
             longitude_partitions_def.build_add_request(new_longitudes),
         ],
     )
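Editor's note: after "add all new locations", the sensor registers every discovered key across three dynamic partition dimensions but launches no runs itself; materialization is left to whatever targets those partitions (e.g. backfills or automation conditions). A small self-contained sketch of that result shape, with hypothetical partition-set names and values:

import dagster as dg

location_partitions_def = dg.DynamicPartitionsDefinition(name="locations")
latitude_partitions_def = dg.DynamicPartitionsDefinition(name="latitudes")
longitude_partitions_def = dg.DynamicPartitionsDefinition(name="longitudes")

def build_result(
    new_locations: list[str], new_latitudes: list[str], new_longitudes: list[str]
) -> dg.SensorResult:
    # Register every new key in all three dimensions; run_requests stays
    # empty, so the sensor only grows the partition sets.
    return dg.SensorResult(
        run_requests=[],
        dynamic_partitions_requests=[
            location_partitions_def.build_add_request(new_locations),
            latitude_partitions_def.build_add_request(new_latitudes),
            longitude_partitions_def.build_add_request(new_longitudes),
        ],
    )

result = build_result(["berlin"], ["52.52"], ["13.40"])  # made-up values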