find good deals

This commit is contained in:
2025-07-26 19:44:39 +02:00
parent 891680bb05
commit 329a50a5d6
3 changed files with 52 additions and 7 deletions

View File

@@ -8,7 +8,7 @@ from dagster_polars.patito import patito_model_to_dagster_type
from models import Deal
from plato.fetch import scrape_plato
from plato.parse import parse as parse_plato
from shared.utils import get_partition_keys, load_partitions
from shared.utils import get_partition_keys, load_partitions, parse_partition_keys
from sounds.fetch import fetch_deals
from sounds.parse import parse as parse_sounds
@@ -171,6 +171,7 @@ def works(context: dg.AssetExecutionContext) -> pl.DataFrame | None:
},
output_required=False,
dagster_type=patito_model_to_dagster_type(Deal),
automation_condition=dg.AutomationCondition.eager(),
)
def new_deals(
context: dg.AssetExecutionContext, partitions: dict[str, pl.LazyFrame | None]
@@ -178,6 +179,8 @@ def new_deals(
"""Fetch new deals from all sources."""
ic()
partition_keys = get_partition_keys(context)
parsed_partition_keys = parse_partition_keys(context, "partitions")
ic(partition_keys)
if len(partition_keys := sorted(partitions.keys())) < 2:
@@ -185,7 +188,42 @@ def new_deals(
return
yield dg.Output(Deal.DataFrame(partitions[partition_keys[-1]].limit(5).collect()))
before, after = partition_keys[-2:]
before_str, after_str = [
parsed_partition_keys[partition_key]["date"]
for partition_key in (before, after)
]
df_before = partitions[before]
df_after = partitions[after]
num_rows_before, num_rows_after = [
df.select(pl.len()).collect().item() for df in (df_before, df_after)
]
context.log.info(
f"Fetching new deals between {before_str} ({num_rows_before}) and {after_str} ({num_rows_after})"
)
new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect()
if not new_df.height:
yield dg.Output(Deal.DataFrame(new_df))
# def good_deals(): ...
@dg.asset(
io_manager_key="polars_parquet_io_manager",
partitions_def=multi_partitions_def,
ins={"df": dg.AssetIn(key=new_deals.key)},
output_required=False,
automation_condition=dg.AutomationCondition.eager(),
)
def good_deals(
context: dg.AssetExecutionContext, df: pl.LazyFrame
) -> Iterator[dg.Output[Deal.DataFrame]]:
filtered_df = df.filter(pl.col("price") <= 25).collect()
num_rows = filtered_df.height
if not num_rows:
context.log.info("No good deals found!")
return
context.log.info(f"Good deals found ({num_rows}x)!")
yield dg.Output(Deal.DataFrame(filtered_df))

View File

@@ -2,7 +2,7 @@ import logging
from datetime import datetime
from typing import Any
from assets import cleaned_deals, deals, new_deals, works
from assets import cleaned_deals, deals, good_deals, new_deals, works
from dagster_polars import PolarsParquetIOManager
from definitions import definitions
from jobs import check_partitions_job
@@ -34,7 +34,7 @@ def test_deals(resources: dict[str, Any], source="sounds", date: str = None):
if __name__ == "__main__":
run = 5
run = 6
resources = {
"polars_parquet_io_manager": PolarsParquetIOManager(
base_dir="/opt/dagster/storage/vinyl"
@@ -68,5 +68,12 @@ if __name__ == "__main__":
partition_key=f"{today_str()}|{source}",
resources=resources,
)
case 6:
dg.materialize(
assets=definitions.assets,
selection=[good_deals.key],
partition_key=f"{today_str()}|{source}",
resources=resources,
)
case _:
raise ValueError(f"Invalid run number: {run}!")