From 02db619c6d8ef0b8b372f60af52d8c419d45c90d Mon Sep 17 00:00:00 2001
From: Rik Veenboer
Date: Tue, 29 Jul 2025 17:22:42 +0200
Subject: [PATCH] store and register scrape

---
 apps/stocks/src/assets.py     | 43 +++++++++++++++++++++++++++++++----
 apps/stocks/src/partitions.py |  7 ++++++
 apps/stocks/src/sensors.py    | 29 +++++++----------------
 apps/stocks/src/utils.py      | 19 ++++++++++++++++
 4 files changed, 72 insertions(+), 26 deletions(-)
 create mode 100644 apps/stocks/src/partitions.py
 create mode 100644 apps/stocks/src/utils.py

diff --git a/apps/stocks/src/assets.py b/apps/stocks/src/assets.py
index b97d856..3e8c7c9 100644
--- a/apps/stocks/src/assets.py
+++ b/apps/stocks/src/assets.py
@@ -1,8 +1,12 @@
 import asyncio
+from collections.abc import Iterator
+from datetime import datetime, timezone
 from functools import partial
 
 from config import APP, URL
+from partitions import daily_partitions_def
 from playwright.async_api import async_playwright
+from utils import extract_date
 
 import dagster as dg
 
@@ -64,11 +68,40 @@ async def main() -> str:
     print("Scraping done")
 
     # Save the page content to a file
-    with open("scraped_page.html", "w") as f:
-        f.write(page_source)
-    return page_source
+    with open("/cache/scraped_page.html", "w") as fp:
+        fp.write(page_source)
+
+    return page_source
 
 
 @asset(io_manager_key="html_io_manager", name="raw")
-def raw_html() -> str:
-    return asyncio.run(main())
+def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
+    page_source = asyncio.run(main())
+
+    now = datetime.now(tz=timezone.utc)
+    date_str = now.strftime("%Y-%m-%d")
+    time_str = now.strftime("%H:%M:%S")
+    yield dg.Output(
+        page_source,
+        metadata={
+            "date": dg.MetadataValue.timestamp(now),
+            "path_suffix": [date_str, time_str],
+        },
+    )
+
+    try:
+        date_obj = next(extract_date(page_source))
+        date_str = date_obj.strftime("%Y-%m-%d")
+        context.log.info(f"Found date: {date_str}")
+        context.log_event(
+            dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
+        )
+    except Exception as e:
+        context.log.error(f"Parsing error: {e}")
+
+
+@asset(
+    io_manager_key="html_io_manager",
+    partitions_def=daily_partitions_def,
+)
+def daily_html() -> str: ...
diff --git a/apps/stocks/src/partitions.py b/apps/stocks/src/partitions.py
new file mode 100644
index 0000000..19d635d
--- /dev/null
+++ b/apps/stocks/src/partitions.py
@@ -0,0 +1,7 @@
+import os
+
+import dagster as dg
+
+daily_partitions_def = dg.DailyPartitionsDefinition(
+    start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
+)
diff --git a/apps/stocks/src/sensors.py b/apps/stocks/src/sensors.py
index e819db7..6f1625b 100644
--- a/apps/stocks/src/sensors.py
+++ b/apps/stocks/src/sensors.py
@@ -1,11 +1,10 @@
-import re
 from collections.abc import Iterator
 from datetime import datetime
 
 import jobs
 import requests
-from bs4 import BeautifulSoup
 from config import URL
+from utils import extract_date
 
 import dagster as dg
 
@@ -18,25 +17,13 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
     response.raise_for_status()
 
     try:
-        # Parse with BeautifulSoup
-        soup = BeautifulSoup(response.text, "html.parser")
-
-        # Find the first <div> after <header>
-        if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
-            # Extract date part using regex
-            match = re.search(
-                r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text
-            )
-            if match:
-                day, _, month, year = match.groups()
-                date = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
-                date_str = date.strftime("%Y-%m-%d ")
-                context.log.info(f"Found date: {date_str}")
-
-                if date_str > context.cursor:
-                    context.update_cursor(date_str)
-                    yield dg.RunRequest()
-                    return
+        date_obj = next(extract_date(response.text))
+        date_str = date_obj.strftime("%Y-%m-%d")
+        context.log.info(f"Found date: {date_str}")
+        if date_str > context.cursor:
+            context.update_cursor(date_str)
+            yield dg.RunRequest()
+            return
     except Exception as e:
         context.log.error(f"Parsing error: {e}")
 
diff --git a/apps/stocks/src/utils.py b/apps/stocks/src/utils.py
new file mode 100644
index 0000000..f7ffe72
--- /dev/null
+++ b/apps/stocks/src/utils.py
@@ -0,0 +1,19 @@
+import re
+from collections.abc import Iterator
+from datetime import date, datetime
+
+from bs4 import BeautifulSoup
+
+
+def extract_date(page_source: str) -> Iterator[date]:
+    # Parse with BeautifulSoup
+    soup = BeautifulSoup(page_source, "html.parser")
+
+    # Find the first <div> after <header>
+    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
+        # Extract date part using regex
+        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
+        if match:
+            day, _, month, year = match.groups()
+            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
+            yield date_obj