diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index 059457e..b814693 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -1,3 +1,4 @@ +import os from collections.abc import Iterator from datetime import datetime from functools import partial @@ -13,7 +14,7 @@ from models import Deal from partitions import daily_partitions_def, multi_partitions_def from plato.fetch import scrape_plato from plato.parse import parse as parse_plato -from shared.utils import get_partition_keys, load_partitions, parse_partition_keys +from shared.utils import get_partition_keys, parse_partition_keys from sounds.fetch import fetch_deals from sounds.parse import parse as parse_sounds from utils.email import EmailService @@ -134,11 +135,13 @@ def works(context: dg.AssetExecutionContext) -> Iterator[dg.Output[pl.DataFrame] partitions = context.instance.get_materialized_partitions(cleaned_deals.key) ic(partitions) logger.info("Works", partitions=partitions) - dfs = list(load_partitions(context, cleaned_deals.key, partitions)) - ic(len(dfs)) - if dfs: - columns = ["artist", "title", "release"] - yield dg.Output(pl.concat(dfs, how="vertical_relaxed").select(columns).unique()) + path = os.path.join( + context.resources.polars_parquet_io_manager.base_dir, *cleaned_deals.key.path + ) + ic(path) + yield dg.Output( + pl.scan_parquet(path).select(["artist", "title", "release"]).unique().collect() + ) @asset(