Compare commits

...

10 Commits

Author SHA1 Message Date
030424e124 delete old files 2025-08-05 09:01:19 +02:00
81c2035d02 store old files 2025-08-05 09:01:07 +02:00
2d20cdf256 remove polars-lts-cpu 2025-08-05 08:36:49 +02:00
28a195a256 tweaks 2025-08-04 20:05:20 +02:00
629bc6c648 put entrypoint in own file 2025-08-04 19:17:32 +02:00
4bc5770cce no need for lock 2025-08-04 19:10:50 +02:00
b26ba7aa35 tweaks 2025-08-04 19:09:32 +02:00
17ca8669ef parse stocks pages 2025-08-04 18:05:53 +02:00
f7318e85cd move utils 2025-08-04 18:02:00 +02:00
a3d931c1b3 update requirements files 2025-08-04 18:01:47 +02:00
26 changed files with 305 additions and 303 deletions

View File

@@ -14,8 +14,7 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh
WORKDIR /opt/dagster/home
COPY requirements.txt .
RUN uv pip install -r requirements.txt --system \
&& uv pip install polars-lts-cpu --system
RUN uv pip install -r requirements.txt --system
ARG APP
ENV APP=$APP

View File

@@ -1,8 +1,5 @@
FROM mcr.microsoft.com/playwright:v1.54.0-noble
ARG APP
ENV APP=$APP
ENV PYTHONPATH=/apps/$APP/src/:/shared/src/:$PYTHONPATH
ENV PATH="/venv/bin:/root/.local/bin:$PATH"
WORKDIR /opt/dagster/home
@@ -14,14 +11,15 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh && \
COPY requirements.txt .
RUN . /venv/bin/activate && \
uv pip install -r requirements.txt && \
uv pip install polars-lts-cpu
uv pip install -r requirements.txt
RUN . /venv/bin/activate && \
uv pip install playwright && \
playwright install
ARG APP
ENV APP=$APP
ENV PYTHONPATH=/code/apps/$APP/src/:/code/shared/src/:$PYTHONPATH
# Run dagster gRPC server on port 4000
EXPOSE 4000

View File

@@ -15,22 +15,12 @@ ENV DAGSTER_HOME=/opt/dagster/home/
WORKDIR $DAGSTER_HOME
COPY dagster-requirements.txt requirements.txt
RUN uv pip install -r requirements.txt --system \
&& uv pip install polars-lts-cpu --system
RUN uv pip install -r requirements.txt --system
RUN mkdir -p $DAGSTER_HOME
# Create entrypoint that renders the dagster.yaml from a template
RUN cat << 'EOF' > /entrypoint.sh
#!/bin/sh
set -e
echo "Rendering dagster.yaml from template..."
envsubst < dagster.yaml.template > dagster.yaml
echo "Starting Dagster: $@"
exec "$@"
EOF
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
anyio==4.10.0
# via
# gql
# starlette
@@ -28,7 +28,7 @@ botocore==1.40.1
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via requests

View File

@@ -6,17 +6,13 @@ annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
anyio==4.10.0
# via
# gql
# starlette
# watchfiles
asttokens==3.0.0
# via icecream
attrs==25.3.0
# via
# outcome
# trio
backoff==2.2.1
# via gql
beautifulsoup4==4.13.4
@@ -30,10 +26,8 @@ botocore==1.40.1
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
# via
# requests
# selenium
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via requests
click==8.1.8
@@ -132,8 +126,6 @@ graphql-core==3.2.6
# graphql-relay
graphql-relay==3.2.0
# via graphene
greenlet==3.2.3
# via playwright
grpcio==1.74.0
# via
# dagster
@@ -141,9 +133,7 @@ grpcio==1.74.0
grpcio-health-checking==1.71.2
# via dagster
h11==0.16.0
# via
# uvicorn
# wsproto
# via uvicorn
httptools==0.6.4
# via uvicorn
humanfriendly==10.0
@@ -155,7 +145,6 @@ idna==3.10
# anyio
# email-validator
# requests
# trio
# yarl
jinja2==3.1.6
# via dagster
@@ -190,10 +179,6 @@ numpy==2.3.2
# seaborn
openpyxl==3.1.5
# via dev (pyproject.toml)
outcome==1.3.0.post0
# via
# trio
# trio-websocket
packaging==25.0
# via
# dagster-aws
@@ -210,10 +195,10 @@ patito==0.8.3
# via
# dev (pyproject.toml)
# dagster-polars
pendulum==3.1.0
# via dev (pyproject.toml)
pillow==11.3.0
# via matplotlib
playwright==1.54.0
# via dev (pyproject.toml)
polars==1.32.0
# via
# dagster-polars
@@ -240,8 +225,6 @@ pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.10.1
# via dev (pyproject.toml)
pyee==13.0.0
# via playwright
pygments==2.19.2
# via
# icecream
@@ -249,15 +232,14 @@ pygments==2.19.2
pyparsing==3.2.3
# via matplotlib
pysocks==1.7.1
# via
# requests
# urllib3
# via requests
python-dateutil==2.9.0.post0
# via
# botocore
# graphene
# matplotlib
# pandas
# pendulum
python-dotenv==1.1.1
# via
# dagster
@@ -293,8 +275,6 @@ s3transfer==0.13.1
# via boto3
seaborn==0.13.2
# via dev (pyproject.toml)
selenium==4.34.2
# via dev (pyproject.toml)
setuptools==80.9.0
# via dagster
six==1.17.0
@@ -304,11 +284,7 @@ six==1.17.0
smmap==5.0.2
# via gitdb
sniffio==1.3.1
# via
# anyio
# trio
sortedcontainers==2.4.0
# via trio
# via anyio
soupsieve==2.7
# via beautifulsoup4
sqlalchemy==2.0.42
@@ -333,12 +309,6 @@ toposort==1.10
# via dagster
tqdm==4.67.1
# via dagster
trio==0.30.0
# via
# selenium
# trio-websocket
trio-websocket==0.12.2
# via selenium
typing-extensions==4.14.1
# via
# alembic
@@ -350,8 +320,6 @@ typing-extensions==4.14.1
# patito
# pydantic
# pydantic-core
# pyee
# selenium
# sqlalchemy
# starlette
# typing-inspection
@@ -360,7 +328,9 @@ typing-inspection==0.4.1
# pydantic
# pydantic-settings
tzdata==2025.2
# via pandas
# via
# pandas
# pendulum
universal-pathlib==0.2.6
# via
# dagster
@@ -370,7 +340,6 @@ urllib3==2.5.0
# botocore
# docker
# requests
# selenium
uvicorn==0.35.0
# via dagster-webserver
uvloop==0.21.0
@@ -379,12 +348,8 @@ watchdog==5.0.3
# via dagster
watchfiles==1.1.0
# via uvicorn
websocket-client==1.8.0
# via selenium
websockets==15.0.1
# via uvicorn
wsproto==1.2.0
# via trio-websocket
xlsxwriter==3.2.5
# via dev (pyproject.toml)
yarl==1.20.1

View File

@@ -2,81 +2,30 @@ import asyncio
from collections.abc import Iterator
from datetime import datetime, timezone
from functools import partial
from pathlib import Path
import structlog
from config import APP, URL
from partitions import daily_partitions_def
from playwright.async_api import async_playwright
from utils import extract_date
from partitions import (
daily_partitions_def,
daily_table_partitions_def,
table_partitions_def,
)
from utils.extracter import extract_date, extract_tables
from utils.scraper import scrape
from utils.text import slugify
import dagster as dg
TAGS = {"app": APP}
asset = partial(dg.asset, key_prefix=APP, tags=TAGS)
async def main() -> str:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(viewport={"width": 1000, "height": 2000})
page = await context.new_page()
await page.goto(URL, timeout=60000)
# Wait until at least one toggle button is present
await page.wait_for_selector(".toggle-btn", timeout=20000)
# Set zoom
await page.evaluate("document.body.style.zoom='50%'")
# Find all toggle buttons
toggle_buttons = await page.query_selector_all(".toggle-btn")
print(f"Found {len(toggle_buttons)} toggle buttons")
for i, btn in enumerate(toggle_buttons):
try:
# Ensure it's visible and enabled
if await btn.is_visible() and await btn.is_enabled():
await btn.click()
await page.wait_for_timeout(1000)
if i == len(toggle_buttons) - 1:
break
# Scroll down gradually
scroll_step = 500
total_height = await page.evaluate("() => document.body.scrollHeight")
current_position = 0
while current_position < total_height:
await page.evaluate(f"window.scrollTo(0, {current_position});")
await page.wait_for_timeout(100)
current_position += scroll_step
total_height = await page.evaluate(
"() => document.body.scrollHeight"
)
except Exception as e:
print(f"Skipped button due to error: {e}")
# Get the page content
page_source = await page.content()
# Close the browser
await browser.close()
# Continue scraping logic here...
print("Scraping done")
# Save the page content to a file
with open("/cache/scraped_page.html", "w") as fp:
fp.write(page_source)
return page_source
logger = structlog.get_logger()
@asset(io_manager_key="html_io_manager", name="raw")
def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
page_source = asyncio.run(main())
page_source = asyncio.run(scrape(url=URL))
now = datetime.now(tz=timezone.utc)
date_str = now.strftime("%Y-%m-%d")
@@ -94,18 +43,67 @@ def raw_html(context: dg.AssetExecutionContext) -> Iterator[dg.Output[str]]:
date_str = date_obj.strftime("%Y-%m-%d")
context.log.info(f"Found date: {date_str}")
context.log_event(
dg.AssetMaterialization(asset_key=daily_html.key, partition=date_str)
dg.AssetMaterialization(asset_key=raw_html.key, partition=date_str)
)
except Exception as e:
context.log.error(f"Parsing error: {e}")
@asset(deps=[raw_html], partitions_def=daily_table_partitions_def)
def daily_table() -> None: ...
@asset(
io_manager_key="html_io_manager",
deps=[raw_html],
io_manager_key="json_io_manager",
partitions_def=daily_partitions_def,
automation_condition=dg.AutomationCondition.eager(),
output_required=False,
)
def daily_html() -> str: ...
def raw_daily(context: dg.AssetExecutionContext) -> None:
base = (
Path(context.resources.json_io_manager.base_dir).joinpath(*raw_html.key.path)
/ context.partition_key
)
if files := list(base.glob("*.html")):
logger.info(f"Found {len(files)} html files")
page_source = open(files[-1]).read()
for title, description, df in extract_tables(page_source):
# TODO: when scraping click the "View Strategy Criteria" texts and record the
# information
if not title:
logger.info(
"No title!",
description=description,
num_rows=0 if df is None else len(df),
)
continue
class MyAssetConfig(dg.Config):
image: str = "bla"
if df is None:
logger.info("No data!", title=title, description=description)
continue
slug = slugify(title)
output_context = dg.build_output_context(
asset_key=dg.AssetKey(
[APP, "daily", context.partition_key, slug],
),
resources=context.resources.original_resource_dict,
)
context.resources.json_io_manager.handle_output(
output_context, df.to_dict(orient="records")
)
context.log_event(
dg.AssetMaterialization(
asset_key=daily_table.key,
partition=f"{context.partition_key}|{slug}",
metadata={
"title": dg.MetadataValue.text(title),
"slug": dg.MetadataValue.text(slug),
"description": dg.MetadataValue.text(description),
"rows": dg.MetadataValue.int(len(df)),
},
)
)
context.instance.add_dynamic_partitions(table_partitions_def.name, [slug])

View File

@@ -4,6 +4,7 @@ import sensors
from dagster_polars import PolarsParquetIOManager
from icecream import install
from shared.config import APP, STORAGE_DIR
from shared.io_manager import JsonIOManager
from shared.io_manager.html import HtmlIOManager
import dagster as dg
@@ -20,8 +21,9 @@ definitions = dg.Definitions(
],
resources={
"html_io_manager": HtmlIOManager(base_dir=STORAGE_DIR),
"json_io_manager": JsonIOManager(base_dir=STORAGE_DIR),
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
},
jobs=[jobs.raw_html_job],
sensors=[sensors.check_update],
sensors=[sensors.check_update, sensors.parse_raw],
)

View File

@@ -7,3 +7,8 @@ raw_html_job = dg.define_asset_job(
selection=[assets.raw_html.key],
tags={"docker/image": "dagster-code-stocks-playwright"},
)
extract_job = dg.define_asset_job(
"extract_job",
selection=[assets.raw_daily.key],
)

View File

@@ -5,3 +5,9 @@ import dagster as dg
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2025-07-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)
table_partitions_def = dg.DynamicPartitionsDefinition(name="tables")
daily_table_partitions_def = dg.MultiPartitionsDefinition(
{"date": daily_partitions_def, "source": table_partitions_def}
)

View File

@@ -1,13 +1,18 @@
from collections.abc import Iterator
from datetime import datetime
import assets
import jobs
import pendulum
import requests
import structlog
from config import URL
from utils import extract_date
from utils.extracter import extract_date
import dagster as dg
logger = structlog.get_logger()
@dg.sensor(job=jobs.raw_html_job, minimum_interval_seconds=4 * 60 * 60)
def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]:
@@ -30,5 +35,57 @@ def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
file = f"{now_str} stocks.html"
context.log.info(f"Saving file: {file}")
with open(f"/cache/{file}") as fp:
with open(f"/cache/{file}", "w") as fp:
fp.write(response.text)
@dg.sensor(job=jobs.extract_job, minimum_interval_seconds=2 * 60 * 60)
def parse_raw(context: dg.SensorEvaluationContext):
# TODO: use cursor from sensor to filter materialization events
# Get the known materialized partitions of daily_tables
daily_partitions = context.instance.get_materialized_partitions(
assets.daily_table.key
)
dates = [x.split("|")[0] for x in daily_partitions]
ic(daily_partitions, dates)
# Get metadata for the raw asset (assumes it's tracked or logged with metadata)
events = list(
context.instance.get_event_records(
event_records_filter=dg.EventRecordsFilter(
event_type=dg.DagsterEventType.ASSET_MATERIALIZATION,
asset_key=assets.raw_html.key,
),
limit=100,
)
)
# Track unique dates found in raw that are not materialized in daily_tables
unknown_dates = set()
for event in events:
metadata = event.event_log_entry.asset_materialization.metadata
date_str = None
ic(metadata)
for key, entry in metadata.items():
# TODO: move this general logic
if key.lower() in {"date", "partition", "partition_date"}:
date_str = entry.value
break
if not date_str:
continue
# Normalize and validate the date
try:
dt = pendulum.from_timestamp(int(date_str))
date_str = dt.strftime("%Y-%m-%d")
except Exception as e:
logger.error(f"Failed to parse date: {date_str}", input=date_str, e=e)
continue
if date_str not in dates:
unknown_dates.add(date_str)
ic(unknown_dates)
for date_str in sorted(unknown_dates):
yield dg.RunRequest(partition_key=date_str)

View File

@@ -1,19 +0,0 @@
import re
from collections.abc import Iterator
from datetime import date, datetime
from bs4 import BeautifulSoup
def extract_date(page_source: str) -> Iterator[date]:
# Parse with BeautifulSoup
soup = BeautifulSoup(page_source, "html.parser")
# Find the first <div> after </header>
if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
# Extract date part using regex
match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
if match:
day, _, month, year = match.groups()
date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
yield date_obj

View File

@@ -0,0 +1,55 @@
import re
from collections.abc import Iterator
from datetime import date, datetime
import pandas as pd
from bs4 import BeautifulSoup
from pandas import DataFrame
def extract_date(page_source: str) -> Iterator[date]:
# Parse with BeautifulSoup
soup = BeautifulSoup(page_source, "html.parser")
# Find the first <div> after </header>
if (header := soup.find("header")) and (div := header.find_next_sibling("div")):
# Extract date part using regex
match = re.search(r"(\d{1,2})(st|nd|rd|th)?\s+([A-Za-z]+)\s+(\d{4})", div.text)
if match:
day, _, month, year = match.groups()
date_obj = datetime.strptime(f"{day} {month} {year}", "%d %B %Y")
yield date_obj
def extract_tables(
page_source: str,
) -> Iterator[tuple[str | None, str | None, DataFrame]]:
soup = BeautifulSoup(page_source, "html.parser")
accordion_items = soup.find_all("div", class_="accordion-item")
for item in accordion_items:
# Extract the title
header = item.find("div", class_="accordion-header")
title = header.find("h2").get_text(strip=True) if header else None
# Extract the description
description_block = item.find("div", class_="accordion-description")
description = (
description_block.find("p").get_text(strip=True)
if description_block
else None
)
# Extract the table
table = item.find("table")
if table:
rows = []
for row in table.find_all("tr"):
cells = [
cell.get_text(strip=True) for cell in row.find_all(["th", "td"])
]
rows.append(cells)
if rows:
df = pd.DataFrame(rows[1:], columns=rows[0])
yield title, description, df

View File

@@ -0,0 +1,60 @@
async def scrape(url: str) -> str:
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(viewport={"width": 1000, "height": 2000})
page = await context.new_page()
await page.goto(url, timeout=60000)
# Wait until at least one toggle button is present
await page.wait_for_selector(".toggle-btn", timeout=20000)
# Set zoom
await page.evaluate("document.body.style.zoom='50%'")
# Find all toggle buttons
toggle_buttons = await page.query_selector_all(".toggle-btn")
print(f"Found {len(toggle_buttons)} toggle buttons")
for i, btn in enumerate(toggle_buttons):
try:
# Ensure it's visible and enabled
if await btn.is_visible() and await btn.is_enabled():
await btn.click()
await page.wait_for_timeout(1000)
if i == len(toggle_buttons) - 1:
break
# Scroll down gradually
scroll_step = 500
total_height = await page.evaluate("() => document.body.scrollHeight")
current_position = 0
while current_position < total_height:
await page.evaluate(f"window.scrollTo(0, {current_position});")
await page.wait_for_timeout(100)
current_position += scroll_step
total_height = await page.evaluate(
"() => document.body.scrollHeight"
)
except Exception as e:
print(f"Skipped button due to error: {e}")
# Get the page content
page_source = await page.content()
# Close the browser
await browser.close()
# Continue scraping logic here...
print("Scraping done")
# Save the page content to a file
with open("/cache/scraped_page.html", "w") as fp:
fp.write(page_source)
return page_source

View File

@@ -0,0 +1,11 @@
import re
import unicodedata
def slugify(text: str) -> str:
# Normalize unicode characters
text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
# Replace non-word characters with hyphens
text = re.sub(r"[^\w\s-]", "", text).strip().lower()
# Replace spaces and repeated hyphens with a single hyphen
return re.sub(r"[-\s]+", "-", text)

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
anyio==4.10.0
# via
# gql
# starlette
@@ -26,7 +26,7 @@ botocore==1.40.1
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via requests

View File

@@ -1,22 +0,0 @@
from typing import Any, Optional
class MyIOManager(PolarsParquetIOManager):
def _load_partition_from_path(
self,
context: InputContext,
partition_key: str,
path: "UPath",
backcompat_path: Optional["UPath"] = None,
) -> Any:
try:
return super()._load_partition_from_path(
context, partition_key, path, backcompat_path
)
except FileNotFoundError:
# Handle the case where the partition file does not exist
context.log.warning(
f"Partition file not found for key {partition_key} at path {path}. "
"Returning an empty DataFrame."
)
return None

View File

@@ -1,51 +0,0 @@
import os
from pathlib import Path
from dotenv import find_dotenv, load_dotenv
from icecream import ic
from dagster import AssetKey, DagsterInstance
def delete_partition(instance, partition_def_name, partition_key):
try:
# This does not seem to work, perhaps because it is not a dynamic partition?
# All materializations can be deleted through the UI, but not one by one
instance.delete_dynamic_partition(partition_def_name, partition_key)
except Exception as e:
print(f"Error deleting partition: {e}")
def main(instance):
print(f"Partition '{partition_key}' deleted successfully.")
def detect_previous_partition(instance, name):
ic(name)
records = instance.get_latest_materialization_events(
(AssetKey(name),),
# event_type="ASSET_MATERIALIZATION",
# asset_key=(partition_key,),
# limit=100,
)
print(records)
if __name__ == "__main__":
partition_def_name = "asset_single_1"
partition_key = "2025-07-20" # Example partition key
load_dotenv(find_dotenv())
os.environ["DAGSTER_HOME"] = str(Path(__file__).parent.parent.parent)
for k, v in os.environ.items():
if k.startswith("POSTGRES_"):
os.environ[f"DAGSTER_{k}"] = v
os.environ["DAGSTER_POSTGRES_HOST"] = "localhost"
instance = DagsterInstance.get()
# delete_partition(instance, partition_def_name, partition_key)
detect_previous_partition(instance, partition_def_name)

View File

@@ -1,8 +0,0 @@
#!/usr/bin/env bash
rsync -av /opt/dagster/src/app/vinyl/ \
/Volumes/dagster/src/app/vinyl/ \
--include='*.py' \
--include='*requirements.txt' \
--exclude='__pycache__/' \
-progress \
--delete $*

View File

@@ -1,34 +0,0 @@
import time
from dagster import AssetMaterialization, Output, config_mapping, job, op
@op(config_schema={"config_param": str})
def hello(context):
time.sleep(1)
print("halllo")
return Output(123, metadata={"aa": context.op_config["config_param"]})
@op
def goodbye(context, x: int):
time.sleep(2)
print("doooei", x)
context.log_event(
AssetMaterialization(
asset_key="my_asset",
metadata={"my_meta": 444},
description="A very useful value!",
)
)
return 2
@config_mapping(config_schema={"simplified_param": str})
def simplified_config(val):
return {"ops": {"hello": {"config": {"config_param": val["simplified_param"]}}}}
@job
def my_job():
goodbye(hello())

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
anyio==4.10.0
# via
# gql
# starlette
@@ -26,7 +26,7 @@ botocore==1.40.1
# boto3
# s3fs
# s3transfer
certifi==2025.7.14
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via requests

View File

@@ -170,7 +170,9 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]
},
output_required=False,
dagster_type=patito_model_to_dagster_type(Deal),
automation_condition=dg.AutomationCondition.eager(),
automation_condition=dg.AutomationCondition.on_missing().without(
dg.AutomationCondition.in_latest_time_window()
),
)
def new_deals(
context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]

View File

@@ -6,7 +6,7 @@ annotated-types==0.7.0
# via pydantic
antlr4-python3-runtime==4.13.2
# via dagster
anyio==4.9.0
anyio==4.10.0
# via
# gql
# starlette
@@ -32,7 +32,7 @@ botocore==1.40.1
# s3transfer
cattrs==25.1.1
# via requests-cache
certifi==2025.7.14
certifi==2025.8.3
# via requests
charset-normalizer==3.4.2
# via

View File

@@ -22,10 +22,9 @@ definitions = dg.Definitions(
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir=STORAGE_DIR),
},
sensors=[
sensors.list_locations,
# sensors.list_locations,
sensors.list_latitudes,
sensors.list_longitudes,
sensors.retrieve_weather,
# sensors.list_longitudes,
sensors.retrieve_weather,
],
)

8
entrypoint.sh Normal file
View File

@@ -0,0 +1,8 @@
#!/bin/sh
set -e
echo "Rendering dagster.yaml from template..."
envsubst < dagster.yaml.template > dagster.yaml
echo "Starting Dagster: $@"
exec "$@"

19
poetry.lock generated
View File

@@ -1,19 +0,0 @@
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]]
name = "seven"
version = "1.0.0"
description = "Python 2.5 compatibility wrapper for Python 2.7 code."
optional = false
python-versions = "*"
files = [
{file = "seven-1.0.0.tar.gz", hash = "sha256:e80157857dc378545b0cd8626668bf0e20d7f3608a5587f3fcc71a56d2416814"},
]
[package.extras]
tests = ["zope.testing"]
[metadata]
lock-version = "2.0"
python-versions = "*"
content-hash = "edfc27fcb4a7dc1b1a11f2224d7b7f3e936c5f624df1dd86207c4dc08e047b5d"