store and register scrape
@@ -1,8 +1,12 @@
 import asyncio
+from collections.abc import Iterator
+from datetime import datetime, timezone
 from functools import partial
 
 from config import APP, URL
+from partitions import daily_partitions_def
 from playwright.async_api import async_playwright
+from utils import extract_date
 
 import dagster as dg
 
@@ -64,11 +68,40 @@ async def main() -> str:
     print("Scraping done")
 
     # Save the page content to a file
-    with open("scraped_page.html", "w") as f:
-        f.write(page_source)
+    with open("/cache/scraped_page.html", "w") as fp:
+        fp.write(page_source)
 
     return page_source
 
+
 @asset(io_manager_key="html_io_manager", name="raw")
-def raw_html() -> str:
-    return asyncio.run(main())
+def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
+    page_source = asyncio.run(main())
+
+    now = datetime.now(tz=timezone.utc)
+    date_str = now.strftime("%Y-%m-%d")
+    time_str = now.strftime("%H:%M:%S")
+    yield dg.Output(
+        page_source,
+        metadata={
+            "date": dg.MetadataValue.timestamp(now),
+            "path_suffix": [date_str, time_str],
+        },
+    )
+
+    try:
+        date_obj = next(extract_date(page_source))
+        date_str = date_obj.strftime("%Y-%m-%d")
+        context.log.info(f"Found date: {date_str}")
+        context.log_event(
+            dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
+        )
+    except Exception as e:
+        context.log.error(f"Parsing error: {e}")
+
+
+@asset(
+    io_manager_key="html_io_manager",
+    partitions_def=daily_partitions_def,
+)
+def daily_html() -> str: ...
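Note on the hunk above: raw_html now yields its output explicitly so it can attach metadata (the path_suffix entry presumably tells the html_io_manager where to file the page), and then registers the same scrape against the partitioned daily_html stub by logging an AssetMaterialization keyed to the date parsed from the page. The html_io_manager itself is not part of this commit; the following is only a hypothetical sketch of such a manager, assuming one .html file per asset key, nested per partition:

    # Hypothetical sketch; the real html_io_manager is defined elsewhere.
    from pathlib import Path

    import dagster as dg


    class HTMLIOManager(dg.IOManager):
        def __init__(self, base_dir: str = "/cache/html") -> None:
            self.base_dir = Path(base_dir)

        def _path(self, context) -> Path:
            # One file per asset key, plus the partition key when present.
            parts = list(context.asset_key.path)
            if context.has_partition_key:
                parts.append(context.partition_key)
            return self.base_dir.joinpath(*parts).with_suffix(".html")

        def handle_output(self, context: dg.OutputContext, obj: str) -> None:
            path = self._path(context)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(obj)

        def load_input(self, context: dg.InputContext) -> str:
            # Read back whatever was written for the upstream output.
            return self._path(context.upstream_output).read_text()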
apps/stocks/src/partitions.py (new file, +7)
@@ -0,0 +1,7 @@
+import os
+
+import dagster as dg
+
+daily_partitions_def = dg.DailyPartitionsDefinition(
+    start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
+)
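end_offset=1 extends the partition set one day past the last complete day, so the current, still in-progress day is a valid partition key, which is what lets raw_html register today's scrape against daily_html. A quick illustration:

    import dagster as dg

    defn = dg.DailyPartitionsDefinition(start_date="2025-07-01", end_offset=1)
    # Without end_offset the last key would be yesterday; with end_offset=1 it is today.
    print(defn.get_partition_keys()[-1])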
@@ -1,11 +1,10 @@
-import re
 from collections.abc import Iterator
 from datetime import datetime
 
 import jobs
 import requests
-from bs4 import BeautifulSoup
 from config import URL
+from utils import extract_date
 
 import dagster as dg
 
@@ -18,21 +17,9 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
     response.raise_for_status()
 
     try:
-        # Parse with BeautifulSoup
-        soup = BeautifulSoup(response.text, "html.parser")
-
-        # Find the first <div> after </header>
-        if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
-            # Extract date part using regex
-            match = re.search(
-                r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text
-            )
-            if match:
-                day, _, month, year = match.groups()
-                date = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
-                date_str = date.strftime("%Y-%m-%d ")
-                context.log.info(f"Found date: {date_str}")
-
+        date_obj = next(extract_date(response.text))
+        date_str = date_obj.strftime("%Y-%m-%d")
+        context.log.info(f"Found date: {date_str}")
         if date_str > context.cursor:
             context.update_cursor(date_str)
             yield dg.RunRequest()
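The sensor now delegates parsing to the shared extract_date helper and keeps only the cursor logic. Since ISO YYYY-MM-DD strings sort lexicographically in chronological order, the plain string comparison date_str > context.cursor is a sound freshness check once a cursor has been set. A minimal sketch of the pattern, with illustrative names and a guard for the first evaluation, when the cursor is still None:

    import dagster as dg


    @dg.sensor(job_name="scrape")  # hypothetical job name
    def freshness_sensor(context: dg.SensorEvaluationContext):
        latest = "2025-07-02"  # stand-in for the date parsed from the page
        if latest > (context.cursor or ""):  # cursor is None until first update
            context.update_cursor(latest)
            yield dg.RunRequest(run_key=latest)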
apps/stocks/src/utils.py (new file, +19)
@@ -0,0 +1,19 @@
+import re
+from collections.abc import Iterator
+from datetime import date, datetime
+
+from bs4 import BeautifulSoup
+
+
+def extract_date(page_source: str) -> Iterator[date]:
+    # Parse with BeautifulSoup
+    soup = BeautifulSoup(page_source, "html.parser")
+
+    # Find the first <div> after </header>
+    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
+        # Extract date part using regex
+        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
+        if match:
+            day, _, month, year = match.groups()
+            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
+            yield date_obj
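extract_date is a generator, so callers can pull the first hit with next() and supply a default when no date is found. A quick check against a synthetic page (the markup here is assumed, for illustration only):

    from utils import extract_date

    html = "<header>Prices</header><div>Updated 1st July 2025</div>"
    print(next(extract_date(html), None))  # 2025-07-01 00:00:00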