parse stocks pages

2025-08-04 18:05:53 +02:00
parent f7318e85cd
commit 17ca8669ef
5 changed files with 138 additions and 70 deletions

View File

@@ -2,81 +2,30 @@ import asyncio
 from collections.abc import Iterator
 from datetime import datetime, timezone
 from functools import partial
+from pathlib import Path
 
+import structlog
 from config import APP, URL
-from partitions import daily_partitions_def
-from playwright.async_api import async_playwright
-from utils import extract_date
+from partitions import (
+    daily_partitions_def,
+    daily_table_partitions_def,
+    table_partitions_def,
+)
+from utils.extracter import extract_date, extract_tables
+from utils.scraper import scrape
+from utils.text import slugify
 
 import dagster as dg
 
 TAGS = {"app": APP}
 asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
+logger = structlog.get_logger()
 
 
-async def main() -> str:
-    async with async_playwright() as p:
-        browser = await p.chromium.launch(headless=True)
-        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
-        page = await context.new_page()
-        await page.goto(URL, timeout=60000)
-        # Wait until at least one toggle button is present
-        await page.wait_for_selector(".toggle-btn", timeout=20000)
-        # Set zoom
-        await page.evaluate("document.body.style.zoom='50%'")
-        # Find all toggle buttons
-        toggle_buttons = await page.query_selector_all(".toggle-btn")
-        print(f"Found {len(toggle_buttons)} toggle buttons")
-        for i, btn in enumerate(toggle_buttons):
-            try:
-                # Ensure it's visible and enabled
-                if await btn.is_visible() and await btn.is_enabled():
-                    await btn.click()
-                    await page.wait_for_timeout(1000)
-                if i == len(toggle_buttons) - 1:
-                    break
-                # Scroll down gradually
-                scroll_step = 500
-                total_height = await page.evaluate("() => document.body.scrollHeight")
-                current_position = 0
-                while current_position < total_height:
-                    await page.evaluate(f"window.scrollTo(0, {current_position});")
-                    await page.wait_for_timeout(100)
-                    current_position += scroll_step
-                    total_height = await page.evaluate(
-                        "() => document.body.scrollHeight"
-                    )
-            except Exception as e:
-                print(f"Skipped button due to error: {e}")
-        # Get the page content
-        page_source = await page.content()
-        # Close the browser
-        await browser.close()
-    # Continue scraping logic here...
-    print("Scraping done")
-    # Save the page content to a file
-    with open("/cache/scraped_page.html", "w") as fp:
-        fp.write(page_source)
-    return page_source
-
-
 @asset(io_manager_key="html_io_manager", name="raw")
 def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
-    page_source = asyncio.run(main())
+    page_source = asyncio.run(scrape(url=URL))
     now = datetime.now(tz=timezone.utc)
     date_str = now.strftime("%Y-%m-%d")
@@ -94,18 +43,67 @@ def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
         date_str = date_obj.strftime("%Y-%m-%d")
         context.log.info(f"Found date: {date_str}")
         context.log_event(
-            dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
+            dg.AssetMaterialization(asset_key=raw_html.key, partition=date_str)
         )
     except Exception as e:
         context.log.error(f"Parsing error: {e}")
 
 
+@asset(deps=[raw_html], partitions_def=daily_table_partitions_def)
+def daily_table() -> None: ...
+
+
 @asset(
-    io_manager_key="html_io_manager",
+    deps=[raw_html],
+    io_manager_key="json_io_manager",
     partitions_def=daily_partitions_def,
+    automation_condition=dg.AutomationCondition.eager(),
+    output_required=False,
 )
-def daily_html() -> str: ...
-
-
-class MyAssetConfig(dg.Config):
-    image: str = "bla"
+def raw_daily(context: dg.AssetExecutionContext) -> None:
+    base = (
+        Path(context.resources.json_io_manager.base_dir).joinpath(*raw_html.key.path)
+        / context.partition_key
+    )
+    if files := list(base.glob("*.html")):
+        logger.info(f"Found {len(files)} html files")
+        page_source = open(files[-1]).read()
+        for title, description, df in extract_tables(page_source):
+            # TODO: when scraping click the "View Strategy Criteria" texts and
+            # record the information
+            if not title:
+                logger.info(
+                    "No title!",
+                    description=description,
+                    num_rows=0 if df is None else len(df),
+                )
+                continue
+            if df is None:
+                logger.info("No data!", title=title, description=description)
+                continue
+            slug = slugify(title)
+            output_context = dg.build_output_context(
+                asset_key=dg.AssetKey(
+                    [APP, "daily", context.partition_key, slug],
+                ),
+                resources=context.resources.original_resource_dict,
+            )
+            context.resources.json_io_manager.handle_output(
+                output_context, df.to_dict(orient="records")
+            )
+            context.log_event(
+                dg.AssetMaterialization(
+                    asset_key=daily_table.key,
+                    partition=f"{context.partition_key}|{slug}",
+                    metadata={
+                        "title": dg.MetadataValue.text(title),
+                        "slug": dg.MetadataValue.text(slug),
+                        "description": dg.MetadataValue.text(description),
+                        "rows": dg.MetadataValue.int(len(df)),
+                    },
+                )
+            )
+            context.instance.add_dynamic_partitions(table_partitions_def.name, [slug])
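
Note on the new imports above: utils.scraper.scrape is not part of this diff, and it presumably absorbed the main() coroutine deleted in the first hunk, parameterized by url. A minimal sketch under that assumption; the module path, the cache_path argument and the docstring are guesses, while the Playwright flow is carried over from the removed code.

# utils/scraper.py (hypothetical reconstruction; the real module is not shown in this commit)
import asyncio

from playwright.async_api import async_playwright


async def scrape(url: str, cache_path: str = "/cache/scraped_page.html") -> str:
    """Expand every .toggle-btn on the page, scroll to the bottom and return the HTML."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
        page = await context.new_page()
        await page.goto(url, timeout=60000)
        # Wait until at least one toggle button is present, then zoom out so more rows fit
        await page.wait_for_selector(".toggle-btn", timeout=20000)
        await page.evaluate("document.body.style.zoom='50%'")
        toggle_buttons = await page.query_selector_all(".toggle-btn")
        for i, btn in enumerate(toggle_buttons):
            try:
                if await btn.is_visible() and await btn.is_enabled():
                    await btn.click()
                    await page.wait_for_timeout(1000)
                if i == len(toggle_buttons) - 1:
                    break
                # Scroll down in steps so lazily rendered tables get a chance to load
                position = 0
                total = await page.evaluate("() => document.body.scrollHeight")
                while position < total:
                    await page.evaluate(f"window.scrollTo(0, {position});")
                    await page.wait_for_timeout(100)
                    position += 500
                    total = await page.evaluate("() => document.body.scrollHeight")
            except Exception as e:
                print(f"Skipped button due to error: {e}")
        page_source = await page.content()
        await browser.close()
    with open(cache_path, "w") as fp:
        fp.write(page_source)
    return page_source

raw_html then only needs asyncio.run(scrape(url=URL)), exactly as the hunk above shows.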

View File

@@ -4,6 +4,7 @@ import sensors
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
 from shared.config import APP, STORAGE_DIR
+from shared.io_manager import JsonIOManager
 from shared.io_manager.html import HtmlIOManager
 
 import dagster as dg
@@ -20,8 +21,9 @@ definitions = dg.Definitions(
     ],
     resources={
         "html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
+        "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
     },
     jobs=[jobs.raw_html_job],
-    sensors=[sensors.check_update],
+    sensors=[sensors.check_update, sensors.parse_raw],
 )
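
Similarly, shared.io_manager.JsonIOManager is outside this diff; the asset code above only relies on it exposing a base_dir attribute and a handle_output that persists plain Python records as JSON under the asset key path. A rough sketch assuming a dg.ConfigurableIOManager; the data.json filename and the exact directory layout are invented for illustration.

# shared/io_manager/__init__.py (hypothetical sketch, not the real implementation)
import json
from pathlib import Path

import dagster as dg


class JsonIOManager(dg.ConfigurableIOManager):
    base_dir: str

    def _path(self, context) -> Path:
        # One directory per asset key path segment, mirroring how raw_daily builds
        # base_dir / <raw_html key path> / <partition> for the HTML files
        target = Path(self.base_dir).joinpath(*context.asset_key.path)
        target.mkdir(parents=True, exist_ok=True)
        return target / "data.json"

    def handle_output(self, context: dg.OutputContext, obj) -> None:
        self._path(context).write_text(json.dumps(obj, default=str))

    def load_input(self, context: dg.InputContext):
        return json.loads(self._path(context).read_text())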

View File

@@ -7,3 +7,8 @@ raw_html_job = dg.define_asset_job(
     selection=[assets.raw_html.key],
     tags={"docker/image": "dagster-code-stocks-playwright"},
 )
+
+extract_job = dg.define_asset_job(
+    "extract_job",
+    selection=[assets.raw_daily.key],
+)

View File

@@ -5,3 +5,9 @@ import dagster as dg
 daily_partitions_def = dg.DailyPartitionsDefinition(
     start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
 )
+
+table_partitions_def = dg.DynamicPartitionsDefinition(name="tables")
+
+daily_table_partitions_def = dg.MultiPartitionsDefinition(
+    {"date": daily_partitions_def, "source": table_partitions_def}
+)
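
One note for reading the asset and sensor code in this commit: a key of this multi-partition definition serializes as its dimension values joined by "|", with dimensions ordered by name, so the date comes first here. That is why raw_daily tags daily_table materializations with f"{partition_key}|{slug}" and parse_raw recovers the date with split("|")[0]. A small sketch with made-up values:

import dagster as dg

key = dg.MultiPartitionKey({"date": "2025-07-01", "source": "my-strategy"})
print(str(key))                # 2025-07-01|my-strategy
print(str(key).split("|")[0])  # 2025-07-01
print(key.keys_by_dimension)   # {'date': '2025-07-01', 'source': 'my-strategy'}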

View File

@@ -1,13 +1,18 @@
 from collections.abc import Iterator
 from datetime import datetime
 
+import assets
 import jobs
+import pendulum
 import requests
+import structlog
 from config import URL
-from utils import extract_date
+from utils.extracter import extract_date
 
 import dagster as dg
 
+logger = structlog.get_logger()
+
 
 @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
 def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
@@ -32,3 +37,55 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
context.log.info(f"Saving file: {file}") context.log.info(f"Saving file: {file}")
with open(f"/cache/{file}") as fp: with open(f"/cache/{file}") as fp:
fp.write(response.text) fp.write(response.text)
@dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
def parse_raw(context: dg.SensorEvaluationContext):
# TODO: use cursor from sensor to filter materialization events
# Get the known materialized partitions of daily_tables
daily_partitions = context.instance.get_materialized_partitions(
assets.daily_table.key
)
dates = [x.split("|")[0] for x in daily_partitions]
ic(daily_partitions, dates)
# Get metadata for the raw asset (assumes it's tracked or logged with metadata)
events = list(
context.instance.get_event_records(
event_records_filter=dg.EventRecordsFilter(
event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
asset_key=assets.raw_html.key,
),
limit=100,
)
)
# Track unique dates found in raw that are not materialized in daily_tables
unknown_dates = set()
for event in events:
metadata = event.event_log_entry.asset_materialization.metadata
date_str = None
ic(metadata)
for key, entry in metadata.items():
# TODO: move this general logic
if key.lower() in {"date", "partition", "partition_date"}:
date_str = entry.value
break
if not date_str:
continue
# Normalize and validate the date
try:
dt = pendulum.from_timestamp(int(date_str))
date_str = dt.strftime("%Y-%m-%d")
except Exception as e:
logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
continue
if date_str not in dates:
unknown_dates.add(date_str)
ic(unknown_dates)
for date_str in sorted(unknown_dates):
yield dg.RunRequest(partition_key=date_str)
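
On the first TODO in parse_raw: one way to use the sensor cursor is to store the highest storage_id already processed and pass it back as after_cursor, so each evaluation only reads raw_html materializations it has not seen yet. A sketch of that idea; fetch_new_raw_events is a hypothetical helper, not part of this commit.

import assets  # same module the sensor file imports
import dagster as dg


def fetch_new_raw_events(context: dg.SensorEvaluationContext) -> list:
    # The cursor holds the storage_id of the last event log record we processed
    after_cursor = int(context.cursor) if context.cursor else None
    records = list(
        context.instance.get_event_records(
            event_records_filter=dg.EventRecordsFilter(
                event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=assets.raw_html.key,
                after_cursor=after_cursor,
            ),
            limit=100,
            ascending=True,  # oldest first, so the cursor only ever moves forward
        )
    )
    if records:
        context.update_cursor(str(records[-1].storage_id))
    return records

parse_raw could then iterate over fetch_new_raw_events(context) instead of the fixed limit=100 query and skip dates it has already requested.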