Compare commits

...

4 Commits

Author SHA1 Message Date
87924620fd move storage dir 2026-01-08 15:52:11 +01:00
b15aaaa0dc update stocks scraper 2025-11-09 18:30:10 +01:00
3f99f354de align raw weather output files 2025-10-29 13:59:21 +01:00
204565118b run request for 3 new locations 2025-10-29 11:17:12 +01:00
7 changed files with 48 additions and 26 deletions

View File

@@ -8,31 +8,33 @@ async def scrape(url: str) -> str:
await page.goto(url, timeout=60000) await page.goto(url, timeout=60000)
# Wait until at least one toggle button is present # Wait until buttons are available
await page.wait_for_selector(".toggle-btn", timeout=20000) await page.wait_for_selector('div[role="button"][aria-expanded]', timeout=20000)
# Set zoom # Zoom out for full view
await page.evaluate("document.body.style.zoom='50%'") await page.evaluate("document.body.style.zoom='50%'")
# Find all toggle buttons # Find collapsible buttons
toggle_buttons = await page.query_selector_all(".toggle-btn") toggle_buttons = await page.query_selector_all(
print(f"Found {len(toggle_buttons)} toggle buttons") 'div[role="button"][aria-expanded]'
)
print(f"Found {len(toggle_buttons)} expandable buttons")
for i, btn in enumerate(toggle_buttons): for i, btn in enumerate(toggle_buttons):
try: try:
# Ensure it's visible and enabled aria_expanded = await btn.get_attribute("aria-expanded")
if await btn.is_visible() and await btn.is_enabled(): if aria_expanded == "false":
await btn.click() if await btn.is_visible() and await btn.is_enabled():
await page.wait_for_timeout(1000) await btn.click()
await page.wait_for_timeout(1000)
if i == len(toggle_buttons) - 1: if i == len(toggle_buttons) - 1:
break break
# Scroll down gradually # Scroll gradually
scroll_step = 500 scroll_step = 500
total_height = await page.evaluate("() => document.body.scrollHeight") total_height = await page.evaluate("() => document.body.scrollHeight")
current_position = 0 current_position = 0
while current_position < total_height: while current_position < total_height:
await page.evaluate(f"window.scrollTo(0, {current_position});") await page.evaluate(f"window.scrollTo(0, {current_position});")
await page.wait_for_timeout(100) await page.wait_for_timeout(100)
@@ -44,17 +46,14 @@ async def scrape(url: str) -> str:
except Exception as e: except Exception as e:
print(f"Skipped button due to error: {e}") print(f"Skipped button due to error: {e}")
# Get the page content # Capture expanded HTML
page_source = await page.content() page_source = await page.content()
# Close the browser
await browser.close() await browser.close()
# Continue scraping logic here... # Save to file
print("Scraping done")
# Save the page content to a file
with open("/cache/scraped_page.html", "w") as fp: with open("/cache/scraped_page.html", "w") as fp:
fp.write(page_source) fp.write(page_source)
print("Scraping done")
return page_source return page_source

View File

@@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
now = datetime.now(tz=timezone.utc) now = datetime.now(tz=timezone.utc)
date_str = now.strftime("%Y-%m-%d") date_str = now.strftime("%Y-%m-%d")
time_str = now.strftime("%H:%M:%S") time_str = now.strftime("%H:%M:%S")
latitude_str, longitude_str = partition_key[:5], partition_key[5:]
yield dg.Output( yield dg.Output(
data, data,
metadata={ metadata={
"date": dg.MetadataValue.timestamp(now), "date": dg.MetadataValue.timestamp(now),
"latitude": dg.MetadataValue.float(latitude), "latitude": dg.MetadataValue.float(latitude),
"longitude": dg.MetadataValue.float(longitude), "longitude": dg.MetadataValue.float(longitude),
"path_suffix": [date_str, time_str], "path": [APP, "raw", date_str, latitude_str, longitude_str, time_str],
}, },
) )

View File

@@ -39,9 +39,10 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
if new_locations: if new_locations:
context.log.info(f"Discovered {len(new_locations)} new locations.") context.log.info(f"Discovered {len(new_locations)} new locations.")
# Limit to 3 new locations
return dg.SensorResult( return dg.SensorResult(
run_requests=[], run_requests=[
dg.RunRequest(partition_key=location) for location in new_locations[:3]
],
dynamic_partitions_requests=[ dynamic_partitions_requests=[
location_partitions_def.build_add_request(new_locations), location_partitions_def.build_add_request(new_locations),
latitude_partitions_def.build_add_request(new_latitudes), latitude_partitions_def.build_add_request(new_latitudes),

View File

@@ -35,8 +35,8 @@ services:
- /opt/dagster/apps/:/code/apps/:ro - /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro - /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- /opt/dagster/storage/import/:/storage/import/:ro # - /mnt/mezzo/scratch/dagster/import/:/storage/import/:ro
- /opt/dagster/storage/deals/:/storage/deals/:rw - /mnt/mezzo/scratch/dagster/deals/:/storage/deals/:rw
networks: networks:
- dagster - dagster

View File

@@ -6,6 +6,7 @@ x-postgres-env: &postgres_env
POSTGRES_DB: ${POSTGRES_DB} POSTGRES_DB: ${POSTGRES_DB}
x-system-env: &system_env x-system-env: &system_env
TZ: Europe/Amsterdam TZ: Europe/Amsterdam
DATA_DIR: ${DATA_DIR}
CACHE_DIR: /tmp/cache CACHE_DIR: /tmp/cache
x-dagster-env: &dagster_env x-dagster-env: &dagster_env
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST} DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
@@ -26,7 +27,7 @@ x-volumes: &volumes
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro - /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro
- /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro - /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro
- /opt/dagster/system/:/code/system/:ro - /opt/dagster/system/:/code/system/:ro
- /opt/dagster/storage/:/storage/:rw - /mnt/mezzo/scratch/dagster/:/storage/:rw
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- /var/run/docker.sock:/var/run/docker.sock:rw - /var/run/docker.sock:/var/run/docker.sock:rw

View File

@@ -21,6 +21,7 @@ run_launcher:
- DAGSTER_POSTGRES_USER - DAGSTER_POSTGRES_USER
- DAGSTER_POSTGRES_PASSWORD - DAGSTER_POSTGRES_PASSWORD
- DAGSTER_POSTGRES_DB - DAGSTER_POSTGRES_DB
- DATA_DIR
- SMTP_SERVER - SMTP_SERVER
- SMTP_PORT - SMTP_PORT
- SMTP_USERNAME - SMTP_USERNAME
@@ -33,8 +34,8 @@ run_launcher:
volumes: volumes:
- /opt/dagster/apps/:/code/apps/:ro - /opt/dagster/apps/:/code/apps/:ro
- /opt/dagster/shared/:/code/shared/:ro - /opt/dagster/shared/:/code/shared/:ro
- /opt/dagster/storage/:/storage/:rw
- /opt/dagster/logs/:/logs:rw - /opt/dagster/logs/:/logs:rw
- ${DATA_DIR}:/storage/:rw
- ${CACHE_DIR}:/cache:rw - ${CACHE_DIR}:/cache:rw
run_storage: run_storage:

View File

@@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr
from upath import UPath from upath import UPath
import dagster as dg import dagster as dg
from dagster import (
InputContext,
OutputContext,
)
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]: def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
@@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC):
with path.open("r") as fp: with path.open("r") as fp:
return json.load(fp) return json.load(fp)
def get_path_for_partition(
self, context: InputContext | OutputContext, path: "UPath", partition: str
) -> UPath:
"""Use path from metadata when provided."""
ic()
context_metadata = context.output_metadata or {}
ic(context_metadata)
if "path" in context_metadata:
return UPath(*context_metadata["path"].value)
return super().get_path_for_partition(context)
def get_asset_relative_path( def get_asset_relative_path(
self, context: dg.InputContext | dg.OutputContext self, context: dg.InputContext | dg.OutputContext
) -> UPath: ) -> UPath:
"""Get the relative path for the asset based on context metadata.""" """Get the relative path for the asset based on context metadata."""
ic()
context_metadata = context.output_metadata or {} context_metadata = context.output_metadata or {}
ic(context_metadata) ic(context_metadata)
path_prefix = ( path_prefix = (
context_metadata["path_prefix"].value context_metadata["path_prefix"].value
if "path_prefix" in context_metadata if "path_prefix" in context_metadata