diff --git a/apps/stocks/src/assets.py b/apps/stocks/src/assets.py index e69de29..5ab9ee0 100644 --- a/apps/stocks/src/assets.py +++ b/apps/stocks/src/assets.py @@ -0,0 +1,12 @@ +from functools import partial + +from config import APP + +import dagster as dg + +asset = partial(dg.asset, key_prefix=APP) + + +@asset +def raw_html() -> None: + print("todo") diff --git a/apps/stocks/src/config.py b/apps/stocks/src/config.py new file mode 100644 index 0000000..fefeadf --- /dev/null +++ b/apps/stocks/src/config.py @@ -0,0 +1,4 @@ +import os +from pathlib import Path + +APP = os.environ.get("APP", Path(__file__).parent.parent.name) diff --git a/apps/stocks/src/definitions.py b/apps/stocks/src/definitions.py index 7e0e20b..db3528d 100644 --- a/apps/stocks/src/definitions.py +++ b/apps/stocks/src/definitions.py @@ -1,6 +1,7 @@ import os import assets +import sensors from dagster_polars import PolarsParquetIOManager from icecream import install @@ -24,4 +25,5 @@ definitions = dg.Definitions( base_dir=os.environ.get("STORAGE_DIR", "/storage") + f"/{APP}" ), }, + sensors=[sensors.check_update], ) diff --git a/apps/stocks/src/jobs.py b/apps/stocks/src/jobs.py new file mode 100644 index 0000000..d7c3099 --- /dev/null +++ b/apps/stocks/src/jobs.py @@ -0,0 +1,5 @@ +import assets + +import dagster as dg + +raw_html_job = dg.define_asset_job("deals_job", selection=[assets.raw_html.key]) diff --git a/apps/stocks/src/sensors.py b/apps/stocks/src/sensors.py new file mode 100644 index 0000000..bed19ab --- /dev/null +++ b/apps/stocks/src/sensors.py @@ -0,0 +1,15 @@ +from collections.abc import Iterator +from datetime import date + +import jobs + +import dagster as dg + + +@dg.sensor(job=jobs.raw_html_job) +def check_update(context: dg.SensorEvaluationContext) -> Iterator[dg.RunRequest]: + ic(context.cursor) + + yield dg.RunRequest() + + context.update_cursor(date.today().strftime("%Y-%m-%d"))