Compare commits: 67a7e2dacf...main (13 commits)
| SHA1 |
|---|
| 87924620fd |
| b15aaaa0dc |
| 3f99f354de |
| 204565118b |
| aa4a2fa5b1 |
| af913e258a |
| 7a8f15b1d6 |
| e9ad1677ef |
| 2a4da9abb9 |
| a9b9197150 |
| 883ecf86be |
| 7a600f6264 |
| 127a773c82 |
@@ -19,85 +19,8 @@ def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
 
     new_repos = list(set(repos) - existing_repos)
 
     return dg.SensorResult(
-        run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
+        # run_requests=[dg.RunRequest(partition_key=repo) for repo in new_repos],
         dynamic_partitions_requests=[
             borg_repo_partitions_def.build_add_request(new_repos),
         ],
     )
-
-
-# @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
-# def list_archives(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
-#     ic(context.cursor)
-#
-#     response = requests.get(URL)
-#     response.raise_for_status()
-#
-#     try:
-#         date_obj = next(extract_date(response.text))
-#         date_str = date_obj.strftime("%Y-%m-%d")
-#         context.log.info(f"Found date: {date_str}")
-#         if date_str > context.cursor:
-#             context.update_cursor(date_str)
-#             yield dg.RunRequest()
-#             return
-#     except Exception as e:
-#         context.log.error(f"Parsing error: {e}")
-#
-#     now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-#     file = f"{now_str} stocks.html"
-#     context.log.info(f"Saving file: {file}")
-#     with open(f"/cache/{file}", "w") as fp:
-#         fp.write(response.text)
-#
-#
-# @dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
-# def parse_raw(context: dg.SensorEvaluationContext):
-#     # TODO: use cursor from sensor to filter materialization events
-#
-#     # Get the known materialized partitions of daily_tables
-#     daily_partitions = context.instance.get_materialized_partitions(
-#         assets.daily_table.key
-#     )
-#     dates = [x.split("|")[0] for x in daily_partitions]
-#     ic(daily_partitions, dates)
-#
-#     # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
-#     events = list(
-#         context.instance.get_event_records(
-#             event_records_filter=dg.EventRecordsFilter(
-#                 event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
-#                 asset_key=assets.raw_html.key,
-#             ),
-#             limit=100,
-#         )
-#     )
-#
-#     # Track unique dates found in raw that are not materialized in daily_tables
-#     unknown_dates = set()
-#     for event in events:
-#         metadata = event.event_log_entry.asset_materialization.metadata
-#         date_str = None
-#         ic(metadata)
-#         for key, entry in metadata.items():
-#             # TODO: move this general logic
-#             if key.lower() in {"date", "partition", "partition_date"}:
-#                 date_str = entry.value
-#                 break
-#         if not date_str:
-#             continue
-#
-#         # Normalize and validate the date
-#         try:
-#             dt = pendulum.from_timestamp(int(date_str))
-#             date_str = dt.strftime("%Y-%m-%d")
-#         except Exception as e:
-#             logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
-#             continue
-#
-#         if date_str not in dates:
-#             unknown_dates.add(date_str)
-#
-#     ic(unknown_dates)
-#     for date_str in sorted(unknown_dates):
-#         yield dg.RunRequest(partition_key=date_str)
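As background for this hunk, the sketch below shows the dynamic-partitions sensor pattern it relies on: discovered Borg repositories are registered as partitions while the run requests stay commented out. The partition name, interval, and repo-discovery step are illustrative assumptions, not code from this repository.

```python
import dagster as dg

# Hypothetical dynamic partitions definition mirroring the one in the diff.
borg_repo_partitions_def = dg.DynamicPartitionsDefinition(name="borg_repos")


# The real sensor targets the backup job; the target is omitted here for brevity.
@dg.sensor(minimum_interval_seconds=60 * 60)
def borg_repos(context: dg.SensorEvaluationContext) -> dg.SensorResult:
    # Placeholder discovery step; the actual sensor lists repositories over SSH.
    repos = ["repo-a", "repo-b"]

    existing_repos = set(
        context.instance.get_dynamic_partitions(borg_repo_partitions_def.name)
    )
    new_repos = list(set(repos) - existing_repos)

    # Register new partitions only; discovery no longer triggers backup runs directly.
    return dg.SensorResult(
        run_requests=[],
        dynamic_partitions_requests=[
            borg_repo_partitions_def.build_add_request(new_repos),
        ],
    )
```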
apps/backup/src/test.py (new file, +11 lines)

@@ -0,0 +1,11 @@
+import structlog
+from utils.borg import get_ssh_client, list_repos
+
+logger = structlog.get_logger()
+
+if __name__ == "__main__":
+    with get_ssh_client() as client:
+        parent = "/mnt/yotta/xenon/borg/"
+        repos = set(list_repos(client, parent))
+
+    print(repos)
@@ -8,31 +8,33 @@ async def scrape(url: str) -> str:
 
         await page.goto(url, timeout=60000)
 
-        # Wait until at least one toggle button is present
-        await page.wait_for_selector(".toggle-btn", timeout=20000)
+        # Wait until buttons are available
+        await page.wait_for_selector('div[role="button"][aria-expanded]', timeout=20000)
 
-        # Set zoom
+        # Zoom out for full view
         await page.evaluate("document.body.style.zoom='50%'")
 
-        # Find all toggle buttons
-        toggle_buttons = await page.query_selector_all(".toggle-btn")
-        print(f"Found {len(toggle_buttons)} toggle buttons")
+        # Find collapsible buttons
+        toggle_buttons = await page.query_selector_all(
+            'div[role="button"][aria-expanded]'
+        )
+        print(f"Found {len(toggle_buttons)} expandable buttons")
 
         for i, btn in enumerate(toggle_buttons):
             try:
-                # Ensure it's visible and enabled
-                if await btn.is_visible() and await btn.is_enabled():
-                    await btn.click()
-                    await page.wait_for_timeout(1000)
+                aria_expanded = await btn.get_attribute("aria-expanded")
+                if aria_expanded == "false":
+                    if await btn.is_visible() and await btn.is_enabled():
+                        await btn.click()
+                        await page.wait_for_timeout(1000)
 
                 if i == len(toggle_buttons) - 1:
                     break
 
-                # Scroll down gradually
+                # Scroll gradually
                 scroll_step = 500
                 total_height = await page.evaluate("() => document.body.scrollHeight")
                 current_position = 0
 
                 while current_position < total_height:
                     await page.evaluate(f"window.scrollTo(0, {current_position});")
                     await page.wait_for_timeout(100)
@@ -44,17 +46,14 @@ async def scrape(url: str) -> str:
             except Exception as e:
                 print(f"Skipped button due to error: {e}")
 
-        # Get the page content
+        # Capture expanded HTML
        page_source = await page.content()
 
-        # Close the browser
        await browser.close()
 
-        # Continue scraping logic here...
-        print("Scraping done")
-
-        # Save the page content to a file
+        # Save to file
        with open("/cache/scraped_page.html", "w") as fp:
            fp.write(page_source)
 
+        print("Scraping done")
+
        return page_source
@@ -176,9 +176,7 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
     },
     output_required=False,
     dagster_type=patito_model_to_dagster_type(Deal),
-    automation_condition=dg.AutomationCondition.on_missing().ignore(
-        dg.AssetSelection.assets(cleaned_deals.key)
-    ),
+    automation_condition=dg.AutomationCondition.eager(),
 )
 def new_deals(
     context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
@@ -198,7 +196,6 @@ def new_deals(
 
     if len(partition_keys := sorted(partitions.keys())) < 2:
         context.log.warning("Not enough partitions to fetch new deals!")
-
         return
 
     before, after = partition_keys[-2:]
@@ -220,7 +217,9 @@ def new_deals(
     new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect()
     if new_df.height:
         context.log.info(f"New deals found ({new_df.height}x)!")
-        yield dg.Output(Deal.DataFrame(new_df))
+        yield dg.Output(
+            Deal.DataFrame(new_df.with_columns(pl.col("release").cast(pl.Date)))
+        )
     else:
         context.log.info("No new deals found!")
 
@@ -233,7 +232,9 @@ def new_deals(
     },
     ins={"partitions": dg.AssetIn(key=new_deals.key)},
     output_required=False,
-    automation_condition=dg.AutomationCondition.eager(),
+    automation_condition=dg.AutomationCondition.eager().without(
+        ~dg.AutomationCondition.any_deps_missing()
+    ),
 )
 def good_deals(
     context: dg.AssetExecutionContext,
@@ -243,6 +244,9 @@ def good_deals(
     parsed_partition_keys = parse_partition_keys(context, "partitions")
     ic(parsed_partition_keys)
 
+    if not partitions:
+        logger.warning("Partitions are empty!")
+        return
     df = pl.concat(partitions.values(), how="vertical_relaxed").collect()
 
     counts = dict(df.group_by("source").len().iter_rows())
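For context on the automation-condition change above: `eager()` is a composite condition, and `.without()` strips one of its operands, here the guard that blocks materialization while upstream partitions are missing. A minimal, hedged sketch of that customization with a placeholder asset:

```python
import dagster as dg

# eager() includes a "no upstream partitions missing" guard; removing it lets the
# asset materialize even when some dependencies produced no output, which matches
# upstreams declared with output_required=False.
relaxed_eager = dg.AutomationCondition.eager().without(
    ~dg.AutomationCondition.any_deps_missing()
)


@dg.asset(automation_condition=relaxed_eager)
def example_asset() -> None:
    ...
```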
@@ -12,8 +12,8 @@ define_asset_job = partial(dg.define_asset_job, **kwargs)
 
 deals_job = dg.define_asset_job(
     "deals_job",
-    selection=[assets.deals.key],
-    partitions_def=assets.deals.partitions_def,
+    selection=dg.AssetSelection.assets(assets.new_deals.key).upstream(),
+    partitions_def=assets.new_deals.partitions_def,
 )
 
 
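The job now selects `new_deals` plus everything upstream of it rather than a single asset key. A hedged sketch of that selection pattern with placeholder assets:

```python
import dagster as dg


@dg.asset
def deals() -> list[dict]:
    return []


@dg.asset
def new_deals(deals: list[dict]) -> list[dict]:
    return deals


# Selecting new_deals and its upstream assets pulls deals into the same job.
deals_job = dg.define_asset_job(
    "deals_job",
    selection=dg.AssetSelection.assets("new_deals").upstream(),
)
```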
@@ -10,5 +10,7 @@ class Deal(pt.Model):
     title: str = pt.Field(description="Title of the deal.")
     url: str = pt.Field(description="URL to the deal.")
     date: datetime.date = pt.Field(description="Day the deal was listed.")
-    release: datetime.date = pt.Field(description="Release date.")
+    release: datetime.date | None = pt.Field(
+        description="Release date.", allow_missing=True
+    )
     price: float = pt.Field(description="Price of the deal in EUR.")
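To illustrate what the optional `release` field buys, here is a hedged, trimmed-down sketch: only the changed field plus one other column, validated with patito's standard `Model.validate`; the `allow_missing` keyword is taken from the diff and assumed to be supported by the patito version in use.

```python
import datetime

import patito as pt
import polars as pl


class Deal(pt.Model):
    # Trimmed to the relevant fields; the real model has more columns.
    title: str = pt.Field(description="Title of the deal.")
    release: datetime.date | None = pt.Field(
        description="Release date.", allow_missing=True
    )


# A frame with a null release now validates instead of raising.
df = pl.DataFrame({"title": ["Example"], "release": [None]}).with_columns(
    pl.col("release").cast(pl.Date)
)
Deal.validate(df)
```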
@@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
     now = datetime.now(tz=timezone.utc)
     date_str = now.strftime("%Y-%m-%d")
     time_str = now.strftime("%H:%M:%S")
 
+    latitude_str, longitude_str = partition_key[:5], partition_key[5:]
     yield dg.Output(
         data,
         metadata={
             "date": dg.MetadataValue.timestamp(now),
             "latitude": dg.MetadataValue.float(latitude),
             "longitude": dg.MetadataValue.float(longitude),
-            "path_suffix": [date_str, time_str],
+            "path": [APP, "raw", date_str, latitude_str, longitude_str, time_str],
         },
     )
 
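The new `path` metadata entry spells out the full storage location for the partition instead of only a date/time suffix. A hedged sketch of how those parts are assembled; the `APP` value and the fixed-width partition-key layout are assumptions inferred from the slicing in the diff:

```python
# Assumed fixed-width partition key: first five characters latitude, rest longitude.
partition_key = "52.3704.89"  # hypothetical example value
latitude_str, longitude_str = partition_key[:5], partition_key[5:]

APP = "weather"  # placeholder; the real constant is defined elsewhere in the app
date_str, time_str = "2025-01-01", "12:00:00"

path = [APP, "raw", date_str, latitude_str, longitude_str, time_str]
print("/".join(path))  # weather/raw/2025-01-01/52.37/04.89/12:00:00
```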
@@ -39,12 +39,12 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
     if new_locations:
         context.log.info(f"Discovered {len(new_locations)} new locations.")
 
-    # Limit to 3 new locations
-    selected = new_locations[:3]
     return dg.SensorResult(
-        run_requests=[],  # dg.RunRequest(partition_key=location) for location in locations],
+        run_requests=[
+            dg.RunRequest(partition_key=location) for location in new_locations[:3]
+        ],
         dynamic_partitions_requests=[
-            location_partitions_def.build_add_request(selected),
+            location_partitions_def.build_add_request(new_locations),
             latitude_partitions_def.build_add_request(new_latitudes),
             longitude_partitions_def.build_add_request(new_longitudes),
         ],
@@ -35,8 +35,8 @@ services:
       - /opt/dagster/apps/:/code/apps/:ro
       - /opt/dagster/shared/:/code/shared/:ro
       - /opt/dagster/logs/:/logs:rw
-      - /opt/dagster/storage/import/:/storage/import/:ro
-      - /opt/dagster/storage/deals/:/storage/deals/:rw
+      # - /mnt/mezzo/scratch/dagster/import/:/storage/import/:ro
+      - /mnt/mezzo/scratch/dagster/deals/:/storage/deals/:rw
     networks:
       - dagster
 
@@ -6,6 +6,7 @@ x-postgres-env: &postgres_env
   POSTGRES_DB: ${POSTGRES_DB}
 x-system-env: &system_env
   TZ: Europe/Amsterdam
+  DATA_DIR: ${DATA_DIR}
   CACHE_DIR: /tmp/cache
 x-dagster-env: &dagster_env
   DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
@@ -26,7 +27,7 @@ x-volumes: &volumes
   - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro
   - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro
   - /opt/dagster/system/:/code/system/:ro
-  - /opt/dagster/storage/:/storage/:rw
+  - /mnt/mezzo/scratch/dagster/:/storage/:rw
   - /opt/dagster/logs/:/logs:rw
   - /var/run/docker.sock:/var/run/docker.sock:rw
 
@@ -41,7 +42,7 @@ services:
     networks:
       - dagster
     ports:
-      - '15432:5432'
+      - '25432:5432'
     volumes:
       - /opt/dagster/db/:/var/lib/postgresql/data/
 
@@ -21,6 +21,7 @@ run_launcher:
        - DAGSTER_POSTGRES_USER
        - DAGSTER_POSTGRES_PASSWORD
        - DAGSTER_POSTGRES_DB
+        - DATA_DIR
        - SMTP_SERVER
        - SMTP_PORT
        - SMTP_USERNAME
@@ -33,8 +34,8 @@ run_launcher:
      volumes:
        - /opt/dagster/apps/:/code/apps/:ro
        - /opt/dagster/shared/:/code/shared/:ro
-        - /opt/dagster/storage/:/storage/:rw
        - /opt/dagster/logs/:/logs:rw
+        - ${DATA_DIR}:/storage/:rw
        - ${CACHE_DIR}:/cache:rw
 
 run_storage:
@@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr
 from upath import UPath
 
 import dagster as dg
+from dagster import (
+    InputContext,
+    OutputContext,
+)
 
 
 def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
@@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC):
         with path.open("r") as fp:
             return json.load(fp)
 
+    def get_path_for_partition(
+        self, context: InputContext | OutputContext, path: "UPath", partition: str
+    ) -> UPath:
+        """Use path from metadata when provided."""
+        ic()
+        context_metadata = context.output_metadata or {}
+        ic(context_metadata)
+
+        if "path" in context_metadata:
+            return UPath(*context_metadata["path"].value)
+        return super().get_path_for_partition(context)
+
     def get_asset_relative_path(
         self, context: dg.InputContext | dg.OutputContext
     ) -> UPath:
         """Get the relative path for the asset based on context metadata."""
+        ic()
         context_metadata = context.output_metadata or {}
         ic(context_metadata)
 
         path_prefix = (
             context_metadata["path_prefix"].value
             if "path_prefix" in context_metadata
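The override added above makes the IO manager honour a `path` metadata entry such as the one `raw_weather` now emits. A hedged, self-contained sketch of the same idea on top of dagster's `UPathIOManager`; method names follow the dagster base class, while the metadata handling mirrors the diff in simplified form:

```python
from typing import Any

from upath import UPath

import dagster as dg
from dagster import InputContext, OutputContext


class PathFromMetadataIOManager(dg.UPathIOManager):
    """Sketch: place a partition's output at the path named in its metadata."""

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(str(obj))

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        return path.read_text()

    def get_path_for_partition(
        self, context: InputContext | OutputContext, path: UPath, partition: str
    ) -> UPath:
        # Assumes the asset logged a "path" metadata entry whose value is a list
        # of path components, as raw_weather does in this change set.
        metadata = getattr(context, "output_metadata", None) or {}
        if "path" in metadata:
            return UPath(*metadata["path"].value)
        return super().get_path_for_partition(context, path, partition)
```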