@asset(io_manager_key="html_io_manager", name="raw")
def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
    """Scrape the source page and emit its raw HTML.

    Yields a single ``dg.Output`` whose metadata carries the UTC scrape
    timestamp and a ``path_suffix`` (``[date, time]``) that the
    ``html_io_manager`` uses to build the storage path.

    After yielding, this makes a best-effort attempt to read the page's own
    publication date (via ``extract_date``) and logs an
    ``AssetMaterialization`` for the matching ``daily_html`` partition.
    Failure to find or parse that date must not fail the scrape run, so
    those errors are logged and swallowed.
    """
    page_source = asyncio.run(main())

    # UTC scrape time drives both the displayed metadata and the storage
    # path suffix ("<YYYY-MM-DD>/<HH:MM:SS>").
    scraped_at = datetime.now(tz=timezone.utc)
    yield dg.Output(
        page_source,
        metadata={
            "date": dg.MetadataValue.timestamp(scraped_at),
            "path_suffix": [
                scraped_at.strftime("%Y-%m-%d"),
                scraped_at.strftime("%H:%M:%S"),
            ],
        },
    )

    try:
        # extract_date yields parsed dates; we only care about the first.
        page_date = next(extract_date(page_source))
    except StopIteration:
        # Distinguish "page simply has no date" from a real parsing failure.
        context.log.error("No date found in scraped page")
    except Exception as e:  # HTML/third-party parsing — deliberately broad
        context.log.error(f"Parsing error: {e}")
    else:
        partition_key = page_date.strftime("%Y-%m-%d")
        context.log.info(f"Found date: {partition_key}")
        # Record that the daily partition for the page's own date now has
        # fresh data, even though this asset is the one that produced it.
        context.log_event(
            dg.AssetMaterialization(asset_key=daily_html.key, partition=partition_key)
        )


@asset(
    io_manager_key="html_io_manager",
    partitions_def=daily_partitions_def,
)
def daily_html() -> str:
    """Daily-partitioned HTML asset.

    Intentionally a stub: its materializations are logged externally by
    ``raw_html`` (see above), so Dagster only needs the asset/partition
    definition, not a computation.
    """
    ...
diff --git a/apps/stocks/src/partitions.py b/apps/stocks/src/partitions.py new file mode 100644 index 0000000..19d635d --- /dev/null +++ b/apps/stocks/src/partitions.py @@ -0,0 +1,7 @@ +import os + +import dagster as dg + +daily_partitions_def = dg.DailyPartitionsDefinition( + start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") +) diff --git a/apps/stocks/src/sensors.py b/apps/stocks/src/sensors.py index e819db7..6f1625b 100644 --- a/apps/stocks/src/sensors.py +++ b/apps/stocks/src/sensors.py @@ -1,11 +1,10 @@ -import re from collections.abc import Iterator from datetime import datetime import jobs import requests -from bs4 import BeautifulSoup from config import URL +from utils import extract_date import dagster as dg @@ -18,25 +17,13 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest] response.raise_for_status() try: - # Parse with BeautifulSoup - soup = BeautifulSoup(response.text, "html.parser") - - # Find the first