feat: add elo tables for house ranking

This commit is contained in:
Stijnvandenbroek
2026-03-06 10:49:24 +00:00
parent ccfd453dfe
commit 732e50924e
8 changed files with 151 additions and 0 deletions

View File

@@ -0,0 +1,5 @@
"""ELO ranking assets."""
from data_platform.assets.elo.elo import elo_comparisons, elo_ratings
__all__ = ["elo_comparisons", "elo_ratings"]

View File

@@ -0,0 +1,77 @@
"""ELO schema and table management assets.
Individual assets for the stateful ELO tables (ratings, comparisons) used
by the house-elo-ranking application. The sample_listings table is
managed as a dbt model in marts.
"""
from pathlib import Path
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset
from sqlalchemy import text
from data_platform.helpers import render_sql
from data_platform.resources import PostgresResource
_SQL_DIR = Path(__file__).parent / "sql"
_SCHEMA = "elo"
def _ensure_schema(conn: object) -> None:
"""Create the ELO schema if it does not exist."""
schema_sql = render_sql(_SQL_DIR, "create_schema.sql", schema=_SCHEMA)
conn.execute(text(schema_sql))
@asset(
deps=["elo_sample_listings"],
group_name="elo",
description="Creates the ELO ratings table that stores per-listing ELO scores.",
)
def elo_ratings(
context: AssetExecutionContext,
postgres: PostgresResource,
) -> MaterializeResult:
"""Ensure the ELO ratings table exists (idempotent)."""
table_sql = render_sql(_SQL_DIR, "create_ratings_table.sql", schema=_SCHEMA)
engine = postgres.get_engine()
with engine.begin() as conn:
_ensure_schema(conn)
conn.execute(text(table_sql))
context.log.info("ELO ratings table ensured.")
return MaterializeResult(
metadata={
"schema": MetadataValue.text(_SCHEMA),
"table": MetadataValue.text("ratings"),
}
)
@asset(
deps=["elo_sample_listings"],
group_name="elo",
description="Creates the ELO comparisons table that records pairwise match results.",
)
def elo_comparisons(
context: AssetExecutionContext,
postgres: PostgresResource,
) -> MaterializeResult:
"""Ensure the ELO comparisons table exists (idempotent)."""
table_sql = render_sql(_SQL_DIR, "create_comparisons_table.sql", schema=_SCHEMA)
engine = postgres.get_engine()
with engine.begin() as conn:
_ensure_schema(conn)
conn.execute(text(table_sql))
context.log.info("ELO comparisons table ensured.")
return MaterializeResult(
metadata={
"schema": MetadataValue.text(_SCHEMA),
"table": MetadataValue.text("comparisons"),
}
)

View File

@@ -0,0 +1,11 @@
create table if not exists {{ schema }}.comparisons (
id serial primary key,
listing_a_id text not null,
listing_b_id text not null,
winner_id text not null,
elo_a_before double precision not null,
elo_b_before double precision not null,
elo_a_after double precision not null,
elo_b_after double precision not null,
created_at timestamptz default now()
);

View File

@@ -0,0 +1,9 @@
create table if not exists {{ schema }}.ratings (
global_id text primary key,
elo_rating double precision not null default 1500.0,
comparison_count integer not null default 0,
wins integer not null default 0,
losses integer not null default 0,
created_at timestamptz default now(),
updated_at timestamptz default now()
);

View File

@@ -0,0 +1 @@
create schema if not exists {{ schema }};

View File

@@ -6,6 +6,7 @@ from dagster import (
from dagster_dbt import DbtCliResource
from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets
from data_platform.assets.elo import elo_comparisons, elo_ratings
from data_platform.assets.ingestion.funda import (
raw_funda_listing_details,
raw_funda_price_history,
@@ -31,6 +32,8 @@ defs = Definitions(
raw_funda_search_results,
raw_funda_listing_details,
raw_funda_price_history,
elo_ratings,
elo_comparisons,
]
),
jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job],