diff --git a/apps/vinyl/src/assets.py b/apps/vinyl/src/assets.py index d4093cc..5430f30 100644 --- a/apps/vinyl/src/assets.py +++ b/apps/vinyl/src/assets.py @@ -8,7 +8,7 @@ from dagster_polars.patito import patito_model_to_dagster_type from models import Deal from plato.fetch import scrape_plato from plato.parse import parse as parse_plato -from shared.utils import get_partition_keys, load_partitions +from shared.utils import get_partition_keys, load_partitions, parse_partition_keys from sounds.fetch import fetch_deals from sounds.parse import parse as parse_sounds @@ -171,6 +171,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None: }, output_required=False, dagster_type=patito_model_to_dagster_type(Deal), + automation_condition=dg.AutomationCondition.eager(), ) def new_deals( context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None] @@ -178,6 +179,8 @@ def new_deals( """Fetch new deals from all sources.""" ic() partition_keys = get_partition_keys(context) + parsed_partition_keys = parse_partition_keys(context, "partitions") + ic(partition_keys) if len(partition_keys := sorted(partitions.keys())) < 2: @@ -185,7 +188,42 @@ def new_deals( return - yield dg.Output(Deal.DataFrame(partitions[partition_keys[-1]].limit(5).collect())) + before, after = partition_keys[-2:] + before_str, after_str = [ + parsed_partition_keys[partition_key]["date"] + for partition_key in (before, after) + ] + df_before = partitions[before] + df_after = partitions[after] + + num_rows_before, num_rows_after = [ + df.select(pl.len()).collect().item() for df in (df_before, df_after) + ] + + context.log.info( + f"Fetching new deals between {before_str} ({num_rows_before}) and {after_str} ({num_rows_after})" + ) + + new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect() + if not new_df.height: + yield dg.Output(Deal.DataFrame(new_df)) -# def good_deals(): ... +@dg.asset( + io_manager_key="polars_parquet_io_manager", + partitions_def=multi_partitions_def, + ins={"df": dg.AssetIn(key=new_deals.key)}, + output_required=False, + automation_condition=dg.AutomationCondition.eager(), +) +def good_deals( + context: dg.AssetExecutionContext, df: pl.LazyFrame +) -> Iterator[dg.Output[Deal.DataFrame]]: + filtered_df = df.filter(pl.col("price") <= 25).collect() + num_rows = filtered_df.height + if not num_rows: + context.log.info("No good deals found!") + return + + context.log.info(f"Good deals found ({num_rows}x)!") + yield dg.Output(Deal.DataFrame(filtered_df)) diff --git a/apps/vinyl/src/test.py b/apps/vinyl/src/test.py index 09c2c24..54aeade 100644 --- a/apps/vinyl/src/test.py +++ b/apps/vinyl/src/test.py @@ -2,7 +2,7 @@ import logging from datetime import datetime from typing import Any -from assets import cleaned_deals, deals, new_deals, works +from assets import cleaned_deals, deals, good_deals, new_deals, works from dagster_polars import PolarsParquetIOManager from definitions import definitions from jobs import check_partitions_job @@ -34,7 +34,7 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None): if __name__ == "__main__": - run = 5 + run = 6 resources = { "polars_parquet_io_manager": PolarsParquetIOManager( base_dir="/opt/dagster/storage/vinyl" @@ -68,5 +68,12 @@ if __name__ == "__main__": partition_key=f"{today_str()}|{source}", resources=resources, ) + case 6: + dg.materialize( + assets=definitions.assets, + selection=[good_deals.key], + partition_key=f"{today_str()}|{source}", + resources=resources, + ) case _: raise ValueError(f"Invalid run number: {run}!") diff --git a/shared/src/shared/utils.py b/shared/src/shared/utils.py index 158548c..34539d8 100644 --- a/shared/src/shared/utils.py +++ b/shared/src/shared/utils.py @@ -7,7 +7,7 @@ import dagster as dg def get_dimension_names( - context: dg.OpExecutionContext, input_name: str = "partitions" + context: dg.AssetExecutionContext, input_name: str = "partitions" ) -> list[str]: """ Extract dimension names for an input. @@ -66,7 +66,7 @@ def get_partition_keys(context: dg.AssetExecutionContext) -> Mapping[str, str]: def parse_partition_keys( - context: dg.OpExecutionContext, input_name: str = "partitions" + context: dg.AssetExecutionContext, input_name: str = "partitions" ) -> dict[str, dict[str, str]]: """ Parse partition keys for a given input.