diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 90e543d..059457e 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -127,17 +127,18 @@ def cleaned_deals( deps=[cleaned_deals], io_manager_key="polars_parquet_io_manager", automation_condition=dg.AutomationCondition.eager(), + output_required=False, ) -def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: +def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]]: """Aggregate works from cleaned deals.""" partitions = context.instance.get_materialized_partitions(cleaned_deals.key) ic(partitions) logger.info("Works", partitions=partitions) dfs = list(load_partitions(context, cleaned_deals.key, partitions)) + ic(len(dfs)) if dfs: columns = ["artist", "title", "release"] - return pl.concat(dfs, how="vertical_relaxed").select(columns).unique() - return None + yield dg.Output(pl.concat(dfs, how="vertical_relaxed").select(columns).unique()) @asset( diff --git a/shared/src/shared/utils.py b/shared/src/shared/utils.py index 9f94f00..8379799 100644 --- a/shared/src/shared/utils.py +++ b/shared/src/shared/utils.py @@ -1,8 +1,6 @@ from collections.abc import Iterator from typing import Mapping -import polars as pl - import dagster as dg @@ -87,7 +85,7 @@ def parse_partition_keys( def load_partitions( context: dg.AssetExecutionContext, asset_key: dg.AssetKey, partitions: set[str] -) -> Iterator[pl.DataFrame]: +) -> Iterator[object]: """ Load data from an asset for the specified partitions. @@ -99,8 +97,12 @@ def load_partitions( Yields: DataFrames for each partition specified. """ - from definitions import definitions - loader = definitions.get_asset_value_loader(instance=context.instance) + loader = context.repository_def.get_asset_value_loader(instance=context.instance) for partition_key in partitions: - yield loader.load_asset_value(asset_key=asset_key, partition_key=partition_key) + if ( + value := loader.load_asset_value( + asset_key=asset_key, partition_key=partition_key + ) + ) is not None: + yield value