Compare commits
10 Commits
0010df735f
...
030424e124
| Author | SHA1 | Date | |
|---|---|---|---|
| 030424e124 | |||
| 81c2035d02 | |||
| 2d20cdf256 | |||
| 28a195a256 | |||
| 629bc6c648 | |||
| 4bc5770cce | |||
| b26ba7aa35 | |||
| 17ca8669ef | |||
| f7318e85cd | |||
| a3d931c1b3 |
@@ -14,8 +14,7 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh
|
||||
WORKDIR /opt/dagster/home
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN uv pip install -r requirements.txt --system \
|
||||
&& uv pip install polars-lts-cpu --system
|
||||
RUN uv pip install -r requirements.txt --system
|
||||
|
||||
ARG APP
|
||||
ENV APP=$APP
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
FROM mcr.microsoft.com/playwright:v1.54.0-noble
|
||||
|
||||
ARG APP
|
||||
ENV APP=$APP
|
||||
ENV PYTHONPATH=/apps/$APP/src/:/shared/src/:$PYTHONPATH
|
||||
ENV PATH="/venv/bin:/root/.local/bin:$PATH"
|
||||
|
||||
WORKDIR /opt/dagster/home
|
||||
@@ -14,14 +11,15 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh && \
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN . /venv/bin/activate && \
|
||||
uv pip install -r requirements.txt && \
|
||||
uv pip install polars-lts-cpu
|
||||
uv pip install -r requirements.txt
|
||||
|
||||
RUN . /venv/bin/activate && \
|
||||
uv pip install playwright && \
|
||||
playwright install
|
||||
|
||||
|
||||
ARG APP
|
||||
ENV APP=$APP
|
||||
ENV PYTHONPATH=/code/apps/$APP/src/:/code/shared/src/:$PYTHONPATH
|
||||
|
||||
# Run dagster gRPC server on port 4000
|
||||
EXPOSE 4000
|
||||
|
||||
@@ -15,22 +15,12 @@ ENV DAGSTER_HOME=/opt/dagster/home/
|
||||
WORKDIR $DAGSTER_HOME
|
||||
COPY dagster-requirements.txt requirements.txt
|
||||
|
||||
RUN uv pip install -r requirements.txt --system \
|
||||
&& uv pip install polars-lts-cpu --system
|
||||
RUN uv pip install -r requirements.txt --system
|
||||
|
||||
RUN mkdir -p $DAGSTER_HOME
|
||||
|
||||
# Create entrypoint that renders the dagster.yaml from a template
|
||||
RUN cat << 'EOF' > /entrypoint.sh
|
||||
#!/bin/sh
|
||||
set -e
|
||||
|
||||
echo "Rendering dagster.yaml from template..."
|
||||
envsubst < dagster.yaml.template > dagster.yaml
|
||||
|
||||
echo "Starting Dagster: $@"
|
||||
exec "$@"
|
||||
EOF
|
||||
COPY entrypoint.sh /entrypoint.sh
|
||||
RUN chmod +x /entrypoint.sh
|
||||
ENTRYPOINT ["/entrypoint.sh"]
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ annotated-types==0.7.0
|
||||
# via pydantic
|
||||
antlr4-python3-runtime==4.13.2
|
||||
# via dagster
|
||||
anyio==4.9.0
|
||||
anyio==4.10.0
|
||||
# via
|
||||
# gql
|
||||
# starlette
|
||||
@@ -28,7 +28,7 @@ botocore==1.40.1
|
||||
# boto3
|
||||
# s3fs
|
||||
# s3transfer
|
||||
certifi==2025.7.14
|
||||
certifi==2025.8.3
|
||||
# via requests
|
||||
charset-normalizer==3.4.2
|
||||
# via requests
|
||||
|
||||
@@ -6,17 +6,13 @@ annotated-types==0.7.0
|
||||
# via pydantic
|
||||
antlr4-python3-runtime==4.13.2
|
||||
# via dagster
|
||||
anyio==4.9.0
|
||||
anyio==4.10.0
|
||||
# via
|
||||
# gql
|
||||
# starlette
|
||||
# watchfiles
|
||||
asttokens==3.0.0
|
||||
# via icecream
|
||||
attrs==25.3.0
|
||||
# via
|
||||
# outcome
|
||||
# trio
|
||||
backoff==2.2.1
|
||||
# via gql
|
||||
beautifulsoup4==4.13.4
|
||||
@@ -30,10 +26,8 @@ botocore==1.40.1
|
||||
# boto3
|
||||
# s3fs
|
||||
# s3transfer
|
||||
certifi==2025.7.14
|
||||
# via
|
||||
# requests
|
||||
# selenium
|
||||
certifi==2025.8.3
|
||||
# via requests
|
||||
charset-normalizer==3.4.2
|
||||
# via requests
|
||||
click==8.1.8
|
||||
@@ -132,8 +126,6 @@ graphql-core==3.2.6
|
||||
# graphql-relay
|
||||
graphql-relay==3.2.0
|
||||
# via graphene
|
||||
greenlet==3.2.3
|
||||
# via playwright
|
||||
grpcio==1.74.0
|
||||
# via
|
||||
# dagster
|
||||
@@ -141,9 +133,7 @@ grpcio==1.74.0
|
||||
grpcio-health-checking==1.71.2
|
||||
# via dagster
|
||||
h11==0.16.0
|
||||
# via
|
||||
# uvicorn
|
||||
# wsproto
|
||||
# via uvicorn
|
||||
httptools==0.6.4
|
||||
# via uvicorn
|
||||
humanfriendly==10.0
|
||||
@@ -155,7 +145,6 @@ idna==3.10
|
||||
# anyio
|
||||
# email-validator
|
||||
# requests
|
||||
# trio
|
||||
# yarl
|
||||
jinja2==3.1.6
|
||||
# via dagster
|
||||
@@ -190,10 +179,6 @@ numpy==2.3.2
|
||||
# seaborn
|
||||
openpyxl==3.1.5
|
||||
# via dev (pyproject.toml)
|
||||
outcome==1.3.0.post0
|
||||
# via
|
||||
# trio
|
||||
# trio-websocket
|
||||
packaging==25.0
|
||||
# via
|
||||
# dagster-aws
|
||||
@@ -210,10 +195,10 @@ patito==0.8.3
|
||||
# via
|
||||
# dev (pyproject.toml)
|
||||
# dagster-polars
|
||||
pendulum==3.1.0
|
||||
# via dev (pyproject.toml)
|
||||
pillow==11.3.0
|
||||
# via matplotlib
|
||||
playwright==1.54.0
|
||||
# via dev (pyproject.toml)
|
||||
polars==1.32.0
|
||||
# via
|
||||
# dagster-polars
|
||||
@@ -240,8 +225,6 @@ pydantic-core==2.33.2
|
||||
# via pydantic
|
||||
pydantic-settings==2.10.1
|
||||
# via dev (pyproject.toml)
|
||||
pyee==13.0.0
|
||||
# via playwright
|
||||
pygments==2.19.2
|
||||
# via
|
||||
# icecream
|
||||
@@ -249,15 +232,14 @@ pygments==2.19.2
|
||||
pyparsing==3.2.3
|
||||
# via matplotlib
|
||||
pysocks==1.7.1
|
||||
# via
|
||||
# requests
|
||||
# urllib3
|
||||
# via requests
|
||||
python-dateutil==2.9.0.post0
|
||||
# via
|
||||
# botocore
|
||||
# graphene
|
||||
# matplotlib
|
||||
# pandas
|
||||
# pendulum
|
||||
python-dotenv==1.1.1
|
||||
# via
|
||||
# dagster
|
||||
@@ -293,8 +275,6 @@ s3transfer==0.13.1
|
||||
# via boto3
|
||||
seaborn==0.13.2
|
||||
# via dev (pyproject.toml)
|
||||
selenium==4.34.2
|
||||
# via dev (pyproject.toml)
|
||||
setuptools==80.9.0
|
||||
# via dagster
|
||||
six==1.17.0
|
||||
@@ -304,11 +284,7 @@ six==1.17.0
|
||||
smmap==5.0.2
|
||||
# via gitdb
|
||||
sniffio==1.3.1
|
||||
# via
|
||||
# anyio
|
||||
# trio
|
||||
sortedcontainers==2.4.0
|
||||
# via trio
|
||||
# via anyio
|
||||
soupsieve==2.7
|
||||
# via beautifulsoup4
|
||||
sqlalchemy==2.0.42
|
||||
@@ -333,12 +309,6 @@ toposort==1.10
|
||||
# via dagster
|
||||
tqdm==4.67.1
|
||||
# via dagster
|
||||
trio==0.30.0
|
||||
# via
|
||||
# selenium
|
||||
# trio-websocket
|
||||
trio-websocket==0.12.2
|
||||
# via selenium
|
||||
typing-extensions==4.14.1
|
||||
# via
|
||||
# alembic
|
||||
@@ -350,8 +320,6 @@ typing-extensions==4.14.1
|
||||
# patito
|
||||
# pydantic
|
||||
# pydantic-core
|
||||
# pyee
|
||||
# selenium
|
||||
# sqlalchemy
|
||||
# starlette
|
||||
# typing-inspection
|
||||
@@ -360,7 +328,9 @@ typing-inspection==0.4.1
|
||||
# pydantic
|
||||
# pydantic-settings
|
||||
tzdata==2025.2
|
||||
# via pandas
|
||||
# via
|
||||
# pandas
|
||||
# pendulum
|
||||
universal-pathlib==0.2.6
|
||||
# via
|
||||
# dagster
|
||||
@@ -370,7 +340,6 @@ urllib3==2.5.0
|
||||
# botocore
|
||||
# docker
|
||||
# requests
|
||||
# selenium
|
||||
uvicorn==0.35.0
|
||||
# via dagster-webserver
|
||||
uvloop==0.21.0
|
||||
@@ -379,12 +348,8 @@ watchdog==5.0.3
|
||||
# via dagster
|
||||
watchfiles==1.1.0
|
||||
# via uvicorn
|
||||
websocket-client==1.8.0
|
||||
# via selenium
|
||||
websockets==15.0.1
|
||||
# via uvicorn
|
||||
wsproto==1.2.0
|
||||
# via trio-websocket
|
||||
xlsxwriter==3.2.5
|
||||
# via dev (pyproject.toml)
|
||||
yarl==1.20.1
|
||||
|
||||
@@ -2,81 +2,30 @@ import asyncio
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime, timezone
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
|
||||
import structlog
|
||||
from config import APP, URL
|
||||
from partitions import daily_partitions_def
|
||||
from playwright.async_api import async_playwright
|
||||
from utils import extract_date
|
||||
from partitions import (
|
||||
daily_partitions_def,
|
||||
daily_table_partitions_def,
|
||||
table_partitions_def,
|
||||
)
|
||||
from utils.extracter import extract_date, extract_tables
|
||||
from utils.scraper import scrape
|
||||
from utils.text import slugify
|
||||
|
||||
import dagster as dg
|
||||
|
||||
TAGS = {"app": APP}
|
||||
asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
|
||||
|
||||
|
||||
async def main() -> str:
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=True)
|
||||
context = await browser.new_context(viewport={"width": 1000, "height": 2000})
|
||||
page = await context.new_page()
|
||||
|
||||
await page.goto(URL, timeout=60000)
|
||||
|
||||
# Wait until at least one toggle button is present
|
||||
await page.wait_for_selector(".toggle-btn", timeout=20000)
|
||||
|
||||
# Set zoom
|
||||
await page.evaluate("document.body.style.zoom='50%'")
|
||||
|
||||
# Find all toggle buttons
|
||||
toggle_buttons = await page.query_selector_all(".toggle-btn")
|
||||
print(f"Found {len(toggle_buttons)} toggle buttons")
|
||||
|
||||
for i, btn in enumerate(toggle_buttons):
|
||||
try:
|
||||
# Ensure it's visible and enabled
|
||||
if await btn.is_visible() and await btn.is_enabled():
|
||||
await btn.click()
|
||||
await page.wait_for_timeout(1000)
|
||||
|
||||
if i == len(toggle_buttons) - 1:
|
||||
break
|
||||
|
||||
# Scroll down gradually
|
||||
scroll_step = 500
|
||||
total_height = await page.evaluate("() => document.body.scrollHeight")
|
||||
current_position = 0
|
||||
|
||||
while current_position < total_height:
|
||||
await page.evaluate(f"window.scrollTo(0, {current_position});")
|
||||
await page.wait_for_timeout(100)
|
||||
current_position += scroll_step
|
||||
total_height = await page.evaluate(
|
||||
"() => document.body.scrollHeight"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Skipped button due to error: {e}")
|
||||
|
||||
# Get the page content
|
||||
page_source = await page.content()
|
||||
|
||||
# Close the browser
|
||||
await browser.close()
|
||||
|
||||
# Continue scraping logic here...
|
||||
print("Scraping done")
|
||||
|
||||
# Save the page content to a file
|
||||
with open("/cache/scraped_page.html", "w") as fp:
|
||||
fp.write(page_source)
|
||||
|
||||
return page_source
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
@asset(io_manager_key="html_io_manager", name="raw")
|
||||
def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
|
||||
page_source = asyncio.run(main())
|
||||
page_source = asyncio.run(scrape(url=URL))
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
date_str = now.strftime("%Y-%m-%d")
|
||||
@@ -94,18 +43,67 @@ def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
|
||||
date_str = date_obj.strftime("%Y-%m-%d")
|
||||
context.log.info(f"Found date: {date_str}")
|
||||
context.log_event(
|
||||
dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
|
||||
dg.AssetMaterialization(asset_key=raw_html.key, partition=date_str)
|
||||
)
|
||||
except Exception as e:
|
||||
context.log.error(f"Parsing error: {e}")
|
||||
|
||||
|
||||
@asset(deps=[raw_html], partitions_def=daily_table_partitions_def)
|
||||
def daily_table() -> None: ...
|
||||
|
||||
|
||||
@asset(
|
||||
io_manager_key="html_io_manager",
|
||||
deps=[raw_html],
|
||||
io_manager_key="json_io_manager",
|
||||
partitions_def=daily_partitions_def,
|
||||
automation_condition=dg.AutomationCondition.eager(),
|
||||
output_required=False,
|
||||
)
|
||||
def daily_html() -> str: ...
|
||||
def raw_daily(context: dg.AssetExecutionContext) -> None:
|
||||
base = (
|
||||
Path(context.resources.json_io_manager.base_dir).joinpath(*raw_html.key.path)
|
||||
/ context.partition_key
|
||||
)
|
||||
if files := list(base.glob("*.html")):
|
||||
logger.info(f"Found {len(files)} html files")
|
||||
page_source = open(files[-1]).read()
|
||||
|
||||
for title, description, df in extract_tables(page_source):
|
||||
# TODO: when scraping click the "View Strategy Criteria" texts and record the
|
||||
# information
|
||||
if not title:
|
||||
logger.info(
|
||||
"No title!",
|
||||
description=description,
|
||||
num_rows=0 if df is None else len(df),
|
||||
)
|
||||
continue
|
||||
|
||||
class MyAssetConfig(dg.Config):
|
||||
image: str = "bla"
|
||||
if df is None:
|
||||
logger.info("No data!", title=title, description=description)
|
||||
continue
|
||||
|
||||
slug = slugify(title)
|
||||
output_context = dg.build_output_context(
|
||||
asset_key=dg.AssetKey(
|
||||
[APP, "daily", context.partition_key, slug],
|
||||
),
|
||||
resources=context.resources.original_resource_dict,
|
||||
)
|
||||
context.resources.json_io_manager.handle_output(
|
||||
output_context, df.to_dict(orient="records")
|
||||
)
|
||||
context.log_event(
|
||||
dg.AssetMaterialization(
|
||||
asset_key=daily_table.key,
|
||||
partition=f"{context.partition_key}|{slug}",
|
||||
metadata={
|
||||
"title": dg.MetadataValue.text(title),
|
||||
"slug": dg.MetadataValue.text(slug),
|
||||
"description": dg.MetadataValue.text(description),
|
||||
"rows": dg.MetadataValue.int(len(df)),
|
||||
},
|
||||
)
|
||||
)
|
||||
context.instance.add_dynamic_partitions(table_partitions_def.name, [slug])
|
||||
|
||||
@@ -4,6 +4,7 @@ import sensors
|
||||
from dagster_polars import PolarsParquetIOManager
|
||||
from icecream import install
|
||||
from shared.config import APP, STORAGE_DIR
|
||||
from shared.io_manager import JsonIOManager
|
||||
from shared.io_manager.html import HtmlIOManager
|
||||
|
||||
import dagster as dg
|
||||
@@ -20,8 +21,9 @@ definitions = dg.Definitions(
|
||||
],
|
||||
resources={
|
||||
"html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
|
||||
"json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
|
||||
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
|
||||
},
|
||||
jobs=[jobs.raw_html_job],
|
||||
sensors=[sensors.check_update],
|
||||
sensors=[sensors.check_update, sensors.parse_raw],
|
||||
)
|
||||
|
||||
@@ -7,3 +7,8 @@ raw_html_job = dg.define_asset_job(
|
||||
selection=[assets.raw_html.key],
|
||||
tags={"docker/image": "dagster-code-stocks-playwright"},
|
||||
)
|
||||
|
||||
extract_job = dg.define_asset_job(
|
||||
"extract_job",
|
||||
selection=[assets.raw_daily.key],
|
||||
)
|
||||
|
||||
@@ -5,3 +5,9 @@ import dagster as dg
|
||||
daily_partitions_def = dg.DailyPartitionsDefinition(
|
||||
start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
|
||||
)
|
||||
|
||||
table_partitions_def = dg.DynamicPartitionsDefinition(name="tables")
|
||||
|
||||
daily_table_partitions_def = dg.MultiPartitionsDefinition(
|
||||
{"date": daily_partitions_def, "source": table_partitions_def}
|
||||
)
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
from collections.abc import Iterator
|
||||
from datetime import datetime
|
||||
|
||||
import assets
|
||||
import jobs
|
||||
import pendulum
|
||||
import requests
|
||||
import structlog
|
||||
from config import URL
|
||||
from utils import extract_date
|
||||
from utils.extracter import extract_date
|
||||
|
||||
import dagster as dg
|
||||
|
||||
logger = structlog.get_logger()
|
||||
|
||||
|
||||
@dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
|
||||
def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
|
||||
@@ -30,5 +35,57 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
|
||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
file = f"{now_str} stocks.html"
|
||||
context.log.info(f"Saving file: {file}")
|
||||
with open(f"/cache/{file}") as fp:
|
||||
with open(f"/cache/{file}", "w") as fp:
|
||||
fp.write(response.text)
|
||||
|
||||
|
||||
@dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
|
||||
def parse_raw(context: dg.SensorEvaluationContext):
|
||||
# TODO: use cursor from sensor to filter materialization events
|
||||
|
||||
# Get the known materialized partitions of daily_tables
|
||||
daily_partitions = context.instance.get_materialized_partitions(
|
||||
assets.daily_table.key
|
||||
)
|
||||
dates = [x.split("|")[0] for x in daily_partitions]
|
||||
ic(daily_partitions, dates)
|
||||
|
||||
# Get metadata for the raw asset (assumes it's tracked or logged with metadata)
|
||||
events = list(
|
||||
context.instance.get_event_records(
|
||||
event_records_filter=dg.EventRecordsFilter(
|
||||
event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
|
||||
asset_key=assets.raw_html.key,
|
||||
),
|
||||
limit=100,
|
||||
)
|
||||
)
|
||||
|
||||
# Track unique dates found in raw that are not materialized in daily_tables
|
||||
unknown_dates = set()
|
||||
for event in events:
|
||||
metadata = event.event_log_entry.asset_materialization.metadata
|
||||
date_str = None
|
||||
ic(metadata)
|
||||
for key, entry in metadata.items():
|
||||
# TODO: move this general logic
|
||||
if key.lower() in {"date", "partition", "partition_date"}:
|
||||
date_str = entry.value
|
||||
break
|
||||
if not date_str:
|
||||
continue
|
||||
|
||||
# Normalize and validate the date
|
||||
try:
|
||||
dt = pendulum.from_timestamp(int(date_str))
|
||||
date_str = dt.strftime("%Y-%m-%d")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
|
||||
continue
|
||||
|
||||
if date_str not in dates:
|
||||
unknown_dates.add(date_str)
|
||||
|
||||
ic(unknown_dates)
|
||||
for date_str in sorted(unknown_dates):
|
||||
yield dg.RunRequest(partition_key=date_str)
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
import re
|
||||
from collections.abc import Iterator
|
||||
from datetime import date, datetime
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
|
||||
def extract_date(page_source: str) -> Iterator[date]:
|
||||
# Parse with BeautifulSoup
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
|
||||
# Find the first <div> after </header>
|
||||
if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
|
||||
# Extract date part using regex
|
||||
match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
|
||||
if match:
|
||||
day, _, month, year = match.groups()
|
||||
date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
|
||||
yield date_obj
|
||||
55
apps/stocks/src/utils/extracter.py
Normal file
55
apps/stocks/src/utils/extracter.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import re
|
||||
from collections.abc import Iterator
|
||||
from datetime import date, datetime
|
||||
|
||||
import pandas as pd
|
||||
from bs4 import BeautifulSoup
|
||||
from pandas import DataFrame
|
||||
|
||||
|
||||
def extract_date(page_source: str) -> Iterator[date]:
|
||||
# Parse with BeautifulSoup
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
|
||||
# Find the first <div> after </header>
|
||||
if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
|
||||
# Extract date part using regex
|
||||
match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
|
||||
if match:
|
||||
day, _, month, year = match.groups()
|
||||
date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
|
||||
yield date_obj
|
||||
|
||||
|
||||
def extract_tables(
|
||||
page_source: str,
|
||||
) -> Iterator[tuple[str | None, str | None, DataFrame]]:
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
accordion_items = soup.find_all("div", class_="accordion-item")
|
||||
|
||||
for item in accordion_items:
|
||||
# Extract the title
|
||||
header = item.find("div", class_="accordion-header")
|
||||
title = header.find("h2").get_text(strip=True) if header else None
|
||||
|
||||
# Extract the description
|
||||
description_block = item.find("div", class_="accordion-description")
|
||||
description = (
|
||||
description_block.find("p").get_text(strip=True)
|
||||
if description_block
|
||||
else None
|
||||
)
|
||||
|
||||
# Extract the table
|
||||
table = item.find("table")
|
||||
if table:
|
||||
rows = []
|
||||
for row in table.find_all("tr"):
|
||||
cells = [
|
||||
cell.get_text(strip=True) for cell in row.find_all(["th", "td"])
|
||||
]
|
||||
rows.append(cells)
|
||||
|
||||
if rows:
|
||||
df = pd.DataFrame(rows[1:], columns=rows[0])
|
||||
yield title, description, df
|
||||
60
apps/stocks/src/utils/scraper.py
Normal file
60
apps/stocks/src/utils/scraper.py
Normal file
@@ -0,0 +1,60 @@
|
||||
async def scrape(url: str) -> str:
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=True)
|
||||
context = await browser.new_context(viewport={"width": 1000, "height": 2000})
|
||||
page = await context.new_page()
|
||||
|
||||
await page.goto(url, timeout=60000)
|
||||
|
||||
# Wait until at least one toggle button is present
|
||||
await page.wait_for_selector(".toggle-btn", timeout=20000)
|
||||
|
||||
# Set zoom
|
||||
await page.evaluate("document.body.style.zoom='50%'")
|
||||
|
||||
# Find all toggle buttons
|
||||
toggle_buttons = await page.query_selector_all(".toggle-btn")
|
||||
print(f"Found {len(toggle_buttons)} toggle buttons")
|
||||
|
||||
for i, btn in enumerate(toggle_buttons):
|
||||
try:
|
||||
# Ensure it's visible and enabled
|
||||
if await btn.is_visible() and await btn.is_enabled():
|
||||
await btn.click()
|
||||
await page.wait_for_timeout(1000)
|
||||
|
||||
if i == len(toggle_buttons) - 1:
|
||||
break
|
||||
|
||||
# Scroll down gradually
|
||||
scroll_step = 500
|
||||
total_height = await page.evaluate("() => document.body.scrollHeight")
|
||||
current_position = 0
|
||||
|
||||
while current_position < total_height:
|
||||
await page.evaluate(f"window.scrollTo(0, {current_position});")
|
||||
await page.wait_for_timeout(100)
|
||||
current_position += scroll_step
|
||||
total_height = await page.evaluate(
|
||||
"() => document.body.scrollHeight"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Skipped button due to error: {e}")
|
||||
|
||||
# Get the page content
|
||||
page_source = await page.content()
|
||||
|
||||
# Close the browser
|
||||
await browser.close()
|
||||
|
||||
# Continue scraping logic here...
|
||||
print("Scraping done")
|
||||
|
||||
# Save the page content to a file
|
||||
with open("/cache/scraped_page.html", "w") as fp:
|
||||
fp.write(page_source)
|
||||
|
||||
return page_source
|
||||
11
apps/stocks/src/utils/text.py
Normal file
11
apps/stocks/src/utils/text.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import re
|
||||
import unicodedata
|
||||
|
||||
|
||||
def slugify(text: str) -> str:
|
||||
# Normalize unicode characters
|
||||
text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
|
||||
# Replace non-word characters with hyphens
|
||||
text = re.sub(r"[^\w\s-]", "", text).strip().lower()
|
||||
# Replace spaces and repeated hyphens with a single hyphen
|
||||
return re.sub(r"[-\s]+", "-", text)
|
||||
@@ -6,7 +6,7 @@ annotated-types==0.7.0
|
||||
# via pydantic
|
||||
antlr4-python3-runtime==4.13.2
|
||||
# via dagster
|
||||
anyio==4.9.0
|
||||
anyio==4.10.0
|
||||
# via
|
||||
# gql
|
||||
# starlette
|
||||
@@ -26,7 +26,7 @@ botocore==1.40.1
|
||||
# boto3
|
||||
# s3fs
|
||||
# s3transfer
|
||||
certifi==2025.7.14
|
||||
certifi==2025.8.3
|
||||
# via requests
|
||||
charset-normalizer==3.4.2
|
||||
# via requests
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
class MyIOManager(PolarsParquetIOManager):
|
||||
def _load_partition_from_path(
|
||||
self,
|
||||
context: InputContext,
|
||||
partition_key: str,
|
||||
path: "UPath",
|
||||
backcompat_path: Optional["UPath"] = None,
|
||||
) -> Any:
|
||||
try:
|
||||
return super()._load_partition_from_path(
|
||||
context, partition_key, path, backcompat_path
|
||||
)
|
||||
except FileNotFoundError:
|
||||
# Handle the case where the partition file does not exist
|
||||
context.log.warning(
|
||||
f"Partition file not found for key {partition_key} at path {path}. "
|
||||
"Returning an empty DataFrame."
|
||||
)
|
||||
return None
|
||||
@@ -1,51 +0,0 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
from icecream import ic
|
||||
|
||||
from dagster import AssetKey, DagsterInstance
|
||||
|
||||
|
||||
def delete_partition(instance, partition_def_name, partition_key):
|
||||
try:
|
||||
# This does not seem to work, perhaps because it is not a dynamic partition?
|
||||
# All materializations can be deleted through the UI, but not one by one
|
||||
instance.delete_dynamic_partition(partition_def_name, partition_key)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error deleting partition: {e}")
|
||||
|
||||
|
||||
def main(instance):
|
||||
print(f"Partition '{partition_key}' deleted successfully.")
|
||||
|
||||
|
||||
def detect_previous_partition(instance, name):
|
||||
ic(name)
|
||||
records = instance.get_latest_materialization_events(
|
||||
(AssetKey(name),),
|
||||
# event_type="ASSET_MATERIALIZATION",
|
||||
# asset_key=(partition_key,),
|
||||
# limit=100,
|
||||
)
|
||||
print(records)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
partition_def_name = "asset_single_1"
|
||||
partition_key = "2025-07-20" # Example partition key
|
||||
|
||||
load_dotenv(find_dotenv())
|
||||
os.environ["DAGSTER_HOME"] = str(Path(__file__).parent.parent.parent)
|
||||
|
||||
for k, v in os.environ.items():
|
||||
if k.startswith("POSTGRES_"):
|
||||
os.environ[f"DAGSTER_{k}"] = v
|
||||
|
||||
os.environ["DAGSTER_POSTGRES_HOST"] = "localhost"
|
||||
instance = DagsterInstance.get()
|
||||
|
||||
# delete_partition(instance, partition_def_name, partition_key)
|
||||
|
||||
detect_previous_partition(instance, partition_def_name)
|
||||
@@ -1,8 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
rsync -av /opt/dagster/src/app/vinyl/ \
|
||||
/Volumes/dagster/src/app/vinyl/ \
|
||||
--include='*.py' \
|
||||
--include='*requirements.txt' \
|
||||
--exclude='__pycache__/' \
|
||||
-progress \
|
||||
--delete $*
|
||||
@@ -1,34 +0,0 @@
|
||||
import time
|
||||
|
||||
from dagster import AssetMaterialization, Output, config_mapping, job, op
|
||||
|
||||
|
||||
@op(config_schema={"config_param": str})
|
||||
def hello(context):
|
||||
time.sleep(1)
|
||||
print("halllo")
|
||||
return Output(123, metadata={"aa": context.op_config["config_param"]})
|
||||
|
||||
|
||||
@op
|
||||
def goodbye(context, x: int):
|
||||
time.sleep(2)
|
||||
print("doooei", x)
|
||||
context.log_event(
|
||||
AssetMaterialization(
|
||||
asset_key="my_asset",
|
||||
metadata={"my_meta": 444},
|
||||
description="A very useful value!",
|
||||
)
|
||||
)
|
||||
return 2
|
||||
|
||||
|
||||
@config_mapping(config_schema={"simplified_param": str})
|
||||
def simplified_config(val):
|
||||
return {"ops": {"hello": {"config": {"config_param": val["simplified_param"]}}}}
|
||||
|
||||
|
||||
@job
|
||||
def my_job():
|
||||
goodbye(hello())
|
||||
@@ -6,7 +6,7 @@ annotated-types==0.7.0
|
||||
# via pydantic
|
||||
antlr4-python3-runtime==4.13.2
|
||||
# via dagster
|
||||
anyio==4.9.0
|
||||
anyio==4.10.0
|
||||
# via
|
||||
# gql
|
||||
# starlette
|
||||
@@ -26,7 +26,7 @@ botocore==1.40.1
|
||||
# boto3
|
||||
# s3fs
|
||||
# s3transfer
|
||||
certifi==2025.7.14
|
||||
certifi==2025.8.3
|
||||
# via requests
|
||||
charset-normalizer==3.4.2
|
||||
# via requests
|
||||
|
||||
@@ -170,7 +170,9 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
|
||||
},
|
||||
output_required=False,
|
||||
dagster_type=patito_model_to_dagster_type(Deal),
|
||||
automation_condition=dg.AutomationCondition.eager(),
|
||||
automation_condition=dg.AutomationCondition.on_missing().without(
|
||||
dg.AutomationCondition.in_latest_time_window()
|
||||
),
|
||||
)
|
||||
def new_deals(
|
||||
context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
|
||||
|
||||
@@ -6,7 +6,7 @@ annotated-types==0.7.0
|
||||
# via pydantic
|
||||
antlr4-python3-runtime==4.13.2
|
||||
# via dagster
|
||||
anyio==4.9.0
|
||||
anyio==4.10.0
|
||||
# via
|
||||
# gql
|
||||
# starlette
|
||||
@@ -32,7 +32,7 @@ botocore==1.40.1
|
||||
# s3transfer
|
||||
cattrs==25.1.1
|
||||
# via requests-cache
|
||||
certifi==2025.7.14
|
||||
certifi==2025.8.3
|
||||
# via requests
|
||||
charset-normalizer==3.4.2
|
||||
# via
|
||||
|
||||
@@ -22,10 +22,9 @@ definitions = dg.Definitions(
|
||||
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
|
||||
},
|
||||
sensors=[
|
||||
sensors.list_locations,
|
||||
# sensors.list_locations,
|
||||
sensors.list_latitudes,
|
||||
sensors.list_longitudes,
|
||||
sensors.retrieve_weather,
|
||||
# sensors.list_longitudes,
|
||||
sensors.retrieve_weather,
|
||||
],
|
||||
)
|
||||
|
||||
8
entrypoint.sh
Normal file
8
entrypoint.sh
Normal file
@@ -0,0 +1,8 @@
|
||||
#!/bin/sh
|
||||
set -e
|
||||
|
||||
echo "Rendering dagster.yaml from template..."
|
||||
envsubst < dagster.yaml.template > dagster.yaml
|
||||
|
||||
echo "Starting Dagster: $@"
|
||||
exec "$@"
|
||||
19
poetry.lock
generated
19
poetry.lock
generated
@@ -1,19 +0,0 @@
|
||||
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "seven"
|
||||
version = "1.0.0"
|
||||
description = "Python 2.5 compatibility wrapper for Python 2.7 code."
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "seven-1.0.0.tar.gz", hash = "sha256:e80157857dc378545b0cd8626668bf0e20d7f3608a5587f3fcc71a56d2416814"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
tests = ["zope.testing"]
|
||||
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "*"
|
||||
content-hash = "edfc27fcb4a7dc1b1a11f2224d7b7f3e936c5f624df1dd86207c4dc08e047b5d"
|
||||
Reference in New Issue
Block a user