Compare commits

..

10 Commits

SHA1 Message Date
030424e124 delete old files 2025-08-05 09:01:19 +02:00
81c2035d02 store old files 2025-08-05 09:01:07 +02:00
2d20cdf256 remove polars-lts-cpu 2025-08-05 08:36:49 +02:00
28a195a256 tweaks 2025-08-04 20:05:20 +02:00
629bc6c648 put entrypoint in own file 2025-08-04 19:17:32 +02:00
4bc5770cce no need for lock 2025-08-04 19:10:50 +02:00
b26ba7aa35 tweaks 2025-08-04 19:09:32 +02:00
17ca8669ef parse stocks pages 2025-08-04 18:05:53 +02:00
f7318e85cd move utils 2025-08-04 18:02:00 +02:00
a3d931c1b3 update requirements files 2025-08-04 18:01:47 +02:00
26 changed files with 305 additions and 303 deletions

View File

@@ -14,8 +14,7 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh
 WORKDIR /opt/dagster/home
 COPY requirements.txt .
-RUN uv pip install -r requirements.txt --system \
-    && uv pip install polars-lts-cpu --system
+RUN uv pip install -r requirements.txt --system
 ARG APP
 ENV APP=$APP

View File

@@ -1,8 +1,5 @@
 FROM mcr.microsoft.com/playwright:v1.54.0-noble
-ARG APP
-ENV APP=$APP
-ENV PYTHONPATH=/apps/$APP/src/:/shared/src/:$PYTHONPATH
 ENV PATH="/venv/bin:/root/.local/bin:$PATH"
 WORKDIR /opt/dagster/home
@@ -14,14 +11,15 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh && \
 COPY requirements.txt .
 RUN . /venv/bin/activate && \
-    uv pip install -r requirements.txt && \
-    uv pip install polars-lts-cpu
+    uv pip install -r requirements.txt
 RUN . /venv/bin/activate && \
     uv pip install playwright && \
     playwright install
+ARG APP
+ENV APP=$APP
+ENV PYTHONPATH=/code/apps/$APP/src/:/code/shared/src/:$PYTHONPATH
 # Run dagster gRPC server on port 4000
 EXPOSE 4000

View File

@@ -15,22 +15,12 @@ ENV DAGSTER_HOME=/opt/dagster/home/
 WORKDIR $DAGSTER_HOME
 COPY dagster-requirements.txt requirements.txt
-RUN uv pip install -r requirements.txt --system \
-    && uv pip install polars-lts-cpu --system
+RUN uv pip install -r requirements.txt --system
 RUN mkdir -p $DAGSTER_HOME
 
 # Create entrypoint that renders the dagster.yaml from a template
-RUN cat << 'EOF' > /entrypoint.sh
-#!/bin/sh
-set -e
-echo "Rendering dagster.yaml from template..."
-envsubst < dagster.yaml.template > dagster.yaml
-echo "Starting Dagster: $@"
-exec "$@"
-EOF
+COPY entrypoint.sh /entrypoint.sh
 RUN chmod +x /entrypoint.sh
 
 ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
     # via pydantic
 antlr4-python3-runtime==4.13.2
     # via dagster
-anyio==4.9.0
+anyio==4.10.0
     # via
     #   gql
     #   starlette
@@ -28,7 +28,7 @@ botocore==1.40.1
     #   boto3
     #   s3fs
     #   s3transfer
-certifi==2025.7.14
+certifi==2025.8.3
     # via requests
 charset-normalizer==3.4.2
     # via requests

View File

@@ -6,17 +6,13 @@ annotated-types==0.7.0
     # via pydantic
 antlr4-python3-runtime==4.13.2
     # via dagster
-anyio==4.9.0
+anyio==4.10.0
     # via
     #   gql
     #   starlette
     #   watchfiles
 asttokens==3.0.0
     # via icecream
-attrs==25.3.0
-    # via
-    #   outcome
-    #   trio
 backoff==2.2.1
     # via gql
 beautifulsoup4==4.13.4
@@ -30,10 +26,8 @@ botocore==1.40.1
     #   boto3
     #   s3fs
     #   s3transfer
-certifi==2025.7.14
-    # via
-    #   requests
-    #   selenium
+certifi==2025.8.3
+    # via requests
 charset-normalizer==3.4.2
     # via requests
 click==8.1.8
@@ -132,8 +126,6 @@ graphql-core==3.2.6
     #   graphql-relay
 graphql-relay==3.2.0
     # via graphene
-greenlet==3.2.3
-    # via playwright
 grpcio==1.74.0
     # via
     #   dagster
@@ -141,9 +133,7 @@ grpcio==1.74.0
 grpcio-health-checking==1.71.2
     # via dagster
 h11==0.16.0
-    # via
-    #   uvicorn
-    #   wsproto
+    # via uvicorn
 httptools==0.6.4
     # via uvicorn
 humanfriendly==10.0
@@ -155,7 +145,6 @@ idna==3.10
     #   anyio
     #   email-validator
     #   requests
-    #   trio
     #   yarl
 jinja2==3.1.6
     # via dagster
@@ -190,10 +179,6 @@ numpy==2.3.2
     #   seaborn
 openpyxl==3.1.5
     # via dev (pyproject.toml)
-outcome==1.3.0.post0
-    # via
-    #   trio
-    #   trio-websocket
 packaging==25.0
     # via
     #   dagster-aws
@@ -210,10 +195,10 @@ patito==0.8.3
     # via
     #   dev (pyproject.toml)
     #   dagster-polars
+pendulum==3.1.0
+    # via dev (pyproject.toml)
 pillow==11.3.0
     # via matplotlib
-playwright==1.54.0
-    # via dev (pyproject.toml)
 polars==1.32.0
     # via
     #   dagster-polars
@@ -240,8 +225,6 @@ pydantic-core==2.33.2
     # via pydantic
 pydantic-settings==2.10.1
     # via dev (pyproject.toml)
-pyee==13.0.0
-    # via playwright
 pygments==2.19.2
     # via
     #   icecream
@@ -249,15 +232,14 @@ pygments==2.19.2
 pyparsing==3.2.3
     # via matplotlib
 pysocks==1.7.1
-    # via
-    #   requests
-    #   urllib3
+    # via requests
 python-dateutil==2.9.0.post0
     # via
     #   botocore
     #   graphene
     #   matplotlib
     #   pandas
+    #   pendulum
 python-dotenv==1.1.1
     # via
     #   dagster
@@ -293,8 +275,6 @@ s3transfer==0.13.1
     # via boto3
 seaborn==0.13.2
     # via dev (pyproject.toml)
-selenium==4.34.2
-    # via dev (pyproject.toml)
 setuptools==80.9.0
     # via dagster
 six==1.17.0
@@ -304,11 +284,7 @@ six==1.17.0
 smmap==5.0.2
     # via gitdb
 sniffio==1.3.1
-    # via
-    #   anyio
-    #   trio
-sortedcontainers==2.4.0
-    # via trio
+    # via anyio
 soupsieve==2.7
     # via beautifulsoup4
 sqlalchemy==2.0.42
@@ -333,12 +309,6 @@ toposort==1.10
     # via dagster
 tqdm==4.67.1
     # via dagster
-trio==0.30.0
-    # via
-    #   selenium
-    #   trio-websocket
-trio-websocket==0.12.2
-    # via selenium
 typing-extensions==4.14.1
     # via
     #   alembic
@@ -350,8 +320,6 @@ typing-extensions==4.14.1
     #   patito
     #   pydantic
     #   pydantic-core
-    #   pyee
-    #   selenium
     #   sqlalchemy
     #   starlette
     #   typing-inspection
@@ -360,7 +328,9 @@ typing-inspection==0.4.1
     #   pydantic
     #   pydantic-settings
 tzdata==2025.2
-    # via pandas
+    # via
+    #   pandas
+    #   pendulum
 universal-pathlib==0.2.6
     # via
     #   dagster
@@ -370,7 +340,6 @@ urllib3==2.5.0
     #   botocore
     #   docker
     #   requests
-    #   selenium
 uvicorn==0.35.0
     # via dagster-webserver
 uvloop==0.21.0
@@ -379,12 +348,8 @@ watchdog==5.0.3
     # via dagster
 watchfiles==1.1.0
     # via uvicorn
-websocket-client==1.8.0
-    # via selenium
 websockets==15.0.1
     # via uvicorn
-wsproto==1.2.0
-    # via trio-websocket
 xlsxwriter==3.2.5
     # via dev (pyproject.toml)
 yarl==1.20.1

View File

@@ -2,81 +2,30 @@ import asyncio
 from collections.abc import Iterator
 from datetime import datetime, timezone
 from functools import partial
+from pathlib import Path
 
+import structlog
 from config import APP, URL
-from partitions import daily_partitions_def
-from playwright.async_api import async_playwright
-from utils import extract_date
+from partitions import (
+    daily_partitions_def,
+    daily_table_partitions_def,
+    table_partitions_def,
+)
+from utils.extracter import extract_date, extract_tables
+from utils.scraper import scrape
+from utils.text import slugify
 
 import dagster as dg
 
 TAGS = {"app": APP}
 asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
+logger = structlog.get_logger()
 
-
-async def main() -> str:
-    async with async_playwright() as p:
-        browser = await p.chromium.launch(headless=True)
-        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
-        page = await context.new_page()
-        await page.goto(URL, timeout=60000)
-        # Wait until at least one toggle button is present
-        await page.wait_for_selector(".toggle-btn", timeout=20000)
-        # Set zoom
-        await page.evaluate("document.body.style.zoom='50%'")
-        # Find all toggle buttons
-        toggle_buttons = await page.query_selector_all(".toggle-btn")
-        print(f"Found {len(toggle_buttons)} toggle buttons")
-        for i, btn in enumerate(toggle_buttons):
-            try:
-                # Ensure it's visible and enabled
-                if await btn.is_visible() and await btn.is_enabled():
-                    await btn.click()
-                    await page.wait_for_timeout(1000)
-                if i == len(toggle_buttons) - 1:
-                    break
-                # Scroll down gradually
-                scroll_step = 500
-                total_height = await page.evaluate("() => document.body.scrollHeight")
-                current_position = 0
-                while current_position < total_height:
-                    await page.evaluate(f"window.scrollTo(0, {current_position});")
-                    await page.wait_for_timeout(100)
-                    current_position += scroll_step
-                    total_height = await page.evaluate(
-                        "() => document.body.scrollHeight"
-                    )
-            except Exception as e:
-                print(f"Skipped button due to error: {e}")
-        # Get the page content
-        page_source = await page.content()
-        # Close the browser
-        await browser.close()
-        # Continue scraping logic here...
-        print("Scraping done")
-        # Save the page content to a file
-        with open("/cache/scraped_page.html", "w") as fp:
-            fp.write(page_source)
-        return page_source
-
 
 @asset(io_manager_key="html_io_manager", name="raw")
 def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
-    page_source = asyncio.run(main())
+    page_source = asyncio.run(scrape(url=URL))
     now = datetime.now(tz=timezone.utc)
     date_str = now.strftime("%Y-%m-%d")
@@ -94,18 +43,67 @@ def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
             date_str = date_obj.strftime("%Y-%m-%d")
             context.log.info(f"Found date: {date_str}")
             context.log_event(
-                dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
+                dg.AssetMaterialization(asset_key=raw_html.key, partition=date_str)
             )
     except Exception as e:
         context.log.error(f"Parsing error: {e}")
 
 
+@asset(deps=[raw_html], partitions_def=daily_table_partitions_def)
+def daily_table() -> None: ...
+
+
 @asset(
-    io_manager_key="html_io_manager",
+    deps=[raw_html],
+    io_manager_key="json_io_manager",
     partitions_def=daily_partitions_def,
+    automation_condition=dg.AutomationCondition.eager(),
+    output_required=False,
 )
-def daily_html() -> str: ...
-
-
-class MyAssetConfig(dg.Config):
-    image: str = "bla"
+def raw_daily(context: dg.AssetExecutionContext) -> None:
+    base = (
+        Path(context.resources.json_io_manager.base_dir).joinpath(*raw_html.key.path)
+        / context.partition_key
+    )
+    if files := list(base.glob("*.html")):
+        logger.info(f"Found {len(files)} html files")
+        page_source = open(files[-1]).read()
+        for title, description, df in extract_tables(page_source):
+            # TODO: when scraping click the "View Strategy Criteria" texts and record the
+            # information
+            if not title:
+                logger.info(
+                    "No title!",
+                    description=description,
+                    num_rows=0 if df is None else len(df),
+                )
+                continue
+            if df is None:
+                logger.info("No data!", title=title, description=description)
+                continue
+            slug = slugify(title)
+            output_context = dg.build_output_context(
+                asset_key=dg.AssetKey(
+                    [APP, "daily", context.partition_key, slug],
+                ),
+                resources=context.resources.original_resource_dict,
+            )
+            context.resources.json_io_manager.handle_output(
+                output_context, df.to_dict(orient="records")
+            )
+            context.log_event(
+                dg.AssetMaterialization(
+                    asset_key=daily_table.key,
+                    partition=f"{context.partition_key}|{slug}",
+                    metadata={
+                        "title": dg.MetadataValue.text(title),
+                        "slug": dg.MetadataValue.text(slug),
+                        "description": dg.MetadataValue.text(description),
+                        "rows": dg.MetadataValue.int(len(df)),
+                    },
+                )
+            )
+            context.instance.add_dynamic_partitions(table_partitions_def.name, [slug])

View File

@@ -4,6 +4,7 @@ import sensors
 from dagster_polars import PolarsParquetIOManager
 from icecream import install
 from shared.config import APP, STORAGE_DIR
+from shared.io_manager import JsonIOManager
 from shared.io_manager.html import HtmlIOManager
 
 import dagster as dg
@@ -20,8 +21,9 @@ definitions = dg.Definitions(
     ],
     resources={
         "html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
+        "json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
     },
     jobs=[jobs.raw_html_job],
-    sensors=[sensors.check_update],
+    sensors=[sensors.check_update, sensors.parse_raw],
 )

View File

@@ -7,3 +7,8 @@ raw_html_job = dg.define_asset_job(
     selection=[assets.raw_html.key],
     tags={"docker/image": "dagster-code-stocks-playwright"},
 )
+
+extract_job = dg.define_asset_job(
+    "extract_job",
+    selection=[assets.raw_daily.key],
+)

View File

@@ -5,3 +5,9 @@ import dagster as dg
 daily_partitions_def = dg.DailyPartitionsDefinition(
     start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
 )
+
+table_partitions_def = dg.DynamicPartitionsDefinition(name="tables")
+
+daily_table_partitions_def = dg.MultiPartitionsDefinition(
+    {"date": daily_partitions_def, "source": table_partitions_def}
+)

View File

@@ -1,13 +1,18 @@
 from collections.abc import Iterator
 from datetime import datetime
 
+import assets
 import jobs
+import pendulum
 import requests
+import structlog
 from config import URL
-from utils import extract_date
+from utils.extracter import extract_date
 
 import dagster as dg
 
+logger = structlog.get_logger()
+
 
 @dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
 def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
@@ -30,5 +35,57 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
     now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
     file = f"{now_str} stocks.html"
     context.log.info(f"Saving file: {file}")
-    with open(f"/cache/{file}") as fp:
+    with open(f"/cache/{file}", "w") as fp:
         fp.write(response.text)
+
+
+@dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
+def parse_raw(context: dg.SensorEvaluationContext):
+    # TODO: use cursor from sensor to filter materialization events
+    # Get the known materialized partitions of daily_tables
+    daily_partitions = context.instance.get_materialized_partitions(
+        assets.daily_table.key
+    )
+    dates = [x.split("|")[0] for x in daily_partitions]
+    ic(daily_partitions, dates)
+
+    # Get metadata for the raw asset (assumes it's tracked or logged with metadata)
+    events = list(
+        context.instance.get_event_records(
+            event_records_filter=dg.EventRecordsFilter(
+                event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
+                asset_key=assets.raw_html.key,
+            ),
+            limit=100,
+        )
+    )
+
+    # Track unique dates found in raw that are not materialized in daily_tables
+    unknown_dates = set()
+    for event in events:
+        metadata = event.event_log_entry.asset_materialization.metadata
+        date_str = None
+        ic(metadata)
+        for key, entry in metadata.items():
+            # TODO: move this general logic
+            if key.lower() in {"date", "partition", "partition_date"}:
+                date_str = entry.value
+                break
+        if not date_str:
+            continue
+
+        # Normalize and validate the date
+        try:
+            dt = pendulum.from_timestamp(int(date_str))
+            date_str = dt.strftime("%Y-%m-%d")
+        except Exception as e:
+            logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
+            continue
+
+        if date_str not in dates:
+            unknown_dates.add(date_str)
+
+    ic(unknown_dates)
+    for date_str in sorted(unknown_dates):
+        yield dg.RunRequest(partition_key=date_str)

View File

@@ -1,19 +0,0 @@
-import re
-from collections.abc import Iterator
-from datetime import date, datetime
-
-from bs4 import BeautifulSoup
-
-
-def extract_date(page_source: str) -> Iterator[date]:
-    # Parse with BeautifulSoup
-    soup = BeautifulSoup(page_source, "html.parser")
-    # Find the first <div> after </header>
-    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
-        # Extract date part using regex
-        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
-        if match:
-            day, _, month, year = match.groups()
-            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
-            yield date_obj

View File

@@ -0,0 +1,55 @@
+import re
+from collections.abc import Iterator
+from datetime import date, datetime
+
+import pandas as pd
+from bs4 import BeautifulSoup
+from pandas import DataFrame
+
+
+def extract_date(page_source: str) -> Iterator[date]:
+    # Parse with BeautifulSoup
+    soup = BeautifulSoup(page_source, "html.parser")
+    # Find the first <div> after </header>
+    if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
+        # Extract date part using regex
+        match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
+        if match:
+            day, _, month, year = match.groups()
+            date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
+            yield date_obj
+
+
+def extract_tables(
+    page_source: str,
+) -> Iterator[tuple[str | None, str | None, DataFrame]]:
+    soup = BeautifulSoup(page_source, "html.parser")
+    accordion_items = soup.find_all("div", class_="accordion-item")
+
+    for item in accordion_items:
+        # Extract the title
+        header = item.find("div", class_="accordion-header")
+        title = header.find("h2").get_text(strip=True) if header else None
+
+        # Extract the description
+        description_block = item.find("div", class_="accordion-description")
+        description = (
+            description_block.find("p").get_text(strip=True)
+            if description_block
+            else None
+        )
+
+        # Extract the table
+        table = item.find("table")
+        if table:
+            rows = []
+            for row in table.find_all("tr"):
+                cells = [
+                    cell.get_text(strip=True) for cell in row.find_all(["th", "td"])
+                ]
+                rows.append(cells)
+            if rows:
+                df = pd.DataFrame(rows[1:], columns=rows[0])
+                yield title, description, df

View File

@@ -0,0 +1,60 @@
+async def scrape(url: str) -> str:
+    from playwright.async_api import async_playwright
+
+    async with async_playwright() as p:
+        browser = await p.chromium.launch(headless=True)
+        context = await browser.new_context(viewport={"width": 1000, "height": 2000})
+        page = await context.new_page()
+        await page.goto(url, timeout=60000)
+
+        # Wait until at least one toggle button is present
+        await page.wait_for_selector(".toggle-btn", timeout=20000)
+
+        # Set zoom
+        await page.evaluate("document.body.style.zoom='50%'")
+
+        # Find all toggle buttons
+        toggle_buttons = await page.query_selector_all(".toggle-btn")
+        print(f"Found {len(toggle_buttons)} toggle buttons")
+
+        for i, btn in enumerate(toggle_buttons):
+            try:
+                # Ensure it's visible and enabled
+                if await btn.is_visible() and await btn.is_enabled():
+                    await btn.click()
+                    await page.wait_for_timeout(1000)
+                if i == len(toggle_buttons) - 1:
+                    break
+
+                # Scroll down gradually
+                scroll_step = 500
+                total_height = await page.evaluate("() => document.body.scrollHeight")
+                current_position = 0
+                while current_position < total_height:
+                    await page.evaluate(f"window.scrollTo(0, {current_position});")
+                    await page.wait_for_timeout(100)
+                    current_position += scroll_step
+                    total_height = await page.evaluate(
+                        "() => document.body.scrollHeight"
+                    )
+            except Exception as e:
+                print(f"Skipped button due to error: {e}")
+
+        # Get the page content
+        page_source = await page.content()
+
+        # Close the browser
+        await browser.close()
+
+        # Continue scraping logic here...
+        print("Scraping done")
+
+        # Save the page content to a file
+        with open("/cache/scraped_page.html", "w") as fp:
+            fp.write(page_source)
+
+        return page_source

View File

@@ -0,0 +1,11 @@
+import re
+import unicodedata
+
+
+def slugify(text: str) -> str:
+    # Normalize unicode characters
+    text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
+    # Replace non-word characters with hyphens
+    text = re.sub(r"[^\w\s-]", "", text).strip().lower()
+    # Replace spaces and repeated hyphens with a single hyphen
+    return re.sub(r"[-\s]+", "-", text)

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
     # via pydantic
 antlr4-python3-runtime==4.13.2
     # via dagster
-anyio==4.9.0
+anyio==4.10.0
     # via
     #   gql
     #   starlette
@@ -26,7 +26,7 @@ botocore==1.40.1
     #   boto3
     #   s3fs
     #   s3transfer
-certifi==2025.7.14
+certifi==2025.8.3
     # via requests
 charset-normalizer==3.4.2
     # via requests

View File

@@ -1,22 +0,0 @@
-from typing import Any, Optional
-
-
-class MyIOManager(PolarsParquetIOManager):
-    def _load_partition_from_path(
-        self,
-        context: InputContext,
-        partition_key: str,
-        path: "UPath",
-        backcompat_path: Optional["UPath"] = None,
-    ) -> Any:
-        try:
-            return super()._load_partition_from_path(
-                context, partition_key, path, backcompat_path
-            )
-        except FileNotFoundError:
-            # Handle the case where the partition file does not exist
-            context.log.warning(
-                f"Partition file not found for key {partition_key} at path {path}. "
-                "Returning an empty DataFrame."
-            )
-            return None

View File

@@ -1,51 +0,0 @@
-import os
-from pathlib import Path
-
-from dotenv import find_dotenv, load_dotenv
-from icecream import ic
-
-from dagster import AssetKey, DagsterInstance
-
-
-def delete_partition(instance, partition_def_name, partition_key):
-    try:
-        # This does not seem to work, perhaps because it is not a dynamic partition?
-        # All materializations can be deleted through the UI, but not one by one
-        instance.delete_dynamic_partition(partition_def_name, partition_key)
-    except Exception as e:
-        print(f"Error deleting partition: {e}")
-
-
-def main(instance):
-    print(f"Partition '{partition_key}' deleted successfully.")
-
-
-def detect_previous_partition(instance, name):
-    ic(name)
-    records = instance.get_latest_materialization_events(
-        (AssetKey(name),),
-        # event_type="ASSET_MATERIALIZATION",
-        # asset_key=(partition_key,),
-        # limit=100,
-    )
-    print(records)
-
-
-if __name__ == "__main__":
-    partition_def_name = "asset_single_1"
-    partition_key = "2025-07-20"  # Example partition key
-
-    load_dotenv(find_dotenv())
-    os.environ["DAGSTER_HOME"] = str(Path(__file__).parent.parent.parent)
-    for k, v in os.environ.items():
-        if k.startswith("POSTGRES_"):
-            os.environ[f"DAGSTER_{k}"] = v
-    os.environ["DAGSTER_POSTGRES_HOST"] = "localhost"
-
-    instance = DagsterInstance.get()
-    # delete_partition(instance, partition_def_name, partition_key)
-    detect_previous_partition(instance, partition_def_name)

View File

@@ -1,8 +0,0 @@
-#!/usr/bin/env bash
-rsync -av /opt/dagster/src/app/vinyl/ \
-    /Volumes/dagster/src/app/vinyl/ \
-    --include='*.py' \
-    --include='*requirements.txt' \
-    --exclude='__pycache__/' \
-    -progress \
-    --delete $*

View File

@@ -1,34 +0,0 @@
-import time
-
-from dagster import AssetMaterialization, Output, config_mapping, job, op
-
-
-@op(config_schema={"config_param": str})
-def hello(context):
-    time.sleep(1)
-    print("halllo")
-    return Output(123, metadata={"aa": context.op_config["config_param"]})
-
-
-@op
-def goodbye(context, x: int):
-    time.sleep(2)
-    print("doooei", x)
-    context.log_event(
-        AssetMaterialization(
-            asset_key="my_asset",
-            metadata={"my_meta": 444},
-            description="A very useful value!",
-        )
-    )
-    return 2
-
-
-@config_mapping(config_schema={"simplified_param": str})
-def simplified_config(val):
-    return {"ops": {"hello": {"config": {"config_param": val["simplified_param"]}}}}
-
-
-@job
-def my_job():
-    goodbye(hello())

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
    # via pydantic
 antlr4-python3-runtime==4.13.2
     # via dagster
-anyio==4.9.0
+anyio==4.10.0
     # via
     #   gql
     #   starlette
@@ -26,7 +26,7 @@ botocore==1.40.1
     #   boto3
     #   s3fs
     #   s3transfer
-certifi==2025.7.14
+certifi==2025.8.3
     # via requests
 charset-normalizer==3.4.2
     # via requests

View File

@@ -170,7 +170,9 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
     },
     output_required=False,
     dagster_type=patito_model_to_dagster_type(Deal),
-    automation_condition=dg.AutomationCondition.eager(),
+    automation_condition=dg.AutomationCondition.on_missing().without(
+        dg.AutomationCondition.in_latest_time_window()
+    ),
 )
 def new_deals(
     context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
     # via pydantic
 antlr4-python3-runtime==4.13.2
     # via dagster
-anyio==4.9.0
+anyio==4.10.0
     # via
     #   gql
     #   starlette
@@ -32,7 +32,7 @@ botocore==1.40.1
     #   s3transfer
 cattrs==25.1.1
     # via requests-cache
-certifi==2025.7.14
+certifi==2025.8.3
     # via requests
 charset-normalizer==3.4.2
     # via

View File

@@ -22,10 +22,9 @@ definitions = dg.Definitions(
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
     },
     sensors=[
-        sensors.list_locations,
+        # sensors.list_locations,
         sensors.list_latitudes,
-        sensors.list_longitudes,
-        sensors.retrieve_weather,
+        # sensors.list_longitudes,
         sensors.retrieve_weather,
     ],
 )

entrypoint.sh (new file, +8)
View File

@@ -0,0 +1,8 @@
+#!/bin/sh
+set -e
+
+echo "Rendering dagster.yaml from template..."
+envsubst < dagster.yaml.template > dagster.yaml
+
+echo "Starting Dagster: $@"
+exec "$@"

poetry.lock (generated, -19)
View File

@@ -1,19 +0,0 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
-
-[[package]]
-name = "seven"
-version = "1.0.0"
-description = "Python 2.5 compatibility wrapper for Python 2.7 code."
-optional = false
-python-versions = "*"
-files = [
-    {file = "seven-1.0.0.tar.gz", hash = "sha256:e80157857dc378545b0cd8626668bf0e20d7f3608a5587f3fcc71a56d2416814"},
-]
-
-[package.extras]
-tests = ["zope.testing"]
-
-[metadata]
-lock-version = "2.0"
-python-versions = "*"
-content-hash = "edfc27fcb4a7dc1b1a11f2224d7b7f3e936c5f624df1dd86207c4dc08e047b5d"