Compare commits
4 Commits
aa4a2fa5b1
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 87924620fd | |||
| b15aaaa0dc | |||
| 3f99f354de | |||
| 204565118b |
@@ -8,31 +8,33 @@ async def scrape(url: str) -> str:
|
||||
|
||||
await page.goto(url, timeout=60000)
|
||||
|
||||
# Wait until at least one toggle button is present
|
||||
await page.wait_for_selector(".toggle-btn", timeout=20000)
|
||||
# Wait until buttons are available
|
||||
await page.wait_for_selector('div[role="button"][aria-expanded]', timeout=20000)
|
||||
|
||||
# Set zoom
|
||||
# Zoom out for full view
|
||||
await page.evaluate("document.body.style.zoom='50%'")
|
||||
|
||||
# Find all toggle buttons
|
||||
toggle_buttons = await page.query_selector_all(".toggle-btn")
|
||||
print(f"Found {len(toggle_buttons)} toggle buttons")
|
||||
# Find collapsible buttons
|
||||
toggle_buttons = await page.query_selector_all(
|
||||
'div[role="button"][aria-expanded]'
|
||||
)
|
||||
print(f"Found {len(toggle_buttons)} expandable buttons")
|
||||
|
||||
for i, btn in enumerate(toggle_buttons):
|
||||
try:
|
||||
# Ensure it's visible and enabled
|
||||
if await btn.is_visible() and await btn.is_enabled():
|
||||
await btn.click()
|
||||
await page.wait_for_timeout(1000)
|
||||
aria_expanded = await btn.get_attribute("aria-expanded")
|
||||
if aria_expanded == "false":
|
||||
if await btn.is_visible() and await btn.is_enabled():
|
||||
await btn.click()
|
||||
await page.wait_for_timeout(1000)
|
||||
|
||||
if i == len(toggle_buttons) - 1:
|
||||
break
|
||||
|
||||
# Scroll down gradually
|
||||
# Scroll gradually
|
||||
scroll_step = 500
|
||||
total_height = await page.evaluate("() => document.body.scrollHeight")
|
||||
current_position = 0
|
||||
|
||||
while current_position < total_height:
|
||||
await page.evaluate(f"window.scrollTo(0, {current_position});")
|
||||
await page.wait_for_timeout(100)
|
||||
@@ -44,17 +46,14 @@ async def scrape(url: str) -> str:
|
||||
except Exception as e:
|
||||
print(f"Skipped button due to error: {e}")
|
||||
|
||||
# Get the page content
|
||||
# Capture expanded HTML
|
||||
page_source = await page.content()
|
||||
|
||||
# Close the browser
|
||||
await browser.close()
|
||||
|
||||
# Continue scraping logic here...
|
||||
print("Scraping done")
|
||||
|
||||
# Save the page content to a file
|
||||
# Save to file
|
||||
with open("/cache/scraped_page.html", "w") as fp:
|
||||
fp.write(page_source)
|
||||
|
||||
print("Scraping done")
|
||||
|
||||
return page_source
|
||||
|
||||
@@ -102,13 +102,15 @@ def raw_weather(context: dg.AssetExecutionContext) -> Any:
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
date_str = now.strftime("%Y-%m-%d")
|
||||
time_str = now.strftime("%H:%M:%S")
|
||||
|
||||
latitude_str, longitude_str = partition_key[:5], partition_key[5:]
|
||||
yield dg.Output(
|
||||
data,
|
||||
metadata={
|
||||
"date": dg.MetadataValue.timestamp(now),
|
||||
"latitude": dg.MetadataValue.float(latitude),
|
||||
"longitude": dg.MetadataValue.float(longitude),
|
||||
"path_suffix": [date_str, time_str],
|
||||
"path": [APP, "raw", date_str, latitude_str, longitude_str, time_str],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -39,9 +39,10 @@ def list_locations(context: dg.SensorEvaluationContext) -> dg.SensorResult:
|
||||
if new_locations:
|
||||
context.log.info(f"Discovered {len(new_locations)} new locations.")
|
||||
|
||||
# Limit to 3 new locations
|
||||
return dg.SensorResult(
|
||||
run_requests=[],
|
||||
run_requests=[
|
||||
dg.RunRequest(partition_key=location) for location in new_locations[:3]
|
||||
],
|
||||
dynamic_partitions_requests=[
|
||||
location_partitions_def.build_add_request(new_locations),
|
||||
latitude_partitions_def.build_add_request(new_latitudes),
|
||||
|
||||
@@ -35,8 +35,8 @@ services:
|
||||
- /opt/dagster/apps/:/code/apps/:ro
|
||||
- /opt/dagster/shared/:/code/shared/:ro
|
||||
- /opt/dagster/logs/:/logs:rw
|
||||
- /opt/dagster/storage/import/:/storage/import/:ro
|
||||
- /opt/dagster/storage/deals/:/storage/deals/:rw
|
||||
# - /mnt/mezzo/scratch/dagster/import/:/storage/import/:ro
|
||||
- /mnt/mezzo/scratch/dagster/deals/:/storage/deals/:rw
|
||||
networks:
|
||||
- dagster
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ x-postgres-env: &postgres_env
|
||||
POSTGRES_DB: ${POSTGRES_DB}
|
||||
x-system-env: &system_env
|
||||
TZ: Europe/Amsterdam
|
||||
DATA_DIR: ${DATA_DIR}
|
||||
CACHE_DIR: /tmp/cache
|
||||
x-dagster-env: &dagster_env
|
||||
DAGSTER_POSTGRES_HOST: ${POSTGRES_HOST}
|
||||
@@ -26,7 +27,7 @@ x-volumes: &volumes
|
||||
- /opt/dagster/dagster.yaml:/opt/dagster/home/dagster.yaml.template:ro
|
||||
- /opt/dagster/workspace.yaml:/opt/dagster/home/workspace.yaml:ro
|
||||
- /opt/dagster/system/:/code/system/:ro
|
||||
- /opt/dagster/storage/:/storage/:rw
|
||||
- /mnt/mezzo/scratch/dagster/:/storage/:rw
|
||||
- /opt/dagster/logs/:/logs:rw
|
||||
- /var/run/docker.sock:/var/run/docker.sock:rw
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ run_launcher:
|
||||
- DAGSTER_POSTGRES_USER
|
||||
- DAGSTER_POSTGRES_PASSWORD
|
||||
- DAGSTER_POSTGRES_DB
|
||||
- DATA_DIR
|
||||
- SMTP_SERVER
|
||||
- SMTP_PORT
|
||||
- SMTP_USERNAME
|
||||
@@ -33,8 +34,8 @@ run_launcher:
|
||||
volumes:
|
||||
- /opt/dagster/apps/:/code/apps/:ro
|
||||
- /opt/dagster/shared/:/code/shared/:ro
|
||||
- /opt/dagster/storage/:/storage/:rw
|
||||
- /opt/dagster/logs/:/logs:rw
|
||||
- ${DATA_DIR}:/storage/:rw
|
||||
- ${CACHE_DIR}:/cache:rw
|
||||
|
||||
run_storage:
|
||||
|
||||
@@ -6,6 +6,10 @@ from pydantic import Field, PrivateAttr
|
||||
from upath import UPath
|
||||
|
||||
import dagster as dg
|
||||
from dagster import (
|
||||
InputContext,
|
||||
OutputContext,
|
||||
)
|
||||
|
||||
|
||||
def _process_env_vars(config: dict[str, Any]) -> dict[str, Any]:
|
||||
@@ -60,12 +64,26 @@ class BaseIOManager(dg.ConfigurableIOManager, dg.UPathIOManager, ABC):
|
||||
with path.open("r") as fp:
|
||||
return json.load(fp)
|
||||
|
||||
def get_path_for_partition(
|
||||
self, context: InputContext | OutputContext, path: "UPath", partition: str
|
||||
) -> UPath:
|
||||
"""Use path from metadata when provided."""
|
||||
ic()
|
||||
context_metadata = context.output_metadata or {}
|
||||
ic(context_metadata)
|
||||
|
||||
if "path" in context_metadata:
|
||||
return UPath(*context_metadata["path"].value)
|
||||
return super().get_path_for_partition(context)
|
||||
|
||||
def get_asset_relative_path(
|
||||
self, context: dg.InputContext | dg.OutputContext
|
||||
) -> UPath:
|
||||
"""Get the relative path for the asset based on context metadata."""
|
||||
ic()
|
||||
context_metadata = context.output_metadata or {}
|
||||
ic(context_metadata)
|
||||
|
||||
path_prefix = (
|
||||
context_metadata["path_prefix"].value
|
||||
if "path_prefix" in context_metadata
|
||||
|
||||
Reference in New Issue
Block a user