move utils

This commit is contained in:
2025-08-04 18:02:00 +02:00
parent a3d931c1b3
commit f7318e85cd
5 changed files with 126 additions and 19 deletions

View File

@@ -1,19 +0,0 @@
import re
from collections.abc import Iterator
from datetime import date, datetime
from bs4 import BeautifulSoup
def extract_date(page_source: str) -> Iterator[date]:
    """Yield the first date found in the first <div> after the <header>.

    The div text is expected to contain a date like "3rd March 2021";
    an ordinal suffix (st/nd/rd/th) is tolerated and discarded.

    Args:
        page_source: Raw HTML of the page.

    Yields:
        The parsed calendar date (at most one).
    """
    soup = BeautifulSoup(page_source, "html.parser")
    # Find the first <div> sibling after </header>.
    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
        # Capture day / optional ordinal suffix / month name / year.
        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
        if match:
            day, _, month, year = match.groups()
            # strptime returns a datetime; yield only its date part so the
            # value matches the declared Iterator[date] — a datetime never
            # compares equal to a plain date with the same fields.
            parsed = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
            yield parsed.date()

View File

View File

@@ -0,0 +1,55 @@
import re
from collections.abc import Iterator
from datetime import date, datetime
import pandas as pd
from bs4 import BeautifulSoup
from pandas import DataFrame
def extract_date(page_source: str) -> Iterator[date]:
    """Yield the first date found in the first <div> after the <header>.

    The div text is expected to contain a date like "3rd March 2021";
    an ordinal suffix (st/nd/rd/th) is tolerated and discarded.

    Args:
        page_source: Raw HTML of the page.

    Yields:
        The parsed calendar date (at most one).
    """
    soup = BeautifulSoup(page_source, "html.parser")
    # Find the first <div> sibling after </header>.
    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
        # Capture day / optional ordinal suffix / month name / year.
        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
        if match:
            day, _, month, year = match.groups()
            # strptime returns a datetime; yield only its date part so the
            # value matches the declared Iterator[date] — a datetime never
            # compares equal to a plain date with the same fields.
            parsed = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
            yield parsed.date()
def extract_tables(
    page_source: str,
) -> Iterator[tuple[str | None, str | None, DataFrame]]:
    """Yield (title, description, table) triples for each accordion item.

    For every <div class="accordion-item"> in the page: the title is the
    h2 text of its "accordion-header" div (None when absent), the
    description is the first <p> of its "accordion-description" div (None
    when absent), and the table is the item's first <table> converted to
    a DataFrame with the first row used as column names. Items without a
    table, or whose table has no rows, are skipped.
    """
    soup = BeautifulSoup(page_source, "html.parser")
    for item in soup.find_all("div", class_="accordion-item"):
        # Title from the accordion header, when one exists.
        header = item.find("div", class_="accordion-header")
        title = None
        if header:
            title = header.find("h2").get_text(strip=True)
        # Description paragraph, when one exists.
        desc_block = item.find("div", class_="accordion-description")
        description = None
        if desc_block:
            description = desc_block.find("p").get_text(strip=True)
        table = item.find("table")
        if not table:
            continue
        # Collect cell text row by row (both header and data cells).
        rows = [
            [cell.get_text(strip=True) for cell in tr.find_all(["th", "td"])]
            for tr in table.find_all("tr")
        ]
        if rows:
            header_row, *body_rows = rows
            yield title, description, pd.DataFrame(body_rows, columns=header_row)

View File

@@ -0,0 +1,60 @@
async def scrape(url: str, output_path: str = "/cache/scraped_page.html") -> str:
    """Render *url* in headless Chromium, expand every toggle, return the HTML.

    Clicks each visible/enabled ".toggle-btn" on the page, scrolling down
    between clicks so lazily-loaded content appears, then captures the fully
    rendered page source, writes it to *output_path*, and returns it.

    Args:
        url: Page to scrape.
        output_path: Where the rendered HTML is saved (default keeps the
            previously hard-coded cache location).

    Returns:
        The rendered page source as a string.
    """
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
        page = await context.new_page()
        await page.goto(url, timeout=60000)
        # Wait until at least one toggle button is present.
        await page.wait_for_selector(".toggle-btn", timeout=20000)
        # Zoom out so more of the page fits in the viewport.
        await page.evaluate("document.body.style.zoom='50%'")
        toggle_buttons = await page.query_selector_all(".toggle-btn")
        print(f"Found {len(toggle_buttons)} toggle buttons")
        for i, btn in enumerate(toggle_buttons):
            try:
                # Only interact with buttons a user could actually click.
                if await btn.is_visible() and await btn.is_enabled():
                    await btn.click()
                    await page.wait_for_timeout(1000)
                if i == len(toggle_buttons) - 1:
                    # Nothing follows the last button; skip the scroll pass.
                    break
                # Scroll down gradually; re-read the height each step because
                # expanding accordion items grows the document.
                scroll_step = 500
                total_height = await page.evaluate("() => document.body.scrollHeight")
                current_position = 0
                while current_position < total_height:
                    await page.evaluate(f"window.scrollTo(0, {current_position});")
                    await page.wait_for_timeout(100)
                    current_position += scroll_step
                    total_height = await page.evaluate(
                        "() => document.body.scrollHeight"
                    )
            except Exception as e:
                # Best-effort: one failing button must not abort the scrape.
                print(f"Skipped button due to error: {e}")
        # Get the page content, then release the browser.
        page_source = await page.content()
        await browser.close()
    print("Scraping done")
    # utf-8 explicitly: page content is arbitrary Unicode and the platform
    # default encoding (e.g. cp1252 on Windows) could raise on write.
    with open(output_path, "w", encoding="utf-8") as fp:
        fp.write(page_source)
    return page_source

View File

@@ -0,0 +1,11 @@
import re
import unicodedata
def slugify(text: str) -> str:
    """Convert *text* to a URL-safe slug.

    Accented characters are transliterated to ASCII, everything that is not
    a word character, whitespace, or hyphen is dropped, and runs of
    whitespace/hyphens collapse to a single hyphen. Leading and trailing
    separators are stripped so a slug never starts or ends with "-" or "_"
    (the previous version could return e.g. "abc-" for input "abc-").

    >>> slugify("Héllo,  Wörld!")
    'hello-world'
    """
    # Decompose accented characters, then drop the non-ASCII combining marks.
    text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
    # Keep only word chars, whitespace, and hyphens; normalize case.
    text = re.sub(r"[^\w\s-]", "", text).strip().lower()
    # Collapse separator runs to one hyphen; strip edge separators.
    return re.sub(r"[-\s]+", "-", text).strip("-_")