diff --git a/apps/other/src/assets.py b/apps/other/src/assets.py
index d6e36be..0cc93f6 100644
--- a/apps/other/src/assets.py
+++ b/apps/other/src/assets.py
@@ -1,184 +1,20 @@
-import polars as pl
+import sys
+from functools import partial
+from logging import getLogger
+
+from config import APP
 import dagster as dg
-from dagster import (
-    AssetIn,
-    DailyPartitionsDefinition,
-    DimensionPartitionMapping,
-    IdentityPartitionMapping,
-    MultiPartitionMapping,
-    MultiPartitionsDefinition,
-    StaticPartitionsDefinition,
-    TimeWindowPartitionMapping,
-    asset,
-)
 
-partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20")
-
-partitions_def_multi = MultiPartitionsDefinition(
-    {
-        "date": DailyPartitionsDefinition(start_date="2024-09-20"),
-        "source": StaticPartitionsDefinition(["plato", "sounds"]),
-    }
-)
+asset = partial(dg.asset, key_prefix=APP)
 
 
-@asset(
-    io_manager_key="polars_parquet_io_manager",
-    partitions_def=partitions_def_single,
-    metadata={
-        "partition_by": ["date"],
-    },
-)
-def asset_single_1(context):
-    """A dummy asset that is partitioned by date only."""
+@asset()
+def logging(context):
     ic()
     ic(context.partition_key)
 
-    import sys
-
     sys.__stdout__.write("This goes to stdout!\n")
     sys.__stderr__.write("This goes to stderr!\n")
 
-    from logging import getLogger
-
     getLogger("mylogger").info("This is an info message from mylogger")
-
-    return pl.DataFrame(
-        [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
-    )
-
-
-@asset(
-    io_manager_key="polars_parquet_io_manager",
-    partitions_def=partitions_def_multi,
-    metadata={
-        "partition_by": ["date", "source"],
-    },
-)
-def asset_multi_1(context):
-    """A dummy asset that is partitioned by date and source."""
-    ic()
-    ic(context.partition_key)
-    return pl.DataFrame(
-        [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
-    )
-
-
-@asset(
-    partitions_def=partitions_def_single,
-    ins={
-        "asset_single_1": AssetIn(
-            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0)
-        )
-    },
-)
-def asset_single_2(context, asset_single_1):
-    """A dummy asset that combines the last two date partitions."""
-    ic()
-    ic(context.partition_key)
-    ic(context.partition_key.keys_by_dimension)
-    ic(type(asset_single_1))
-    ic(type(list(asset_single_1.values())[0]))
-    ic(asset_single_1)
-
-    ic(asset_single_1.keys())
-
-    partition_key = context.asset_partition_key_for_output()
-    return f"Processed data for {partition_key}"
-
-
-# Special partition mapping that should give the last two partitions for "date" and all partitions for "source"
-partition_mapping = MultiPartitionMapping(
-    {
-        "date": DimensionPartitionMapping(
-            dimension_name="date",
-            partition_mapping=TimeWindowPartitionMapping(
-                start_offset=-10, end_offset=0
-            ),
-        ),
-        "source": DimensionPartitionMapping(
-            dimension_name="source",
-            partition_mapping=IdentityPartitionMapping(),
-        ),
-    }
-)
-
-
-@asset(
-    partitions_def=partitions_def_multi,
-    ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
-)
-def asset_multi_2(context, asset_multi_1):
-    # Towards some delta mechanism between two partitions
-    ic()
-    ic(context.partition_key)
-    ic(context.partition_key.keys_by_dimension)
-
-    ic(type(list(asset_multi_1.values())[0]))
-    ic(asset_multi_1)
-
-    partition_key = context.asset_partition_key_for_output()
-    ic(partition_key)
-    return f"Processed data for {partition_key}"
-
-
-@asset(
-    partitions_def=partitions_def_multi,
-    # ins={"asset_multi_1": AssetIn(partition_mapping=partition_mapping)},
-)
-def asset_multi_3(context, asset_multi_1):
-    # Towards some delta mechanism between two partitions
-    ic()
-    ic(context.partition_key)
-    ic(context.partition_key.keys_by_dimension)
-    ic(asset_multi_1)
-    target_date = context.partition_key.keys_by_dimension["date"]
-    ic(target_date)
-
-    storage_dir = context.resources.polars_parquet_io_manager.base_dir
-    ic(storage_dir)
-    asset_key = "deals"
-
-    df = (
-        pl.scan_parquet(
-            f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
-        )
-        .select(["date"])  # include any other needed columns
-        .filter(pl.col("date") <= target_date)
-        .limit(2)
-    )
-
-    # Collect only the available dates
-    available_dates = (
-        df.select(pl.col("date").unique())
-        .collect()
-        .get_column("date")
-        .sort(descending=True)
-        .to_list()
-    )
-    ic(available_dates)
-
-    # Find latest before + the target
-    if target_date not in available_dates:
-        raise ValueError(f"{target_date} not in available data.")
-
-    idx = available_dates.index(target_date)
-    if idx == 0:
-        raise ValueError(f"No previous date available before {target_date}")
-
-    keep_dates = [available_dates[idx - 1], available_dates[idx]]
-    ic(keep_dates)
-
-    # Re-scan with final filter (re-use lazy frame)
-    return pl.scan_parquet(
-        f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
-    ).filter(pl.col("date").is_in(keep_dates))
-
-
-@asset(io_manager_key="polars_parquet_io_manager", metadata={"partition_by": ["b"]})
-def dummy_asset():
-    yield dg.Output(
-        pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}),
-        metadata={"partition_by": ["c"]},
-    )
 
diff --git a/apps/other/src/config.py b/apps/other/src/config.py
new file mode 100644
index 0000000..fefeadf
--- /dev/null
+++ b/apps/other/src/config.py
@@ -0,0 +1,4 @@
+import os
+from pathlib import Path
+
+APP = os.environ.get("APP", Path(__file__).parent.parent.name)
diff --git a/apps/other/src/definitions.py b/apps/other/src/definitions.py
index f7b647c..003eb58 100644
--- a/apps/other/src/definitions.py
+++ b/apps/other/src/definitions.py
@@ -6,12 +6,6 @@ import dagster as dg
 
 install()
 
-# Define a job that includes both assets
-daily_job = dg.define_asset_job(
-    "daily_job",
-    selection=[assets.asset_multi_1, assets.asset_multi_2],
-)
-
 definitions = dg.Definitions(
     assets=[
         asset.with_attributes(
@@ -23,5 +17,4 @@ definitions = dg.Definitions(
     resources={
         "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage")
     },
-    jobs=[daily_job],
 )
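Note on the asset = partial(dg.asset, key_prefix=APP) pattern introduced in
assets.py above: every asset defined through the wrapper gets its key
namespaced under the app name, so the same asset module can be reused across
apps without key collisions. A minimal sketch of the resulting behavior,
assuming Dagster's documented key_prefix semantics (the "example" asset and
the literal APP value are hypothetical):

    from functools import partial

    import dagster as dg

    APP = "other"  # stands in for config.APP; hypothetical literal

    asset = partial(dg.asset, key_prefix=APP)

    @asset()
    def example():
        return 1

    # key_prefix prepends the app name to the asset key
    assert example.key == dg.AssetKey(["other", "example"])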
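For reference, config.APP in the new config.py falls back to the app
directory name when the APP environment variable is unset: from
apps/other/src/config.py, Path(__file__).parent.parent.name walks up from
src/ to other/. A quick sketch of that fallback (the absolute path is a
hypothetical stand-in for __file__):

    from pathlib import Path

    # hypothetical stand-in for __file__ inside apps/other/src/config.py
    file = Path("/repo/apps/other/src/config.py")
    assert file.parent.parent.name == "other"  # src/ -> other/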