diff --git a/apps/stocks/src/assets.py b/apps/stocks/src/assets.py
index 6c4f984..4d23770 100644
--- a/apps/stocks/src/assets.py
+++ b/apps/stocks/src/assets.py
@@ -2,81 +2,30 @@ import asyncio
 from collections.abc import Iterator
 from datetime import datetime, timezone
 from functools import partial
+from pathlib import Path
 
+import structlog
 from config import APP, URL
-from partitions import daily_partitions_def
-from playwright.async_api import async_playwright
-from utils import extract_date
+from partitions import (
+    daily_partitions_def,
+    daily_table_partitions_def,
+    table_partitions_def,
+)
+from utils.extracter import extract_date, extract_tables
+from utils.scraper import scrape
+from utils.text import slugify
 
 import dagster as dg
 
 
 TAGS = {"app": APP}
 asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
-
-async def main() -> str:
-    async with async_playwright() as p:
-        browser = await p.chromium.launch(headless=True)
-        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
-        page = await context.new_page()
-
-        await page.goto(URL, timeout=60000)
-
-        # Wait until at least one toggle button is present
-        await page.wait_for_selector(".toggle-btn", timeout=20000)
-
-        # Set zoom
-        await page.evaluate("document.body.style.zoom='50%'")
-
-        # Find all toggle buttons
-        toggle_buttons = await page.query_selector_all(".toggle-btn")
-        print(f"Found {len(toggle_buttons)} toggle buttons")
-
-        for i, btn in enumerate(toggle_buttons):
-            try:
-                # Ensure it's visible and enabled
-                if await btn.is_visible() and await btn.is_enabled():
-                    await btn.click()
-                    await page.wait_for_timeout(1000)
-
-                    if i == len(toggle_buttons) - 1:
-                        break
-
-                    # Scroll down gradually
-                    scroll_step = 500
-                    total_height = await page.evaluate("() => document.body.scrollHeight")
-                    current_position = 0
-
-                    while current_position < total_height:
-                        await page.evaluate(f"window.scrollTo(0, {current_position});")
-                        await page.wait_for_timeout(100)
-                        current_position += scroll_step
-                        total_height = await page.evaluate(
-                            "() => document.body.scrollHeight"
-                        )
-
-            except Exception as e:
-                print(f"Skipped button due to error: {e}")
-
-        # Get the page content
-        page_source = await page.content()
-
-        # Close the browser
-        await browser.close()
-
-        # Continue scraping logic here...
-        print("Scraping done")
-
-        # Save the page content to a file
-        with open("/cache/scraped_page.html", "w") as fp:
-            fp.write(page_source)
-
-    return page_source
+logger = structlog.get_logger()
 
 
 @asset(io_manager_key="html_io_manager", name="raw")
 def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
-    page_source = asyncio.run(main())
+    page_source = asyncio.run(scrape(url=URL))
 
     now = datetime.now(tz=timezone.utc)
     date_str = now.strftime("%Y-%m-%d")
@@ -94,18 +43,67 @@ def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
         date_str = date_obj.strftime("%Y-%m-%d")
         context.log.info(f"Found date: {date_str}")
         context.log_event(
-            dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
+            dg.AssetMaterialization(asset_key=raw_html.key, partition=date_str)
         )
     except Exception as e:
         context.log.error(f"Parsing error: {e}")
 
 
+@asset(deps=[raw_html], partitions_def=daily_table_partitions_def)
+def daily_table() -> None: ...
+
+
 @asset(
-    io_manager_key="html_io_manager",
+    deps=[raw_html],
+    io_manager_key="json_io_manager",
     partitions_def=daily_partitions_def,
+    automation_condition=dg.AutomationCondition.eager(),
+    output_required=False,
 )
-def daily_html() -> str: ...
+def raw_daily(context: dg.AssetExecutionContext) -> None:
+    base = (
+        Path(context.resources.json_io_manager.base_dir).joinpath(*raw_html.key.path)
+        / context.partition_key
+    )
+    if files := list(base.glob("*.html")):
+        logger.info(f"Found {len(files)} html files")
+        page_source = files[-1].read_text()
+        for title, description, df in extract_tables(page_source):
+            # TODO: when scraping, click the "View Strategy Criteria" texts and record the
+            # information
+            if not title:
+                logger.info(
+                    "No title!",
+                    description=description,
+                    num_rows=0 if df is None else len(df),
+                )
+                continue
 
 
-class MyAssetConfig(dg.Config):
-    image: str = "bla"
+            if df is None:
+                logger.info("No data!", title=title, description=description)
+                continue
+
+            slug = slugify(title)
+            output_context = dg.build_output_context(
+                asset_key=dg.AssetKey(
+                    [APP, "daily", context.partition_key, slug],
+                ),
+                resources=context.resources.original_resource_dict,
+            )
+            context.resources.json_io_manager.handle_output(
+                output_context, df.to_dict(orient="records")
+            )
+            context.log_event(
+                dg.AssetMaterialization(
+                    asset_key=daily_table.key,
+                    partition=f"{context.partition_key}|{slug}",
+                    metadata={
+                        "title": dg.MetadataValue.text(title),
+                        "slug": dg.MetadataValue.text(slug),
+                        "description": dg.MetadataValue.text(description),
+                        "rows": dg.MetadataValue.int(len(df)),
+                    },
+                )
+            )
+            context.instance.add_dynamic_partitions(table_partitions_def.name, [slug])
diff --git a/apps/stocks/src/definitions.py b/apps/stocks/src/definitions.py
index ce1bef3..ce835d6 100644
--- a/apps/stocks/src/definitions.py
+++ b/apps/stocks/src/definitions.py
@@ -4,6 +4,7 @@ import sensors
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
 from shared.config import APP, STORAGE_DIR
+from shared.io_manager import JsonIOManager
 from shared.io_manager.html import HtmlIOManager
 
 import dagster as dg
@@ -20,8 +21,9 @@ definitions = dg.Definitions(
     ],
     resources={
         "html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
+        "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
     },
     jobs=[jobs.raw_html_job],
-    sensors=[sensors.check_update],
+    sensors=[sensors.check_update, sensors.parse_raw],
 )
diff --git a/apps/stocks/src/jobs.py b/apps/stocks/src/jobs.py
index 1c00465..4c78ddf 100644
--- a/apps/stocks/src/jobs.py
+++ b/apps/stocks/src/jobs.py
@@ -7,3 +7,8 @@ raw_html_job = dg.define_asset_job(
     selection=[assets.raw_html.key],
     tags={"docker/image": "dagster-code-stocks-playwright"},
 )
+
+extract_job = dg.define_asset_job(
+    "extract_job",
+    selection=[assets.raw_daily.key],
+)
diff --git a/apps/stocks/src/partitions.py b/apps/stocks/src/partitions.py
index 19d635d..0c1e454 100644
--- a/apps/stocks/src/partitions.py
+++ b/apps/stocks/src/partitions.py
@@ -5,3 +5,9 @@ import dagster as dg
 daily_partitions_def = dg.DailyPartitionsDefinition(
     start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
 )
+
+table_partitions_def = dg.DynamicPartitionsDefinition(name="tables")
+
+daily_table_partitions_def = dg.MultiPartitionsDefinition(
+    {"date": daily_partitions_def, "source": table_partitions_def}
+)
diff --git a/apps/stocks/src/sensors.py b/apps/stocks/src/sensors.py
index 2e50736..bfffb6d 100644
--- a/apps/stocks/src/sensors.py
+++ b/apps/stocks/src/sensors.py
@@ -1,13 +1,18 @@
 from collections.abc import Iterator
 from datetime import datetime
 
+import assets
 import jobs
+import pendulum
 import requests
+import structlog
 from config import URL
-from utils import extract_date
+from utils.extracter import extract_date
 
 import dagster as dg
 
+logger = structlog.get_logger()
+
 
 @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
 def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
@@ -32,3 +37,55 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
         context.log.info(f"Saving file: {file}")
         with open(f"/cache/{file}") as fp:
             fp.write(response.text)
+
+
+@dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
+def parse_raw(context: dg.SensorEvaluationContext):
+    # TODO: use cursor from sensor to filter materialization events
+
+    # Get the known materialized partitions of daily_table
+    daily_partitions = context.instance.get_materialized_partitions(
+        assets.daily_table.key
+    )
+    dates = [x.split("|")[0] for x in daily_partitions]
+    ic(daily_partitions, dates)
+
+    # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
+    events = list(
+        context.instance.get_event_records(
+            event_records_filter=dg.EventRecordsFilter(
+                event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
+                asset_key=assets.raw_html.key,
+            ),
+            limit=100,
+        )
+    )
+
+    # Track unique dates found in raw that are not yet materialized in daily_table
+    unknown_dates = set()
+    for event in events:
+        metadata = event.event_log_entry.asset_materialization.metadata
+        date_str = None
+        ic(metadata)
+        for key, entry in metadata.items():
+            # TODO: move this general logic
+            if key.lower() in {"date", "partition", "partition_date"}:
+                date_str = entry.value
+                break
+        if not date_str:
+            continue
+
+        # Normalize and validate the date
+        try:
+            dt = pendulum.from_timestamp(int(date_str))
+            date_str = dt.strftime("%Y-%m-%d")
+        except Exception as e:
+            logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
+            continue
+
+        if date_str not in dates:
+            unknown_dates.add(date_str)
+
+    ic(unknown_dates)
+    for date_str in sorted(unknown_dates):
+        yield dg.RunRequest(partition_key=date_str)