From 3aa6790f29f2fd895181c9879e5eef8afef578be Mon Sep 17 00:00:00 2001 From: Rik Veenboer Date: Sun, 27 Jul 2025 15:23:35 +0200 Subject: [PATCH] tidy up --- apps/vinyl/src/assets.py | 18 ++---------------- apps/vinyl/src/partitions.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 16 deletions(-) create mode 100644 apps/vinyl/src/partitions.py diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index dbc2d0c..6be3dc5 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -1,4 +1,3 @@ -import os from collections.abc import Iterator from datetime import datetime from glob import glob @@ -8,6 +7,7 @@ import polars as pl import structlog from dagster_polars.patito import patito_model_to_dagster_type from models import Deal +from partitions import multi_partitions_def from plato.fetch import scrape_plato from plato.parse import parse as parse_plato from shared.utils import get_partition_keys, load_partitions, parse_partition_keys @@ -17,23 +17,8 @@ from utils.email import EmailService import dagster as dg -SOURCES = ["plato", "sounds"] - logger = structlog.get_logger() -daily_partitions_def = dg.DailyPartitionsDefinition( - start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") -) -multi_partitions_def = dg.MultiPartitionsDefinition( - { - "date": daily_partitions_def, - "source": dg.StaticPartitionsDefinition(SOURCES), - } -) -partitions_mapping = dg.MultiToSingleDimensionPartitionMapping( - partition_dimension_name="date" -) - @dg.asset( io_manager_key="polars_parquet_io_manager", @@ -141,6 +126,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: """Aggregate works from cleaned deals.""" partitions = context.instance.get_materialized_partitions(cleaned_deals.key) ic(partitions) + logger.info("Works", partitions=partitions) dfs = list(load_partitions(context, cleaned_deals.key, partitions)) if dfs: columns = ["artist", "title", "release"] diff --git a/apps/vinyl/src/partitions.py b/apps/vinyl/src/partitions.py new file mode 100644 index 0000000..9d0e0c6 --- /dev/null +++ b/apps/vinyl/src/partitions.py @@ -0,0 +1,17 @@ +import os + +import dagster as dg + +SOURCES = ["plato", "sounds"] +daily_partitions_def = dg.DailyPartitionsDefinition( + start_date="2024-09-01", end_offset=1, timezone=os.environ.get("TZ", "UTC") +) +multi_partitions_def = dg.MultiPartitionsDefinition( + { + "date": daily_partitions_def, + "source": dg.StaticPartitionsDefinition(SOURCES), + } +) +partitions_mapping = dg.MultiToSingleDimensionPartitionMapping( + partition_dimension_name="date" +)