From 732e50924e845eeb23caa54dd59d36867a3c851c Mon Sep 17 00:00:00 2001 From: Stijnvandenbroek Date: Fri, 6 Mar 2026 10:49:24 +0000 Subject: [PATCH] feat: add elo tables for house ranking --- data_platform/assets/elo/__init__.py | 5 ++ data_platform/assets/elo/elo.py | 77 +++++++++++++++++++ .../elo/sql/create_comparisons_table.sql | 11 +++ .../assets/elo/sql/create_ratings_table.sql | 9 +++ .../assets/elo/sql/create_schema.sql | 1 + data_platform/definitions.py | 3 + dbt/models/marts/elo_sample_listings.sql | 14 ++++ dbt/models/marts/elo_sample_listings.yml | 31 ++++++++ 8 files changed, 151 insertions(+) create mode 100644 data_platform/assets/elo/__init__.py create mode 100644 data_platform/assets/elo/elo.py create mode 100644 data_platform/assets/elo/sql/create_comparisons_table.sql create mode 100644 data_platform/assets/elo/sql/create_ratings_table.sql create mode 100644 data_platform/assets/elo/sql/create_schema.sql create mode 100644 dbt/models/marts/elo_sample_listings.sql create mode 100644 dbt/models/marts/elo_sample_listings.yml diff --git a/data_platform/assets/elo/__init__.py b/data_platform/assets/elo/__init__.py new file mode 100644 index 0000000..94f6a6e --- /dev/null +++ b/data_platform/assets/elo/__init__.py @@ -0,0 +1,5 @@ +"""ELO ranking assets.""" + +from data_platform.assets.elo.elo import elo_comparisons, elo_ratings + +__all__ = ["elo_comparisons", "elo_ratings"] diff --git a/data_platform/assets/elo/elo.py b/data_platform/assets/elo/elo.py new file mode 100644 index 0000000..3c7ba5b --- /dev/null +++ b/data_platform/assets/elo/elo.py @@ -0,0 +1,77 @@ +"""ELO schema and table management assets. + +Individual assets for the stateful ELO tables (ratings, comparisons) used +by the house-elo-ranking application. The sample_listings table is +managed as a dbt model in marts. +""" + +from pathlib import Path + +from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset +from sqlalchemy import text + +from data_platform.helpers import render_sql +from data_platform.resources import PostgresResource + +_SQL_DIR = Path(__file__).parent / "sql" +_SCHEMA = "elo" + + +def _ensure_schema(conn: object) -> None: + """Create the ELO schema if it does not exist.""" + schema_sql = render_sql(_SQL_DIR, "create_schema.sql", schema=_SCHEMA) + conn.execute(text(schema_sql)) + + +@asset( + deps=["elo_sample_listings"], + group_name="elo", + description="Creates the ELO ratings table that stores per-listing ELO scores.", +) +def elo_ratings( + context: AssetExecutionContext, + postgres: PostgresResource, +) -> MaterializeResult: + """Ensure the ELO ratings table exists (idempotent).""" + table_sql = render_sql(_SQL_DIR, "create_ratings_table.sql", schema=_SCHEMA) + + engine = postgres.get_engine() + with engine.begin() as conn: + _ensure_schema(conn) + conn.execute(text(table_sql)) + + context.log.info("ELO ratings table ensured.") + + return MaterializeResult( + metadata={ + "schema": MetadataValue.text(_SCHEMA), + "table": MetadataValue.text("ratings"), + } + ) + + +@asset( + deps=["elo_sample_listings"], + group_name="elo", + description="Creates the ELO comparisons table that records pairwise match results.", +) +def elo_comparisons( + context: AssetExecutionContext, + postgres: PostgresResource, +) -> MaterializeResult: + """Ensure the ELO comparisons table exists (idempotent).""" + table_sql = render_sql(_SQL_DIR, "create_comparisons_table.sql", schema=_SCHEMA) + + engine = postgres.get_engine() + with engine.begin() as conn: + _ensure_schema(conn) + conn.execute(text(table_sql)) + + context.log.info("ELO comparisons table ensured.") + + return MaterializeResult( + metadata={ + "schema": MetadataValue.text(_SCHEMA), + "table": MetadataValue.text("comparisons"), + } + ) diff --git a/data_platform/assets/elo/sql/create_comparisons_table.sql b/data_platform/assets/elo/sql/create_comparisons_table.sql new file mode 100644 index 0000000..f0f5cee --- /dev/null +++ b/data_platform/assets/elo/sql/create_comparisons_table.sql @@ -0,0 +1,11 @@ +create table if not exists {{ schema }}.comparisons ( + id serial primary key, + listing_a_id text not null, + listing_b_id text not null, + winner_id text not null, + elo_a_before double precision not null, + elo_b_before double precision not null, + elo_a_after double precision not null, + elo_b_after double precision not null, + created_at timestamptz default now() +); diff --git a/data_platform/assets/elo/sql/create_ratings_table.sql b/data_platform/assets/elo/sql/create_ratings_table.sql new file mode 100644 index 0000000..3b4be48 --- /dev/null +++ b/data_platform/assets/elo/sql/create_ratings_table.sql @@ -0,0 +1,9 @@ +create table if not exists {{ schema }}.ratings ( + global_id text primary key, + elo_rating double precision not null default 1500.0, + comparison_count integer not null default 0, + wins integer not null default 0, + losses integer not null default 0, + created_at timestamptz default now(), + updated_at timestamptz default now() +); diff --git a/data_platform/assets/elo/sql/create_schema.sql b/data_platform/assets/elo/sql/create_schema.sql new file mode 100644 index 0000000..4b3923d --- /dev/null +++ b/data_platform/assets/elo/sql/create_schema.sql @@ -0,0 +1 @@ +create schema if not exists {{ schema }}; diff --git a/data_platform/definitions.py b/data_platform/definitions.py index ebd6e0e..34670a0 100644 --- a/data_platform/definitions.py +++ b/data_platform/definitions.py @@ -6,6 +6,7 @@ from dagster import ( from dagster_dbt import DbtCliResource from data_platform.assets.dbt import DBT_PROJECT_DIR, dbt_project_assets +from data_platform.assets.elo import elo_comparisons, elo_ratings from data_platform.assets.ingestion.funda import ( raw_funda_listing_details, raw_funda_price_history, @@ -31,6 +32,8 @@ defs = Definitions( raw_funda_search_results, raw_funda_listing_details, raw_funda_price_history, + elo_ratings, + elo_comparisons, ] ), jobs=[funda_ingestion_job, funda_raw_quality_job, elementary_refresh_job], diff --git a/dbt/models/marts/elo_sample_listings.sql b/dbt/models/marts/elo_sample_listings.sql new file mode 100644 index 0000000..3828310 --- /dev/null +++ b/dbt/models/marts/elo_sample_listings.sql @@ -0,0 +1,14 @@ +-- Mart: stable random sample of Funda listings for pairwise ELO comparison. +-- Incrementally tops up to the target sample size using deterministic ordering. + +select l.global_id +from {{ ref('funda_listings') }} as l +{% if is_incremental() %} + left join {{ this }} as s on l.global_id = s.global_id + where s.global_id is null + order by md5(l.global_id) + limit greatest(0, 50 - (select count(*) from {{ this }})) +{% else %} + order by md5(l.global_id) + limit 50 +{% endif %} diff --git a/dbt/models/marts/elo_sample_listings.yml b/dbt/models/marts/elo_sample_listings.yml new file mode 100644 index 0000000..29bdd0b --- /dev/null +++ b/dbt/models/marts/elo_sample_listings.yml @@ -0,0 +1,31 @@ +version: 2 + +models: + - name: elo_sample_listings + description: > + Stable random sample of Funda listings used for pairwise ELO comparison in the + house-elo-ranking application. Maintains up to 50 listings, topped up incrementally using + deterministic md5-based ordering. + config: + materialized: incremental + unique_key: global_id + on_schema_change: fail + schema: elo + contract: + enforced: true + meta: + dagster: + group: elo + columns: + - name: global_id + description: Funda internal listing ID, referencing funda_listings. + data_type: text + constraints: + - type: not_null + - type: unique + tests: + - unique + - not_null + - relationships: + to: ref('funda_listings') + field: global_id