This commit is contained in:
2025-07-27 15:23:35 +02:00
parent 05c2b33e25
commit 3aa6790f29
2 changed files with 19 additions and 16 deletions

View File

@@ -1,4 +1,3 @@
import os
from collections.abc import Iterator
from datetime import datetime
from glob import glob
@@ -8,6 +7,7 @@ import polars as pl
import structlog
from dagster_polars.patito import patito_model_to_dagster_type
from models import Deal
from partitions import multi_partitions_def
from plato.fetch import scrape_plato
from plato.parse import parse as parse_plato
from shared.utils import get_partition_keys, load_partitions, parse_partition_keys
@@ -17,23 +17,8 @@ from utils.email import EmailService
import dagster as dg
SOURCES = ["plato", "sounds"]
logger = structlog.get_logger()
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)
multi_partitions_def = dg.MultiPartitionsDefinition(
{
"date": daily_partitions_def,
"source": dg.StaticPartitionsDefinition(SOURCES),
}
)
partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
partition_dimension_name="date"
)
@dg.asset(
io_manager_key="polars_parquet_io_manager",
@@ -141,6 +126,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
"""Aggregate works from cleaned deals."""
partitions = context.instance.get_materialized_partitions(cleaned_deals.key)
ic(partitions)
logger.info("Works", partitions=partitions)
dfs = list(load_partitions(context, cleaned_deals.key, partitions))
if dfs:
columns = ["artist", "title", "release"]

View File

@@ -0,0 +1,17 @@
import os
import dagster as dg
SOURCES = ["plato", "sounds"]
daily_partitions_def = dg.DailyPartitionsDefinition(
start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC")
)
multi_partitions_def = dg.MultiPartitionsDefinition(
{
"date": daily_partitions_def,
"source": dg.StaticPartitionsDefinition(SOURCES),
}
)
partitions_mapping = dg.MultiToSingleDimensionPartitionMapping(
partition_dimension_name="date"
)