rewrite to dagster as dg

2025-07-26 10:03:39 +02:00
parent ded11d9e60
commit 62c25b32c8
5 changed files with 51 additions and 66 deletions

View File

@@ -9,35 +9,27 @@ from plato.fetch import scrape_plato
 from sounds.fetch import fetch_deals
 from utils import parse_date
-from dagster import (
-    DailyPartitionsDefinition,
-    Failure,
-    Field,
-    MultiPartitionsDefinition,
-    OpExecutionContext,
-    StaticPartitionsDefinition,
-    asset,
-)
+import dagster as dg
 
 SOURCES = ["plato", "sounds"]
 logger = structlog.get_logger()
 
-partitions_def = MultiPartitionsDefinition(
+partitions_def = dg.MultiPartitionsDefinition(
     {
-        "date": DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
-        "source": StaticPartitionsDefinition(SOURCES),
+        "date": dg.DailyPartitionsDefinition(start_date="2024-09-01", end_offset=1),
+        "source": dg.StaticPartitionsDefinition(SOURCES),
     }
 )
 
 
-@asset(
+@dg.asset(
     io_manager_key="polars_parquet_io_manager",
     partitions_def=partitions_def,
     metadata={
         "partition_by": ["date", "source"],
     },
-    config_schema={"import_dir": Field(str, default_value="/storage/import")},
+    config_schema={"import_dir": dg.Field(str, default_value="/storage/import")},
 )
 def deals(context):
     ic()
@@ -53,7 +45,7 @@ def deals(context):
     days = (date - datetime.today()).days
     ic(days)
     if days > 0:
-        raise Failure(f"Cannot materialize for the future: {date.date()}")
+        raise dg.Failure(f"Cannot materialize for the future: {date.date()}")
     if days < -1:
         if source == "sounds":
             pattern = f"{import_dir}/{date.date()}_*_sounds.csv"
@@ -70,7 +62,7 @@ def deals(context):
             )
         except Exception as e:
             logger.error("Failed to load CSV file!", error=e)
-            raise Failure(f"Cannot materialize for the past: {date.date()}")
+            raise dg.Failure(f"Cannot materialize for the past: {date.date()}")
 
     if source == "plato":
         logger.info("Scraping Plato")
@@ -90,8 +82,12 @@ def deals(context):
     )
 
 
-@asset(deps=[deals], io_manager_key="polars_parquet_io_manager")
-def new_deals(context: OpExecutionContext) -> pl.DataFrame:
+@dg.asset(
+    deps=[deals.key],
+    automation_condition=dg.AutomationCondition.eager(),
+    io_manager_key="polars_parquet_io_manager",
+)
+def new_deals(context: dg.OpExecutionContext) -> pl.DataFrame:
     """Combine deals from Plato and Sounds into a single DataFrame."""
     ic()
     storage_dir = context.resources.polars_parquet_io_manager.base_dir
@@ -160,17 +156,11 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame:
     ).pl()
 
 
-@asset(
+@dg.asset(
+    ins={"df": dg.AssetIn(key=new_deals.key)},
     io_manager_key="polars_parquet_io_manager",
+    automation_condition=dg.AutomationCondition.eager(),
 )
-def works(new_deals: pl.DataFrame) -> pl.DataFrame:
-    # Pandas
-    # columns = ["artist", "title"]
-    # return pl.from_pandas(new_deals[columns].to_pandas().drop_duplicates())
-    # Polars
-    # return new_deals[columns].unique(subset=columns)
-    # DuckDB
-    with duckdb.connect() as con:
-        return con.execute("SELECT DISTINCT artist, title, release FROM new_deals").pl()
+def works(df: pl.DataFrame) -> pl.DataFrame:
+    columns = ["artist", "title", "release"]
+    return df[columns].unique()

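The key behavioral change in this file is the eager automation condition on the downstream assets. A minimal, self-contained sketch of that pattern, with illustrative asset names not taken from this repo:

import dagster as dg
import polars as pl


@dg.asset
def raw_rows() -> pl.DataFrame:
    # hypothetical upstream asset
    return pl.DataFrame({"artist": ["x"], "title": ["y"], "release": ["z"]})


@dg.asset(automation_condition=dg.AutomationCondition.eager())
def distinct_rows(raw_rows: pl.DataFrame) -> pl.DataFrame:
    # re-materializes automatically whenever raw_rows materializes,
    # provided an automation-condition sensor is running (see the
    # Definitions diff below)
    return raw_rows.unique()

Note the two ways of declaring the upstream edge used above: deps alone (as on new_deals) records the dependency without loading any value, while ins with AssetIn (as on works) loads the upstream output through the IO manager and passes it in as the named function argument.
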
View File

@@ -10,7 +10,7 @@ from jobs import check_partitions_job, deals_job, musicbrainz_lookup_job
 from schedules import deals_schedule
 from sensors import musicbrainz_lookup_sensor
-from dagster import Definitions, load_assets_from_modules
+import dagster as dg
 
 
 class PandasDuckDBIOManager(DuckDBIOManager):
@@ -20,13 +20,13 @@ class PandasDuckDBIOManager(DuckDBIOManager):
 install()
 
-definitions = Definitions(
+definitions = dg.Definitions(
     assets=[
         asset.with_attributes(
             group_names_by_key={asset.key: "vinyl"},
             tags_by_key={asset.key: {"app": "vinyl"}},
         )
-        for asset in load_assets_from_modules([assets])
+        for asset in dg.load_assets_from_modules([assets])
     ],
     resources={
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage"),
@@ -34,5 +34,12 @@ definitions = Definitions(
     },
     jobs=[deals_job, check_partitions_job, musicbrainz_lookup_job],
     schedules=[deals_schedule],
-    sensors=[musicbrainz_lookup_sensor],
+    sensors=[
+        dg.AutomationConditionSensorDefinition(
+            "run_tags_automation_condition_sensor",
+            target=dg.AssetSelection.all(),
+            default_status=dg.DefaultSensorStatus.RUNNING,
+        ),
+        musicbrainz_lookup_sensor,
+    ],
 )

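Automation conditions are only evaluated while an automation-condition sensor targeting the assets is running, which is what the sensor added here provides; default_status=RUNNING saves enabling it by hand in the UI. A quick way to catch wiring mistakes in a Definitions object like this one is a loadability check in a test; a sketch, assuming the module is importable as definitions:

import dagster as dg


def test_definitions_are_loadable() -> None:
    # import path is an assumption; adjust to wherever `definitions` lives
    from definitions import definitions

    # raises if assets, jobs, schedules, or sensors fail to resolve
    dg.Definitions.validate_loadable(definitions)
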
View File

@@ -1,27 +1,20 @@
 import polars as pl
 
 from assets import deals, new_deals, works
-from dagster import (
-    AssetKey,
-    AssetMaterialization,
-    OpExecutionContext,
-    define_asset_job,
-    job,
-    op,
-)
+import dagster as dg
 
-deals_job = define_asset_job(
-    "deals_job", selection=[deals], partitions_def=deals.partitions_def
+deals_job = dg.define_asset_job(
+    "deals_job", selection=[deals.key], partitions_def=deals.partitions_def
 )
 
 
-@op(required_resource_keys={"polars_parquet_io_manager"})
-def check_partitions(context: OpExecutionContext):
+@dg.op(required_resource_keys={"polars_parquet_io_manager"})
+def check_partitions(context: dg.OpExecutionContext):
     asset_key = "deals"
     # Fetch the materializations for the asset key
     materializations = context.instance.get_materialized_partitions(
-        asset_key=AssetKey(asset_key)
+        asset_key=dg.AssetKey(asset_key)
     )
     ic(materializations)
@@ -40,15 +33,15 @@ def check_partitions(context: OpExecutionContext):
         if partition not in materializations:
             context.log.info(f"Missing partition: {partition}")
             context.log_event(
-                AssetMaterialization(asset_key=asset_key, partition=partition)
+                dg.AssetMaterialization(asset_key=asset_key, partition=partition)
             )
 
 
-@job
+@dg.job
 def check_partitions_job():
     check_partitions()
 
 
-musicbrainz_lookup_job = define_asset_job(
-    "musicbrainz_lookup_job", selection=[works, new_deals]
+musicbrainz_lookup_job = dg.define_asset_job(
+    "musicbrainz_lookup_job", selection=[works.key, new_deals.key]
 )

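check_partitions compares what the instance has materialized against the expected partition keys. For reference, a sketch of the same instance query used standalone, simplified to a single daily dimension (the real deals asset is partitioned by both date and source, so its keys are compound):

import dagster as dg

daily = dg.DailyPartitionsDefinition(start_date="2024-09-01")


def missing_partitions(instance: dg.DagsterInstance) -> list[str]:
    # partition keys the instance has never seen a materialization for
    materialized = instance.get_materialized_partitions(dg.AssetKey("deals"))
    return [key for key in daily.get_partition_keys() if key not in materialized]
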
View File

@@ -1,10 +1,9 @@
 from jobs import deals_job
-from dagster import DefaultScheduleStatus, build_schedule_from_partitioned_job
+import dagster as dg
 
 
-deals_schedule = build_schedule_from_partitioned_job(
+deals_schedule = dg.build_schedule_from_partitioned_job(
     job=deals_job,
     hour_of_day=7,
-    # execution_timezone="Europe/Amsterdam",
-    default_status=DefaultScheduleStatus.RUNNING,
+    default_status=dg.DefaultScheduleStatus.RUNNING,
 )

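build_schedule_from_partitioned_job derives its cron from the job's partitions: daily partitions plus hour_of_day=7 gives one run per day at 07:00, targeting the newest partition. A self-contained sketch with illustrative names:

import dagster as dg

daily = dg.DailyPartitionsDefinition(start_date="2024-09-01")


@dg.asset(partitions_def=daily)
def report(context: dg.AssetExecutionContext) -> None:
    context.log.info(f"building partition {context.partition_key}")


report_job = dg.define_asset_job(
    "report_job", selection=[report.key], partitions_def=daily
)

# runs daily at 07:00, in UTC unless execution_timezone is passed
report_schedule = dg.build_schedule_from_partitioned_job(
    job=report_job,
    hour_of_day=7,
    default_status=dg.DefaultScheduleStatus.RUNNING,
)
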
View File

@@ -1,22 +1,16 @@
 from assets import deals
 from jobs import musicbrainz_lookup_job
-from dagster import (
-    DefaultSensorStatus,
-    EventLogEntry,
-    RunRequest,
-    SensorEvaluationContext,
-    asset_sensor,
-)
+import dagster as dg
 
 
-@asset_sensor(
+@dg.asset_sensor(
     asset_key=deals.key,
     job=musicbrainz_lookup_job,
-    default_status=DefaultSensorStatus.RUNNING,
+    default_status=dg.DefaultSensorStatus.STOPPED,
 )
 def musicbrainz_lookup_sensor(
-    context: SensorEvaluationContext, asset_event: EventLogEntry
+    context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry
 ):
     assert asset_event.dagster_event and asset_event.dagster_event.asset_key
-    yield RunRequest(run_key=context.cursor)
+    yield dg.RunRequest(run_key=context.cursor)
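
For comparison with the automation-condition route, this is the classic asset-sensor pattern, reduced here to a self-contained sketch with illustrative names: the sensor receives the materialization event and requests a run, with run_key deduplicating repeat evaluations of the same cursor.

import dagster as dg


@dg.op
def lookup() -> None: ...


@dg.job
def lookup_job():
    lookup()


@dg.asset_sensor(asset_key=dg.AssetKey("deals"), job=lookup_job)
def deals_materialized_sensor(
    context: dg.SensorEvaluationContext, asset_event: dg.EventLogEntry
):
    # one run of lookup_job per new materialization of "deals"
    yield dg.RunRequest(run_key=context.cursor)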