From f7318e85cda3f15e31c1e1a0a43908325a91b286 Mon Sep 17 00:00:00 2001
From: Rik Veenboer
Date: Mon, 4 Aug 2025 18:02:00 +0200
Subject: [PATCH] move utils

Split utils.py into a utils package: extract_date moves to
utils/extracter.py alongside a new extract_tables helper; a
Playwright-based scrape helper is added in utils/scraper.py and
slugify in utils/text.py.

---
 apps/stocks/src/utils.py           | 19 ----------
 apps/stocks/src/utils/__init__.py  |  0
 apps/stocks/src/utils/extracter.py | 55 +++++++++++++++++++++++++++
 apps/stocks/src/utils/scraper.py   | 60 ++++++++++++++++++++++++++++++
 apps/stocks/src/utils/text.py      | 11 ++++++
 5 files changed, 126 insertions(+), 19 deletions(-)
 delete mode 100644 apps/stocks/src/utils.py
 create mode 100644 apps/stocks/src/utils/__init__.py
 create mode 100644 apps/stocks/src/utils/extracter.py
 create mode 100644 apps/stocks/src/utils/scraper.py
 create mode 100644 apps/stocks/src/utils/text.py

diff --git a/apps/stocks/src/utils.py b/apps/stocks/src/utils.py
deleted file mode 100644
index f7ffe72..0000000
--- a/apps/stocks/src/utils.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import re
-from collections.abc import Iterator
-from datetime import date, datetime
-
-from bs4 import BeautifulSoup
-
-
-def extract_date(page_source: str) -> Iterator[date]:
-    # Parse with BeautifulSoup
-    soup = BeautifulSoup(page_source, "html.parser")
-
-    # Find the first <div> after <header>
-    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
-        # Extract date part using regex
-        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
-        if match:
-            day, _, month, year = match.groups()
-            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
-            yield date_obj
diff --git a/apps/stocks/src/utils/__init__.py b/apps/stocks/src/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/stocks/src/utils/extracter.py b/apps/stocks/src/utils/extracter.py
new file mode 100644
index 0000000..5bcdb4b
--- /dev/null
+++ b/apps/stocks/src/utils/extracter.py
@@ -0,0 +1,55 @@
+import re
+from collections.abc import Iterator
+from datetime import date, datetime
+
+import pandas as pd
+from bs4 import BeautifulSoup
+from pandas import DataFrame
+
+
+def extract_date(page_source: str) -> Iterator[date]:
+    # Parse with BeautifulSoup
+    soup = BeautifulSoup(page_source, "html.parser")
+
+    # Find the first <div> after <header>
+    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
+        # Extract date part using regex
+        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
+        if match:
+            day, _, month, year = match.groups()
+            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
+            yield date_obj
+
+
+def extract_tables(
+    page_source: str,
+) -> Iterator[tuple[str | None, str | None, DataFrame]]:
+    soup = BeautifulSoup(page_source, "html.parser")
+    accordion_items = soup.find_all("div", class_="accordion-item")
+
+    for item in accordion_items:
+        # Extract the title
+        header = item.find("div", class_="accordion-header")
+        title = header.find("h2").get_text(strip=True) if header else None
+
+        # Extract the description
+        description_block = item.find("div", class_="accordion-description")
+        description = (
+            description_block.find("p").get_text(strip=True)
+            if description_block
+            else None
+        )
+
+        # Extract the table
+        table = item.find("table")
+        if table:
+            rows = []
+            for row in table.find_all("tr"):
+                cells = [
+                    cell.get_text(strip=True) for cell in row.find_all(["th", "td"])
+                ]
+                rows.append(cells)
+
+            if rows:
+                df = pd.DataFrame(rows[1:], columns=rows[0])
+                yield title, description, df
diff --git a/apps/stocks/src/utils/scraper.py b/apps/stocks/src/utils/scraper.py
new file mode 100644
index 0000000..405c0b9
--- /dev/null
+++ b/apps/stocks/src/utils/scraper.py
@@ -0,0 +1,60 @@
+async def scrape(url: str) -> str:
+    from playwright.async_api import async_playwright
+
+    async with async_playwright() as p:
+        browser = await p.chromium.launch(headless=True)
+        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
+        page = await context.new_page()
+
+        await page.goto(url, timeout=60000)
+
+        # Wait until at least one toggle button is present
+        await page.wait_for_selector(".toggle-btn", timeout=20000)
+
+        # Set zoom
+        await page.evaluate("document.body.style.zoom='50%'")
+
+        # Find all toggle buttons
+        toggle_buttons = await page.query_selector_all(".toggle-btn")
+        print(f"Found {len(toggle_buttons)} toggle buttons")
+
+        for i, btn in enumerate(toggle_buttons):
+            try:
+                # Ensure it's visible and enabled
+                if await btn.is_visible() and await btn.is_enabled():
+                    await btn.click()
+                    await page.wait_for_timeout(1000)
+
+                if i == len(toggle_buttons) - 1:
+                    break
+
+                # Scroll down gradually
+                scroll_step = 500
+                total_height = await page.evaluate("() => document.body.scrollHeight")
+                current_position = 0
+
+                while current_position < total_height:
+                    await page.evaluate(f"window.scrollTo(0, {current_position});")
+                    await page.wait_for_timeout(100)
+                    current_position += scroll_step
+                    total_height = await page.evaluate(
+                        "() => document.body.scrollHeight"
+                    )
+
+            except Exception as e:
+                print(f"Skipped button due to error: {e}")
+
+        # Get the page content
+        page_source = await page.content()
+
+        # Close the browser
+        await browser.close()
+
+    # Continue scraping logic here...
+ print("Scraping done") + + # Save the page content to a file + with open("/cache/scraped_page.html", "w") as fp: + fp.write(page_source) + + return page_source diff --git a/apps/stocks/src/utils/text.py b/apps/stocks/src/utils/text.py new file mode 100644 index 0000000..a74e5a5 --- /dev/null +++ b/apps/stocks/src/utils/text.py @@ -0,0 +1,11 @@ +import re +import unicodedata + + +def slugify(text: str) -> str: + # Normalize unicode characters + text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii") + # Replace non-word characters with hyphens + text = re.sub(r"[^\w\s-]", "", text).strip().lower() + # Replace spaces and repeated hyphens with a single hyphen + return re.sub(r"[-\s]+", "-", text)