import dagster as dg
import structlog

from partitions import borg_repo_partitions_def
from utils.borg import get_ssh_client, list_repos

logger = structlog.get_logger()


@dg.sensor()
def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    """Discover Borg repositories and register them as dynamic partitions."""
    # Partition keys already registered for the dynamic partitions definition.
    existing_repos = set(
        context.instance.get_dynamic_partitions(borg_repo_partitions_def.name)
    )

    # List repositories under the parent directory on the backup host over SSH.
    with get_ssh_client() as client:
        parent = "/mnt/yotta/xenon/borg/"
        repos = set(list_repos(client, parent))

    new_repos = list(repos - existing_repos)

    # Register the new partition keys and request a run for each new repository.
    return dg.SensorResult(
        run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
        dynamic_partitions_requests=[
            borg_repo_partitions_def.build_add_request(new_repos),
        ],
    )


# @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
# def list_archives(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
#     ic(context.cursor)
#
#     response = requests.get(URL)
#     response.raise_for_status()
#
#     try:
#         date_obj = next(extract_date(response.text))
#         date_str = date_obj.strftime("%Y-%m-%d")
#         context.log.info(f"Found date: {date_str}")
#         if date_str > context.cursor:
#             context.update_cursor(date_str)
#             yield dg.RunRequest()
#             return
#     except Exception as e:
#         context.log.error(f"Parsing error: {e}")
#
#     now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#     file = f"{now_str} stocks.html"
#     context.log.info(f"Saving file: {file}")
#     with open(f"/cache/{file}", "w") as fp:
#         fp.write(response.text)
#
#
# @dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
# def parse_raw(context: dg.SensorEvaluationContext):
#     # TODO: use cursor from sensor to filter materialization events
#
#     # Get the known materialized partitions of daily_tables
#     daily_partitions = context.instance.get_materialized_partitions(
#         assets.daily_table.key
#     )
#     dates = [x.split("|")[0] for x in daily_partitions]
#     ic(daily_partitions, dates)
#
#     # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
#     events = list(
#         context.instance.get_event_records(
#             event_records_filter=dg.EventRecordsFilter(
#                 event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
#                 asset_key=assets.raw_html.key,
#             ),
#             limit=100,
#         )
#     )
#
#     # Track unique dates found in raw that are not materialized in daily_tables
#     unknown_dates = set()
#     for event in events:
#         metadata = event.event_log_entry.asset_materialization.metadata
#         date_str = None
#         ic(metadata)
#         for key, entry in metadata.items():
#             # TODO: move this general logic
#             if key.lower() in {"date", "partition", "partition_date"}:
#                 date_str = entry.value
#                 break
#         if not date_str:
#             continue
#
#         # Normalize and validate the date
#         try:
#             dt = pendulum.from_timestamp(int(date_str))
#             date_str = dt.strftime("%Y-%m-%d")
#         except Exception as e:
#             logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
#             continue
#
#         if date_str not in dates:
#             unknown_dates.add(date_str)
#
#     ic(unknown_dates)
#     for date_str in sorted(unknown_dates):
#         yield dg.RunRequest(partition_key=date_str)
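

# Illustrative sketch, not part of this module: the `borg_repos` sensor assumes
# `partitions.borg_repo_partitions_def` is a named DynamicPartitionsDefinition and
# that the project's Definitions object registers the sensor. The file names and
# the `"borg_repos"` partition-set name below are assumptions for illustration.
#
#     # partitions.py (assumed shape)
#     borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repos")
#
#     # definitions.py (assumed shape)
#     defs = dg.Definitions(
#         sensors=[borg_repos],
#         # plus assets/jobs partitioned by borg_repo_partitions_def
#     )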