Compare commits

4 Commits: 2a4da9abb9...aa4a2fa5b1

| Author | SHA1 | Date |
|---|---|---|
| | aa4a2fa5b1 | |
| | af913e258a | |
| | 7a8f15b1d6 | |
| | e9ad1677ef | |
@@ -19,85 +19,8 @@ def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
 
     new_repos = list(set(repos) - existing_repos)
     return dg.SensorResult(
-        run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
+        # run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
         dynamic_partitions_requests=[
            borg_repo_partitions_def.build_add_request(new_repos),
        ],
    )
-
-
-# @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
-# def list_archives(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
-#     ic(context.cursor)
-#
-#     response = requests.get(URL)
-#     response.raise_for_status()
-#
-#     try:
-#         date_obj = next(extract_date(response.text))
-#         date_str = date_obj.strftime("%Y-%m-%d")
-#         context.log.info(f"Found date: {date_str}")
-#         if date_str > context.cursor:
-#             context.update_cursor(date_str)
-#             yield dg.RunRequest()
-#             return
-#     except Exception as e:
-#         context.log.error(f"Parsing error: {e}")
-#
-#     now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-#     file = f"{now_str} stocks.html"
-#     context.log.info(f"Saving file: {file}")
-#     with open(f"/cache/{file}", "w") as fp:
-#         fp.write(response.text)
-#
-#
-# @dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
-# def parse_raw(context: dg.SensorEvaluationContext):
-#     # TODO: use cursor from sensor to filter materialization events
-#
-#     # Get the known materialized partitions of daily_tables
-#     daily_partitions = context.instance.get_materialized_partitions(
-#         assets.daily_table.key
-#     )
-#     dates = [x.split("|")[0] for x in daily_partitions]
-#     ic(daily_partitions, dates)
-#
-#     # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
-#     events = list(
-#         context.instance.get_event_records(
-#             event_records_filter=dg.EventRecordsFilter(
-#                 event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
-#                 asset_key=assets.raw_html.key,
-#             ),
-#             limit=100,
-#         )
-#     )
-#
-#     # Track unique dates found in raw that are not materialized in daily_tables
-#     unknown_dates = set()
-#     for event in events:
-#         metadata = event.event_log_entry.asset_materialization.metadata
-#         date_str = None
-#         ic(metadata)
-#         for key, entry in metadata.items():
-#             # TODO: move this general logic
-#             if key.lower() in {"date", "partition", "partition_date"}:
-#                 date_str = entry.value
-#                 break
-#         if not date_str:
-#             continue
-#
-#         # Normalize and validate the date
-#         try:
-#             dt = pendulum.from_timestamp(int(date_str))
-#             date_str = dt.strftime("%Y-%m-%d")
-#         except Exception as e:
-#             logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
-#             continue
-#
-#         if date_str not in dates:
-#             unknown_dates.add(date_str)
-#
-#     ic(unknown_dates)
-#     for date_str in sorted(unknown_dates):
-#         yield dg.RunRequest(partition_key=date_str)
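For reference, a minimal self-contained sketch of the pattern this hunk settles on: the sensor now only registers newly discovered repositories as dynamic partitions and no longer requests runs directly. The asset name, the hard-coded discovery stub, and the sensor parameters below are illustrative assumptions, not code from this repository.

```python
import dagster as dg

borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repos")


@dg.asset(partitions_def=borg_repo_partitions_def)
def borg_repo_snapshot(context: dg.AssetExecutionContext) -> None:
    # Each dynamically added repo name becomes one partition of this asset.
    context.log.info(f"Backing up {context.partition_key}")


@dg.sensor(asset_selection=[borg_repo_snapshot], minimum_interval_seconds=60 * 60)
def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    repos = ["repo-a", "repo-b"]  # stand-in for the real discovery call
    existing_repos = set(
        context.instance.get_dynamic_partitions(borg_repo_partitions_def.name)
    )
    new_repos = list(set(repos) - existing_repos)
    return dg.SensorResult(
        # Runs are deliberately not requested here; the commit appears to leave
        # that to backfills or automation conditions and only adds partitions.
        run_requests=[],
        dynamic_partitions_requests=[
            borg_repo_partitions_def.build_add_request(new_repos)
        ],
    )
```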
apps/backup/src/test.py (new file, 11 lines)
@@ -0,0 +1,11 @@
+import structlog
+from utils.borg import get_ssh_client, list_repos
+
+logger = structlog.get_logger()
+
+if __name__ == "__main__":
+    with get_ssh_client() as client:
+        parent = "/mnt/yotta/xenon/borg/"
+        repos = set(list_repos(client, parent))
+
+        print(repos)
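The diff only shows these call sites; the implementations of `get_ssh_client` and `list_repos` in `utils.borg` are not part of this comparison. Purely as a guess at their shape, something along these lines would satisfy the usage above; paramiko, the host name, and the "a borg repo is any child directory containing a `config` file" heuristic are all assumptions.

```python
# Hypothetical sketch only -- not the repository's utils/borg.py.
from contextlib import contextmanager
from typing import Iterator

import paramiko


@contextmanager
def get_ssh_client(host: str = "backup-host") -> Iterator[paramiko.SSHClient]:
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.connect(host)  # assumes key-based auth via the SSH agent
    try:
        yield client
    finally:
        client.close()


def list_repos(client: paramiko.SSHClient, parent: str) -> Iterator[str]:
    # Treat every direct child of `parent` that contains a borg "config" file
    # as a repository; yield the repo directory paths.
    _, stdout, _ = client.exec_command(f"ls -1d {parent}*/config")
    for line in stdout:
        yield line.strip().removesuffix("/config")
```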
@@ -232,7 +232,9 @@ def new_deals(
     },
     ins={"partitions": dg.AssetIn(key=new_deals.key)},
     output_required=False,
-    automation_condition=dg.AutomationCondition.eager().without(~dg.AutomationCondition.any_deps_missing())
+    automation_condition=dg.AutomationCondition.eager().without(
+        ~dg.AutomationCondition.any_deps_missing()
+    ),
 )
 def good_deals(
     context: dg.AssetExecutionContext,
@@ -242,6 +244,9 @@ def good_deals(
     parsed_partition_keys = parse_partition_keys(context, "partitions")
     ic(parsed_partition_keys)
 
+    if not partitions:
+        logger.warning("Partitions are empty!")
+        return
     df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
 
     counts = dict(df.group_by("source").len().iter_rows())
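To unpack the two changes in this file: `AutomationCondition.eager().without(~AutomationCondition.any_deps_missing())` keeps the eager policy but removes its "no dependencies may be missing" clause, so the asset can run while some upstream partitions have never been materialized; the new empty-input guard then pairs with `output_required=False` so such a run can end without emitting anything downstream. A minimal sketch of that combination follows; the upstream asset, the partition definition, and the plain `dict` input type are assumptions for illustration, not the repository's code.

```python
import dagster as dg
import polars as pl

deal_partitions_def = dg.DynamicPartitionsDefinition(name="deal_sources")


@dg.asset(partitions_def=deal_partitions_def)
def new_deals(context: dg.AssetExecutionContext) -> pl.LazyFrame:
    # One small frame per dynamic partition (illustrative data).
    return pl.LazyFrame({"source": [context.partition_key], "price": [1.0]})


@dg.asset(
    ins={"partitions": dg.AssetIn(key="new_deals")},
    output_required=False,
    automation_condition=dg.AutomationCondition.eager().without(
        # eager() normally refuses to run while any dependency is missing;
        # dropping that clause lets this asset run on whatever partitions exist.
        ~dg.AutomationCondition.any_deps_missing()
    ),
)
def good_deals(context: dg.AssetExecutionContext, partitions: dict):
    # `partitions` maps partition keys of new_deals to their loaded values.
    if not partitions:
        context.log.warning("Partitions are empty!")
        return  # no Output yielded, so nothing downstream is triggered
    df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
    yield dg.Output(df)
```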
@@ -40,11 +40,10 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     context.log.info(f"Discovered {len(new_locations)} new locations.")
 
     # Limit to 3 new locations
-    selected = new_locations[:3]
     return dg.SensorResult(
-        run_requests=[], # dg.RunRequest(partition_key=location) for location in locations],
+        run_requests=[],
         dynamic_partitions_requests=[
-            location_partitions_def.build_add_request(selected),
+            location_partitions_def.build_add_request(new_locations),
             latitude_partitions_def.build_add_request(new_latitudes),
             longitude_partitions_def.build_add_request(new_longitudes),
         ],
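Taken together with the automation-condition change above, this hunk appears to complete the same shift for locations: the three-location cap (`selected = new_locations[:3]`) is removed, every discovered location plus its latitude/longitude keys is registered as a dynamic partition, and `run_requests` stays empty so materialization is left to downstream automation rather than to the sensor. One small follow-up worth flagging: the `# Limit to 3 new locations` comment survives the removal of the limiting line, so it is now stale.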