vinyl repo is not yet using multi partition mapping
This commit is contained in:
@@ -11,15 +11,11 @@ from utils import parse_date
|
|||||||
|
|
||||||
from dagster import (
|
from dagster import (
|
||||||
DailyPartitionsDefinition,
|
DailyPartitionsDefinition,
|
||||||
DimensionPartitionMapping,
|
|
||||||
Failure,
|
Failure,
|
||||||
Field,
|
Field,
|
||||||
IdentityPartitionMapping,
|
|
||||||
MultiPartitionMapping,
|
|
||||||
MultiPartitionsDefinition,
|
MultiPartitionsDefinition,
|
||||||
OpExecutionContext,
|
OpExecutionContext,
|
||||||
StaticPartitionsDefinition,
|
StaticPartitionsDefinition,
|
||||||
TimeWindowPartitionMapping,
|
|
||||||
asset,
|
asset,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -34,19 +30,6 @@ partitions_def = MultiPartitionsDefinition(
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
partition_mapping = MultiPartitionMapping(
|
|
||||||
{
|
|
||||||
"date": DimensionPartitionMapping(
|
|
||||||
dimension_name="date",
|
|
||||||
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=0),
|
|
||||||
),
|
|
||||||
"source": DimensionPartitionMapping(
|
|
||||||
dimension_name="source",
|
|
||||||
partition_mapping=IdentityPartitionMapping(),
|
|
||||||
),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@asset(
|
@asset(
|
||||||
io_manager_key="polars_parquet_io_manager",
|
io_manager_key="polars_parquet_io_manager",
|
||||||
@@ -114,6 +97,8 @@ def new_deals(context: OpExecutionContext) -> pl.DataFrame:
|
|||||||
storage_dir = context.resources.polars_parquet_io_manager.base_dir
|
storage_dir = context.resources.polars_parquet_io_manager.base_dir
|
||||||
asset_key = "deals"
|
asset_key = "deals"
|
||||||
|
|
||||||
|
# TODO: can we directly query from the deals input?
|
||||||
|
|
||||||
with duckdb.connect() as con:
|
with duckdb.connect() as con:
|
||||||
con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
|
con.create_function("PARSE_DATE", parse_date, [VARCHAR], DATE)
|
||||||
return con.execute(
|
return con.execute(
|
||||||
|
|||||||
Reference in New Issue
Block a user