diff --git a/apps/other/src/assets.py b/apps/other/src/assets.py index 3b2ac53..8be3d3f 100644 --- a/apps/other/src/assets.py +++ b/apps/other/src/assets.py @@ -30,8 +30,19 @@ partitions_def_multi = MultiPartitionsDefinition( }, ) def asset_single_1(context): + """A dummy asset that is partitioned by date only.""" ic() ic(context.partition_key) + + import sys + + sys.__stdout__.write("This goes to stdout!\n") + sys.__stderr__.write("This goes to stderr!\n") + + from logging import getLogger + + getLogger("mylogger").info("This is an info message from mylogger") + return pl.DataFrame( [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}] ) @@ -45,6 +56,7 @@ def asset_single_1(context): }, ) def asset_multi_1(context): + """A dummy asset that is partitioned by date and source.""" ic() ic(context.partition_key) return pl.DataFrame( @@ -61,9 +73,16 @@ def asset_multi_1(context): }, ) def asset_single_2(context, asset_single_1): + """A dummy asset that combines the last two date partitions.""" ic() ic(context.partition_key) + ic(context.partition_key.keys_by_dimension) + ic(type(asset_single_1)) + ic(type(list(asset_single_1.values())[0])) + ic(asset_single_1) + ic(asset_single_1.keys()) + partition_key = context.asset_partition_key_for_output() return f"Processed data for {partition_key}" @@ -73,7 +92,9 @@ partition_mapping = MultiPartitionMapping( { "date": DimensionPartitionMapping( dimension_name="date", - partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0), + partition_mapping=TimeWindowPartitionMapping( + start_offset=-10, end_offset=0 + ), ), "source": DimensionPartitionMapping( dimension_name="source", @@ -92,8 +113,63 @@ def asset_multi_2(context, asset_multi_1): ic() ic(context.partition_key) ic(context.partition_key.keys_by_dimension) + + ic(type(list(asset_multi_1.values())[0])) ic(asset_multi_1) partition_key = context.asset_partition_key_for_output() ic(partition_key) return f"Processed data for {partition_key}" + + +@asset( + partitions_def=partitions_def_multi, + # ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)}, +) +def asset_multi_3(context, asset_multi_1): + # Towards some delta mechanism between two partitions + ic() + ic(context.partition_key) + ic(context.partition_key.keys_by_dimension) + ic(asset_multi_1) + target_date = context.partition_key.keys_by_dimension["date"] + ic(target_date) + + storage_dir = context.resources.polars_parquet_io_manager.base_dir + ic(storage_dir) + asset_key = "deals" + + df = ( + pl.scan_parquet( + f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore" + ) + .select(["date"]) # include any other needed columns + .filter(pl.col("date") <= target_date) + .limit(2) + ) + + # Collect only the available dates + available_dates = ( + df.select(pl.col("date").unique()) + .collect() + .get_column("date") + .sort(descending=True) + .to_list() + ) + ic(available_dates) + + # Find latest before + the target + if target_date not in available_dates: + raise ValueError(f"{target_date} not in available data.") + + idx = available_dates.index(target_date) + if idx == 0: + raise ValueError(f"No previous date available before {target_date}") + + keep_dates = [available_dates[idx - 1], available_dates[idx]] + ic(keep_dates) + + # Re-scan with final filter (re-use lazy frame) + return pl.scan_parquet( + f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore" + ).filter(pl.col("date").is_in(keep_dates))