test write to stdout, multiple partitions
@@ -30,8 +30,19 @@ partitions_def_multi = MultiPartitionsDefinition(
     },
 )
 def asset_single_1(context):
+    """A dummy asset that is partitioned by date only."""
     ic()
     ic(context.partition_key)
+
+    import sys
+
+    sys.__stdout__.write("This goes to stdout!\n")
+    sys.__stderr__.write("This goes to stderr!\n")
+
+    from logging import getLogger
+
+    getLogger("mylogger").info("This is an info message from mylogger")
+
     return pl.DataFrame(
         [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
     )
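For context on the hunk above, here is a minimal standalone sketch of the stream-bypass trick being tested. Python guarantees that sys.__stdout__ and sys.__stderr__ keep the streams that were active at interpreter startup, so writes to them reach the real console even when a host process such as Dagster has swapped sys.stdout/sys.stderr for its log capture. The function name is illustrative, not part of the commit.

import sys
from logging import getLogger

def emit_outside_captured_streams() -> None:
    """Write to the original process streams, bypassing any sys.stdout swap."""
    # sys.__stdout__ / sys.__stderr__ hold the streams from interpreter
    # startup, so they survive later redirection of sys.stdout / sys.stderr.
    sys.__stdout__.write("This goes to stdout!\n")
    sys.__stderr__.write("This goes to stderr!\n")

    # A named logger routes through the logging tree instead; whether it
    # reaches the console depends on the handlers the host app installed.
    getLogger("mylogger").info("This is an info message from mylogger")

if __name__ == "__main__":
    emit_outside_captured_streams()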
@@ -45,6 +56,7 @@ def asset_single_1(context):
     },
 )
 def asset_multi_1(context):
+    """A dummy asset that is partitioned by date and source."""
     ic()
     ic(context.partition_key)
     return pl.DataFrame(
@@ -61,9 +73,16 @@ def asset_multi_1(context):
     },
 )
 def asset_single_2(context, asset_single_1):
+    """A dummy asset that combines the last two date partitions."""
     ic()
     ic(context.partition_key)
+    ic(context.partition_key.keys_by_dimension)
+    ic(type(asset_single_1))
+    ic(type(list(asset_single_1.values())[0]))
+    ic(asset_single_1)
+
     ic(asset_single_1.keys())
+
     partition_key = context.asset_partition_key_for_output()
     return f"Processed data for {partition_key}"
 
@@ -73,7 +92,9 @@ partition_mapping = MultiPartitionMapping(
     {
         "date": DimensionPartitionMapping(
             dimension_name="date",
-            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
+            partition_mapping=TimeWindowPartitionMapping(
+                start_offset=-10, end_offset=0
+            ),
         ),
         "source": DimensionPartitionMapping(
             dimension_name="source",
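The offset change above widens the upstream date window from one previous partition to ten. A minimal sketch of the mechanism, assuming Dagster's TimeWindowPartitionMapping semantics and the built-in IO managers' behavior of loading a multi-partition input as a dict keyed by partition key (which is why the assets in this file call .keys() / .values() on their inputs); the asset names here are illustrative.

from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

daily = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily)
def upstream(context) -> str:
    return f"value for {context.partition_key}"

# start_offset=-10, end_offset=0 maps each downstream date to the upstream
# window covering that date plus the ten partitions before it, so the input
# arrives as a dict of partition key -> value rather than a single value.
@asset(
    partitions_def=daily,
    ins={
        "upstream": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-10, end_offset=0
            )
        )
    },
)
def downstream(context, upstream: dict[str, str]) -> int:
    context.log.info(f"received partitions: {sorted(upstream)}")
    return len(upstream)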
@@ -92,8 +113,63 @@ def asset_multi_2(context, asset_multi_1):
     ic()
     ic(context.partition_key)
     ic(context.partition_key.keys_by_dimension)
+
+    ic(type(list(asset_multi_1.values())[0]))
     ic(asset_multi_1)
 
     partition_key = context.asset_partition_key_for_output()
     ic(partition_key)
     return f"Processed data for {partition_key}"
+
+
+@asset(
+    partitions_def=partitions_def_multi,
+    # ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
+)
+def asset_multi_3(context, asset_multi_1):
+    # Towards some delta mechanism between two partitions
+    ic()
+    ic(context.partition_key)
+    ic(context.partition_key.keys_by_dimension)
+    ic(asset_multi_1)
+    target_date = context.partition_key.keys_by_dimension["date"]
+    ic(target_date)
+
+    storage_dir = context.resources.polars_parquet_io_manager.base_dir
+    ic(storage_dir)
+    asset_key = "deals"
+
+    df = (
+        pl.scan_parquet(
+            f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
+        )
+        .select(["date"])  # include any other needed columns
+        .filter(pl.col("date") <= target_date)
+        .limit(2)
+    )
+
+    # Collect only the available dates
+    available_dates = (
+        df.select(pl.col("date").unique())
+        .collect()
+        .get_column("date")
+        .sort()  # ascending, so the previous date sits directly before the target
+        .to_list()
+    )
+    ic(available_dates)
+
+    # Find the latest date before the target, plus the target itself
+    if target_date not in available_dates:
+        raise ValueError(f"{target_date} not in available data.")
+
+    idx = available_dates.index(target_date)
+    if idx == 0:
+        raise ValueError(f"No previous date available before {target_date}")
+
+    keep_dates = [available_dates[idx - 1], available_dates[idx]]
+    ic(keep_dates)
+
+    # Re-scan and apply the final filter lazily
+    return pl.scan_parquet(
+        f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
+    ).filter(pl.col("date").is_in(keep_dates))