This commit is contained in:
2025-07-26 19:53:13 +02:00
parent 329a50a5d6
commit d55cb8fb17
2 changed files with 5 additions and 4 deletions

View File

@@ -205,8 +205,11 @@ def new_deals(
)
new_df = df_after.join(df_before.select("id"), on="id", how="anti").collect()
if not new_df.height:
if new_df.height:
context.log.info(f"New deals found ({new_df.height}x)!")
yield dg.Output(Deal.DataFrame(new_df))
else:
context.log.info("No new deals found!")
@dg.asset(

View File

@@ -9,8 +9,6 @@ from jobs import check_partitions_job
import dagster as dg
# warnings.filterwarnings("ignore", category=dg.Ex)
logging.getLogger().setLevel(logging.INFO)
@@ -40,7 +38,7 @@ if __name__ == "__main__":
base_dir="/opt/dagster/storage/vinyl"
)
}
source = "sounds" # or "plato"
source = "plato"
match run:
case 1: