from collections.abc import Iterator from datetime import datetime import jobs import requests from config import URL from utils import extract_date import dagster as dg @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60) def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]: ic(context.cursor) response = requests.get(URL) response.raise_for_status() try: date_obj = next(extract_date(response.text)) date_str = date_obj.strftime("%Y-%m-%d") context.log.info(f"Found date: {date_str}") if date_str > context.cursor: context.update_cursor(date_str) yield dg.RunRequest() return except Exception as e: context.log.error(f"Parsing error: {e}") now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") file = f"{now_str} stocks.html" context.log.info(f"Saving file: {file}") with open(f"/cache/{file}") as fp: fp.write(response.text)