remove test code
This commit is contained in:
@@ -1,184 +1,20 @@
|
||||
import polars as pl
|
||||
import sys
|
||||
from functools import partial
|
||||
from logging import getLogger
|
||||
|
||||
from config import APP
|
||||
|
||||
import dagster as dg
|
||||
from dagster import (
|
||||
AssetIn,
|
||||
DailyPartitionsDefinition,
|
||||
DimensionPartitionMapping,
|
||||
IdentityPartitionMapping,
|
||||
MultiPartitionMapping,
|
||||
MultiPartitionsDefinition,
|
||||
StaticPartitionsDefinition,
|
||||
TimeWindowPartitionMapping,
|
||||
asset,
|
||||
)
|
||||
|
||||
# Single-dimension partitioning: one partition per calendar day.
partitions_def_single = DailyPartitionsDefinition(start_date="2024-09-20")

# Two-dimensional partitioning: calendar day ("date") crossed with a fixed
# set of data sources ("source").
partitions_def_multi = MultiPartitionsDefinition(
    partitions_defs={
        "date": DailyPartitionsDefinition(start_date="2024-09-20"),
        "source": StaticPartitionsDefinition(partition_keys=["plato", "sounds"]),
    }
)

# Project-local asset decorator: dg.asset with key_prefix pre-bound to APP.
# NOTE: this rebinds the name `asset` that is also imported from dagster above.
asset = partial(dg.asset, key_prefix=APP)
|
||||
|
||||
|
||||
# NOTE(review): in this diff view only the docstring is visible as the body of
# this function; the statements that follow appear under the next definition.
# Confirm the decorator/body pairing against the actual file.
@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=partitions_def_single,
    metadata={
        "partition_by": ["date"],
    },
)
def asset_single_1(context):
    """A dummy asset that is partitioned by date only."""
|
||||
@asset()
def logging(context):
    """Emit debug output on several channels and return a one-row DataFrame.

    Writes one line to the interpreter's original stdout and one to its
    original stderr, logs an info record on the "mylogger" logger, and returns
    a polars DataFrame whose single row carries the partition key in "date"
    and a derived message in "data".

    NOTE(review): this function reads ``context.partition_key`` although the
    decorator shown here declares no ``partitions_def``; in this diff view the
    lines may be interleaved with the preceding definition -- confirm against
    the actual file.
    """
    # `ic` is not imported in this module; presumably installed into builtins
    # by icecream's install() elsewhere -- TODO confirm.
    ic()
    ic(context.partition_key)

    # sys and getLogger come from the module-level imports; the redundant
    # function-local imports were removed.
    sys.__stdout__.write("This goes to stdout!\n")
    sys.__stderr__.write("This goes to stderr!\n")

    getLogger("mylogger").info("This is an info message from mylogger")

    return pl.DataFrame(
        [{"date": context.partition_key, "data": f"Data for {context.partition_key}"}]
    )
|
||||
|
||||
|
||||
@asset(
    io_manager_key="polars_parquet_io_manager",
    partitions_def=partitions_def_multi,
    metadata={
        "partition_by": ["date", "source"],
    },
)
def asset_multi_1(context):
    """A dummy asset that is partitioned by date and source.

    Returns a one-row polars DataFrame with the columns "date", "source" and
    "data". "date" and "source" hold the per-dimension keys of the current
    multi-partition, so that both columns named in the ``partition_by``
    metadata actually exist in the frame.
    """
    ic()
    ic(context.partition_key)

    # Split the combined multi-partition key into its dimensions instead of
    # storing the combined key in the "date" column.
    dimension_keys = context.partition_key.keys_by_dimension

    return pl.DataFrame(
        [
            {
                "date": dimension_keys["date"],
                "source": dimension_keys["source"],
                "data": f"Data for {context.partition_key}",
            }
        ]
    )
|
||||
|
||||
|
||||
@asset(
    partitions_def=partitions_def_single,
    ins={
        "asset_single_1": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0)
        )
    },
)
def asset_single_2(context, asset_single_1):
    """A dummy asset that combines the last two date partitions.

    The input mapping (start_offset=-1, end_offset=0) requests the previous
    and the current daily partition of ``asset_single_1``. The input is
    accessed through ``.keys()`` / ``.values()``, i.e. it is treated as a
    mapping. At present the function only prints debug information about the
    input and returns a status string for the output partition.
    """
    ic()
    ic(context.partition_key)
    # This asset uses the single-dimension daily partitioning, so the
    # partition key is a plain string; the former debug access to
    # `.keys_by_dimension` (a multi-partition attribute) was removed because
    # it raises AttributeError here.
    ic(type(asset_single_1))
    ic(type(list(asset_single_1.values())[0]))
    ic(asset_single_1)

    ic(asset_single_1.keys())

    partition_key = context.asset_partition_key_for_output()
    return f"Processed data for {partition_key}"
|
||||
|
||||
|
||||
# Partition mapping for multi-partitioned inputs: for "date", the current partition plus the 10 preceding ones (start_offset=-10, end_offset=0); for "source", only the same partition (identity mapping)
|
||||
partition_mapping = MultiPartitionMapping(
    dict(
        # "date" dimension: a time window ending at the current partition.
        date=DimensionPartitionMapping(
            dimension_name="date",
            partition_mapping=TimeWindowPartitionMapping(start_offset=-10, end_offset=0),
        ),
        # "source" dimension: mapped one-to-one.
        source=DimensionPartitionMapping(
            dimension_name="source",
            partition_mapping=IdentityPartitionMapping(),
        ),
    )
)
|
||||
|
||||
|
||||
# Debug asset on the (date, source) partitioning: loads asset_multi_1 through
# the module-level `partition_mapping`, prints what it received and returns a
# status string for the output partition.
@asset(
    partitions_def=partitions_def_multi,
    ins=dict(asset_multi_1=AssetIn(partition_mapping=partition_mapping)),
)
def asset_multi_2(context, asset_multi_1):
    # Towards some delta mechanism between two partitions

    # Partition currently being materialized.
    ic()
    ic(context.partition_key)
    ic(context.partition_key.keys_by_dimension)

    # Shape of the loaded upstream input (accessed as a mapping).
    ic(type(list(asset_multi_1.values())[0]))
    ic(asset_multi_1)

    partition_key = context.asset_partition_key_for_output()
    ic(partition_key)

    return f"Processed data for {partition_key}"
|
||||
|
||||
|
||||
@asset(
    partitions_def=partitions_def_multi,
    # Declared so that `context.resources.polars_parquet_io_manager` is
    # available inside the function body.
    required_resource_keys={"polars_parquet_io_manager"},
)
def asset_multi_3(context, asset_multi_1):
    """Return the stored "deals" rows for the target date and the date before it.

    Towards some delta mechanism between two partitions: the target date is
    the "date" dimension of the current partition key. The parquet files under
    ``<base_dir>/deals/*/*.parquet`` are scanned lazily, the distinct dates up
    to and including the target are collected, and the result is restricted to
    the target date and the closest earlier date found in the data.

    Args:
        context: Dagster execution context; supplies the partition key and the
            ``polars_parquet_io_manager`` resource (its ``base_dir`` is read).
        asset_multi_1: Upstream asset value; only printed for debugging.

    Returns:
        A lazy polars frame filtered to the two selected dates.

    Raises:
        ValueError: If the target date is not present in the stored data, or
            if no earlier date exists.
    """
    ic()
    ic(context.partition_key)
    ic(context.partition_key.keys_by_dimension)
    ic(asset_multi_1)
    target_date = context.partition_key.keys_by_dimension["date"]
    ic(target_date)

    storage_dir = context.resources.polars_parquet_io_manager.base_dir
    ic(storage_dir)
    asset_key = "deals"

    # One lazy scan, reused for the date lookup and for the final result.
    dataset = pl.scan_parquet(
        f"{storage_dir}/{asset_key}/*/*.parquet", extra_columns="ignore"
    )

    # Collect only the available dates. No row limit is applied here: cutting
    # the scan to a fixed number of rows before de-duplicating could drop
    # dates. Sorted ascending so the previous date sits at index - 1.
    available_dates = (
        dataset.select(["date"])
        .filter(pl.col("date") <= target_date)
        .select(pl.col("date").unique())
        .collect()
        .get_column("date")
        .sort()
        .to_list()
    )
    ic(available_dates)

    # Find latest before + the target
    if target_date not in available_dates:
        raise ValueError(f"{target_date} not in available data.")

    idx = available_dates.index(target_date)
    if idx == 0:
        raise ValueError(f"No previous date available before {target_date}")

    keep_dates = [available_dates[idx - 1], available_dates[idx]]
    ic(keep_dates)

    return dataset.filter(pl.col("date").is_in(keep_dates))
|
||||
|
||||
|
||||
# Unpartitioned sample asset: yields a fixed 3x3 frame. The decorator metadata
# names column "b" under "partition_by" while the yielded Output names "c".
@asset(
    io_manager_key="polars_parquet_io_manager",
    metadata={"partition_by": ["b"]},
)
def dummy_asset():
    frame = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
    yield dg.Output(frame, metadata={"partition_by": ["c"]})
|
||||
|
||||
4
apps/other/src/config.py
Normal file
4
apps/other/src/config.py
Normal file
@@ -0,0 +1,4 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
# Application name: the APP environment variable when set, otherwise the name
# of the directory two levels above this file.
APP = os.getenv("APP", default=Path(__file__).parent.parent.name)
|
||||
@@ -6,12 +6,6 @@ import dagster as dg
|
||||
|
||||
install()
|
||||
|
||||
# Job that materializes the two multi-partitioned assets together.
daily_job = dg.define_asset_job(
    name="daily_job",
    selection=[assets.asset_multi_1, assets.asset_multi_2],
)
|
||||
|
||||
definitions = dg.Definitions(
|
||||
assets=[
|
||||
asset.with_attributes(
|
||||
@@ -23,5 +17,4 @@ definitions = dg.Definitions(
|
||||
resources={
|
||||
"polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/storage")
|
||||
},
|
||||
jobs=[daily_job],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user