fix works asset
This commit is contained in:
@@ -127,17 +127,18 @@ def cleaned_deals(
|
|||||||
deps=[cleaned_deals],
|
deps=[cleaned_deals],
|
||||||
io_manager_key="polars_parquet_io_manager",
|
io_manager_key="polars_parquet_io_manager",
|
||||||
automation_condition=dg.AutomationCondition.eager(),
|
automation_condition=dg.AutomationCondition.eager(),
|
||||||
|
output_required=False,
|
||||||
)
|
)
|
||||||
def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
|
def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame]]:
|
||||||
"""Aggregate works from cleaned deals."""
|
"""Aggregate works from cleaned deals."""
|
||||||
partitions = context.instance.get_materialized_partitions(cleaned_deals.key)
|
partitions = context.instance.get_materialized_partitions(cleaned_deals.key)
|
||||||
ic(partitions)
|
ic(partitions)
|
||||||
logger.info("Works", partitions=partitions)
|
logger.info("Works", partitions=partitions)
|
||||||
dfs = list(load_partitions(context, cleaned_deals.key, partitions))
|
dfs = list(load_partitions(context, cleaned_deals.key, partitions))
|
||||||
|
ic(len(dfs))
|
||||||
if dfs:
|
if dfs:
|
||||||
columns = ["artist", "title", "release"]
|
columns = ["artist", "title", "release"]
|
||||||
return pl.concat(dfs, how="vertical_relaxed").select(columns).unique()
|
yield dg.Output(pl.concat(dfs, how="vertical_relaxed").select(columns).unique())
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
@asset(
|
@asset(
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
from collections.abc import Iterator
|
from collections.abc import Iterator
|
||||||
from typing import Mapping
|
from typing import Mapping
|
||||||
|
|
||||||
import polars as pl
|
|
||||||
|
|
||||||
import dagster as dg
|
import dagster as dg
|
||||||
|
|
||||||
|
|
||||||
@@ -87,7 +85,7 @@ def parse_partition_keys(
|
|||||||
|
|
||||||
def load_partitions(
|
def load_partitions(
|
||||||
context: dg.AssetExecutionContext, asset_key: dg.AssetKey, partitions: set[str]
|
context: dg.AssetExecutionContext, asset_key: dg.AssetKey, partitions: set[str]
|
||||||
) -> Iterator[pl.DataFrame]:
|
) -> Iterator[object]:
|
||||||
"""
|
"""
|
||||||
Load data from an asset for the specified partitions.
|
Load data from an asset for the specified partitions.
|
||||||
|
|
||||||
@@ -99,8 +97,12 @@ def load_partitions(
|
|||||||
Yields:
|
Yields:
|
||||||
DataFrames for each partition specified.
|
DataFrames for each partition specified.
|
||||||
"""
|
"""
|
||||||
from definitions import definitions
|
|
||||||
|
|
||||||
loader = definitions.get_asset_value_loader(instance=context.instance)
|
loader = context.repository_def.get_asset_value_loader(instance=context.instance)
|
||||||
for partition_key in partitions:
|
for partition_key in partitions:
|
||||||
yield loader.load_asset_value(asset_key=asset_key, partition_key=partition_key)
|
if (
|
||||||
|
value := loader.load_asset_value(
|
||||||
|
asset_key=asset_key, partition_key=partition_key
|
||||||
|
)
|
||||||
|
) is not None:
|
||||||
|
yield value
|
||||||
|
|||||||
Reference in New Issue
Block a user