Files
dagster/apps/stocks/src/sensors.py

48 lines
1.5 KiB
Python

import re
from collections.abc import Iterator
from datetime import datetime
import jobs
import requests
from bs4 import BeautifulSoup
from config import URL
import dagster as dg
def _extract_page_date(html: str) -> "str | None":
    """Return the date shown on the page as ``YYYY-MM-DD``, or ``None``.

    The date is looked up in the text of the first ``<div>`` sibling that
    follows the ``<header>`` element, in the form ``5th January 2024`` (the
    ordinal suffix is optional, the month must be a full English month name).

    Raises:
        ValueError: If the matched text is not a valid date (for example an
            abbreviated or misspelled month name).
    """
    soup = BeautifulSoup(html, "html.parser")
    # Find the first <div> after </header>
    header = soup.find("header")
    div = header.find_next_sibling("div") if header is not None else None
    if div is None:
        return None

    match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
    if match is None:
        return None

    day, _, month, year = match.groups()
    parsed = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
    # ISO format sorts lexicographically in date order, so the sensor can
    # compare it against the cursor as a plain string.
    return parsed.strftime("%Y-%m-%d")


@dg.sensor(job=jobs.raw_html_job)
def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
    """Request a run of ``raw_html_job`` when the page at ``URL`` shows a newer date.

    The page is downloaded, its date is extracted and compared with the date
    stored in the sensor cursor. If the page date is newer, the cursor is
    advanced and a single ``RunRequest`` is yielded. If no date can be
    extracted (unexpected layout or a parsing error), the raw HTML is written
    to ``/cache`` under a timestamped name so the page can be inspected later.

    Args:
        context: Sensor evaluation context providing the cursor and the logger.

    Yields:
        One ``dg.RunRequest`` when the page date is newer than the cursor.

    Raises:
        requests.RequestException: If the download fails, times out, or
            returns an HTTP error status.
        OSError: If the HTML dump cannot be written to ``/cache``.
    """
    context.log.debug(f"Current cursor: {context.cursor}")

    # A timeout keeps a stalled server from blocking the sensor tick forever.
    response = requests.get(URL, timeout=30)
    response.raise_for_status()

    date_str = None
    try:
        date_str = _extract_page_date(response.text)
    except Exception as e:
        # Best effort: a page we cannot parse must not crash the sensor; it is
        # logged here and the HTML is dumped below for inspection.
        context.log.error(f"Parsing error: {e}")

    if date_str is not None:
        context.log.info(f"Found date: {date_str}")
        # The cursor is None before the first update; cursors stored by
        # earlier versions may carry a trailing space, hence the strip().
        previous = (context.cursor or "").strip()
        if date_str > previous:
            context.update_cursor(date_str)
            yield dg.RunRequest()
        return

    # No date could be extracted: keep a copy of the page for debugging.
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    file = f"{now_str} stocks.html"
    context.log.info(f"Saving file: {file}")
    with open(f"/cache/{file}", "w", encoding="utf-8") as fp:
        fp.write(response.text)