feat: separate sql

This commit is contained in:
Stijnvandenbroek
2026-03-06 14:26:20 +00:00
parent 81188a4569
commit c908d96921
16 changed files with 179 additions and 219 deletions

View File

@@ -1,17 +1,16 @@
"""Application configuration from environment variables."""
import os
from pathlib import Path
SQL_DIR = Path(__file__).parent / "sql"
class Settings:
"""Application settings loaded from environment variables.
Configure via DATABASE_URL (single connection string) or individual
POSTGRES_* variables. The application expects the target database to
contain:
- A listings table (default: marts.funda_listings)
- An ELO schema with ratings, comparisons, and sample_listings tables
(default: elo.*)
POSTGRES_* variables.
"""
POSTGRES_HOST: str = os.getenv("POSTGRES_HOST", "localhost")
@@ -23,6 +22,8 @@ class Settings:
LISTINGS_SCHEMA: str = os.getenv("LISTINGS_SCHEMA", "marts")
LISTINGS_TABLE: str = os.getenv("LISTINGS_TABLE", "funda_listings")
ELO_SCHEMA: str = os.getenv("ELO_SCHEMA", "elo")
IMAGES_SCHEMA: str = os.getenv("IMAGES_SCHEMA", "raw_funda")
IMAGES_TABLE: str = os.getenv("IMAGES_TABLE", "listing_details")
K_FACTOR: float = float(os.getenv("ELO_K_FACTOR", "32"))
DEFAULT_ELO: float = float(os.getenv("ELO_DEFAULT_RATING", "1500"))
@@ -39,3 +40,15 @@ class Settings:
settings = Settings()
def load_sql(name: str) -> str:
    """Load a SQL template from the sql/ directory and inject schema/table names.

    The template is read from ``SQL_DIR / name`` and passed through
    ``str.format``, which replaces the ``{listings_schema}``,
    ``{listings_table}``, ``{elo_schema}``, ``{images_schema}`` and
    ``{images_table}`` placeholders with the matching values from
    ``settings``. Bind parameters such as ``:limit`` contain no braces and
    are therefore left untouched.

    Because ``str.format`` is used, a literal brace inside a template must
    be doubled (``{{`` / ``}}``).

    Args:
        name: File name of the template inside the sql/ directory,
            e.g. ``"history.sql"``.

    Returns:
        The SQL text with all schema/table placeholders filled in.

    Raises:
        OSError: If the template file cannot be read.
        KeyError: If the template uses a placeholder not listed above.
    """
    # Decode explicitly as UTF-8 so the loaded SQL does not depend on the
    # default locale encoding of the machine running the application.
    raw = (SQL_DIR / name).read_text(encoding="utf-8")
    return raw.format(
        listings_schema=settings.LISTINGS_SCHEMA,
        listings_table=settings.LISTINGS_TABLE,
        elo_schema=settings.ELO_SCHEMA,
        images_schema=settings.IMAGES_SCHEMA,
        images_table=settings.IMAGES_TABLE,
    )

View File

@@ -1,29 +1,9 @@
"""Shared query helpers for listing data."""
from app.config import settings
from app.config import load_sql, settings
from app.schemas import ListingResponse
LISTING_SELECT = f"""
SELECT
l.global_id, l.tiny_id, l.url, l.title, l.city, l.postcode,
l.province, l.neighbourhood, l.municipality,
l.latitude, l.longitude,
l.object_type, l.house_type, l.offering_type,
l.construction_type, l.construction_year,
l.energy_label, l.living_area, l.plot_area,
l.bedrooms, l.rooms,
l.has_garden, l.has_balcony, l.has_solar_panels,
l.has_heat_pump, l.has_roof_terrace,
l.is_energy_efficient, l.is_monument,
l.current_price, l.status, l.price_per_sqm,
l.publication_date,
COALESCE(r.elo_rating, {settings.DEFAULT_ELO}) AS elo_rating,
COALESCE(r.comparison_count, 0) AS comparison_count,
COALESCE(r.wins, 0) AS wins,
COALESCE(r.losses, 0) AS losses
FROM {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} l
LEFT JOIN {settings.ELO_SCHEMA}.ratings r ON l.global_id = r.global_id
"""
LISTING_SELECT = load_sql("listing_select.sql")
def row_to_listing(row) -> ListingResponse:

View File

@@ -6,7 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.config import settings
from app.config import load_sql, settings
from app.database import get_db
from app.elo import calculate_elo
from app.models import Comparison, EloRating
@@ -21,25 +21,22 @@ from app.schemas import (
router = APIRouter()
# Join fragment appended directly to LISTING_SELECT, hence the leading space.
# Only the first literal interpolates a setting; the second is a plain string.
SAMPLE_JOIN = (
    f" inner join {settings.ELO_SCHEMA}.sample_listings as s"
    " on l.global_id = s.global_id"
)
@router.get("/matchup", response_model=MatchupResponse)
def get_matchup(
status: str | None = None,
db: Session = Depends(get_db),
):
"""Return a weighted-random pair of listings for comparison.
Only listings in the stable sample (elo.sample_listings) are considered.
Listings with fewer comparisons are more likely to appear, ensuring
broad coverage across all properties.
"""
query = LISTING_SELECT + f"""
INNER JOIN {settings.ELO_SCHEMA}.sample_listings s
ON l.global_id = s.global_id
"""
params: dict = {}
"""Return a weighted-random pair of listings for comparison."""
query = LISTING_SELECT + SAMPLE_JOIN
params: dict = {"default_elo": settings.DEFAULT_ELO}
if status and status != "all":
query += " WHERE l.status = :status"
query += " where l.status = :status"
params["status"] = status
result = db.execute(text(query), params)
listings = [row_to_listing(row) for row in result]
@@ -50,33 +47,21 @@ def get_matchup(
detail="Not enough listings for comparison (need at least 2).",
)
# Gather recent pairs to avoid immediate repeats
recent = db.execute(
text(
f"SELECT listing_a_id, listing_b_id "
f"FROM {settings.ELO_SCHEMA}.comparisons "
f"ORDER BY created_at DESC LIMIT 20"
)
)
recent = db.execute(text(load_sql("recent_pairs.sql")))
recent_pairs = {
frozenset([r.listing_a_id, r.listing_b_id]) for r in recent
}
# Weight by inverse comparison count (prioritise less-compared houses)
weights = [1.0 / (l.comparison_count + 1) ** 1.5 for l in listings]
# Pick first listing
first = random.choices(listings, weights=weights, k=1)[0]
# Pick second listing (exclude first, avoid recent repeats)
remaining = [l for l in listings if l.global_id != first.global_id]
remaining_weights = [1.0 / (l.comparison_count + 1) ** 1.5 for l in remaining]
second = remaining[0] # fallback
second = remaining[0]
for _ in range(20):
candidate = random.choices(remaining, weights=remaining_weights, k=1)[0]
pair = frozenset([first.global_id, candidate.global_id])
if pair not in recent_pairs:
if frozenset([first.global_id, candidate.global_id]) not in recent_pairs:
second = candidate
break
else:
@@ -88,37 +73,24 @@ def get_matchup(
@router.post("/compare", response_model=CompareResponse)
def submit_comparison(body: CompareRequest, db: Session = Depends(get_db)):
"""Record a comparison result and update ELO ratings."""
winner_id = body.winner_id
loser_id = body.loser_id
if winner_id == loser_id:
if body.winner_id == body.loser_id:
raise HTTPException(status_code=400, detail="Winner and loser must differ.")
# Get or create rating records
winner_rating = db.query(EloRating).filter_by(global_id=winner_id).first()
if not winner_rating:
winner_rating = EloRating(
global_id=winner_id, elo_rating=settings.DEFAULT_ELO
)
db.add(winner_rating)
db.flush()
def get_or_create_rating(global_id: str) -> EloRating:
rating = db.query(EloRating).filter_by(global_id=global_id).first()
if not rating:
rating = EloRating(global_id=global_id, elo_rating=settings.DEFAULT_ELO)
db.add(rating)
db.flush()
return rating
loser_rating = db.query(EloRating).filter_by(global_id=loser_id).first()
if not loser_rating:
loser_rating = EloRating(
global_id=loser_id, elo_rating=settings.DEFAULT_ELO
)
db.add(loser_rating)
db.flush()
winner_rating = get_or_create_rating(body.winner_id)
loser_rating = get_or_create_rating(body.loser_id)
elo_w_before = winner_rating.elo_rating
elo_l_before = loser_rating.elo_rating
new_elo_w, new_elo_l = calculate_elo(elo_w_before, elo_l_before, settings.K_FACTOR)
new_elo_w, new_elo_l = calculate_elo(
elo_w_before, elo_l_before, settings.K_FACTOR
)
# Update ratings
winner_rating.elo_rating = new_elo_w
winner_rating.comparison_count += 1
winner_rating.wins += 1
@@ -127,12 +99,11 @@ def submit_comparison(body: CompareRequest, db: Session = Depends(get_db)):
loser_rating.comparison_count += 1
loser_rating.losses += 1
# Record comparison
db.add(
Comparison(
listing_a_id=winner_id,
listing_b_id=loser_id,
winner_id=winner_id,
listing_a_id=body.winner_id,
listing_b_id=body.loser_id,
winner_id=body.winner_id,
elo_a_before=elo_w_before,
elo_b_before=elo_l_before,
elo_a_after=new_elo_w,
@@ -142,132 +113,58 @@ def submit_comparison(body: CompareRequest, db: Session = Depends(get_db)):
db.commit()
return CompareResponse(
winner_id=winner_id,
loser_id=loser_id,
winner_id=body.winner_id,
loser_id=body.loser_id,
elo_change=round(new_elo_w - elo_w_before, 1),
new_winner_elo=round(new_elo_w, 1),
new_loser_elo=round(new_elo_l, 1),
)
def _row_to_history(r) -> ComparisonHistoryItem:
    """Convert one comparison-history result row into a ComparisonHistoryItem.

    Identifier, title and timestamp columns are copied unchanged; the four
    ELO columns are rounded to one decimal place.
    """
    copied = (
        "id",
        "listing_a_title",
        "listing_b_title",
        "winner_title",
        "listing_a_id",
        "listing_b_id",
        "winner_id",
        "created_at",
    )
    rounded = ("elo_a_before", "elo_b_before", "elo_a_after", "elo_b_after")

    fields = {column: getattr(r, column) for column in copied}
    for column in rounded:
        fields[column] = round(getattr(r, column), 1)
    return ComparisonHistoryItem(**fields)
@router.get("/history", response_model=list[ComparisonHistoryItem])
def get_history(
limit: int = 50,
db: Session = Depends(get_db),
):
def get_history(limit: int = 50, db: Session = Depends(get_db)):
"""Return recent comparisons."""
query = f"""
SELECT
c.*,
a.title AS listing_a_title,
b.title AS listing_b_title,
w.title AS winner_title
FROM {settings.ELO_SCHEMA}.comparisons c
LEFT JOIN {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} a
ON c.listing_a_id = a.global_id
LEFT JOIN {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} b
ON c.listing_b_id = b.global_id
LEFT JOIN {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} w
ON c.winner_id = w.global_id
ORDER BY c.created_at DESC
LIMIT :limit
"""
rows = db.execute(text(query), {"limit": limit})
return [
ComparisonHistoryItem(
id=r.id,
listing_a_title=r.listing_a_title,
listing_b_title=r.listing_b_title,
winner_title=r.winner_title,
listing_a_id=r.listing_a_id,
listing_b_id=r.listing_b_id,
winner_id=r.winner_id,
elo_a_before=round(r.elo_a_before, 1),
elo_b_before=round(r.elo_b_before, 1),
elo_a_after=round(r.elo_a_after, 1),
elo_b_after=round(r.elo_b_after, 1),
created_at=r.created_at,
)
for r in rows
]
rows = db.execute(text(load_sql("history.sql")), {"limit": limit})
return [_row_to_history(r) for r in rows]
@router.get("/stats", response_model=StatsResponse)
def get_stats(db: Session = Depends(get_db)):
"""Return aggregate statistics about comparisons and ratings."""
total_comparisons = db.execute(
text(f"SELECT COUNT(*) FROM {settings.ELO_SCHEMA}.comparisons")
).scalar() or 0
total_rated = db.execute(
text(f"SELECT COUNT(*) FROM {settings.ELO_SCHEMA}.ratings")
).scalar() or 0
total_listings = db.execute(
text(
f"SELECT COUNT(*) FROM {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE}"
)
).scalar() or 0
elo_agg = db.execute(
text(
f"SELECT AVG(elo_rating), MAX(elo_rating), MIN(elo_rating) "
f"FROM {settings.ELO_SCHEMA}.ratings"
)
).first()
total_comparisons = db.execute(text(load_sql("count_comparisons.sql"))).scalar() or 0
total_rated = db.execute(text(load_sql("count_rated.sql"))).scalar() or 0
total_listings = db.execute(text(load_sql("count_listings.sql"))).scalar() or 0
elo_agg = db.execute(text(load_sql("elo_aggregates.sql"))).first()
avg_elo = round(float(elo_agg[0]), 1) if elo_agg and elo_agg[0] else None
max_elo = round(float(elo_agg[1]), 1) if elo_agg and elo_agg[1] else None
min_elo = round(float(elo_agg[2]), 1) if elo_agg and elo_agg[2] else None
# ELO distribution in buckets of 50
dist_rows = db.execute(
text(
f"SELECT FLOOR(elo_rating / 50) * 50 AS bucket, COUNT(*) AS count "
f"FROM {settings.ELO_SCHEMA}.ratings "
f"GROUP BY bucket ORDER BY bucket"
)
)
dist_rows = db.execute(text(load_sql("elo_distribution.sql")))
elo_distribution = [
{"bucket": f"{int(r.bucket)}-{int(r.bucket) + 49}", "count": r.count}
for r in dist_rows
]
# Recent comparisons
recent_query = f"""
SELECT
c.*,
a.title AS listing_a_title,
b.title AS listing_b_title,
w.title AS winner_title
FROM {settings.ELO_SCHEMA}.comparisons c
LEFT JOIN {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} a
ON c.listing_a_id = a.global_id
LEFT JOIN {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} b
ON c.listing_b_id = b.global_id
LEFT JOIN {settings.LISTINGS_SCHEMA}.{settings.LISTINGS_TABLE} w
ON c.winner_id = w.global_id
ORDER BY c.created_at DESC
LIMIT 10
"""
recent_rows = db.execute(text(recent_query))
recent_comparisons = [
ComparisonHistoryItem(
id=r.id,
listing_a_title=r.listing_a_title,
listing_b_title=r.listing_b_title,
winner_title=r.winner_title,
listing_a_id=r.listing_a_id,
listing_b_id=r.listing_b_id,
winner_id=r.winner_id,
elo_a_before=round(r.elo_a_before, 1),
elo_b_before=round(r.elo_b_before, 1),
elo_a_after=round(r.elo_a_after, 1),
elo_b_after=round(r.elo_b_after, 1),
created_at=r.created_at,
)
for r in recent_rows
]
recent_rows = db.execute(text(load_sql("history.sql")), {"limit": 10})
recent_comparisons = [_row_to_history(r) for r in recent_rows]
return StatsResponse(
total_comparisons=total_comparisons,

View File

@@ -1,10 +1,10 @@
"""Image endpoints retrieve photo URLs from the raw Funda data."""
"""Image endpoints retrieve photo URLs from listing data."""
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.config import settings
from app.config import load_sql
from app.database import get_db
router = APIRouter()
@@ -15,15 +15,8 @@ def get_listing_images(
global_id: str,
db: Session = Depends(get_db),
) -> dict[str, list[str]]:
"""Return image URLs for a listing from the raw Funda JSON data."""
row = db.execute(
text(
"SELECT raw_json->'photo_urls' AS photo_urls "
"FROM raw_funda.listing_details "
"WHERE global_id = :gid"
),
{"gid": global_id},
).first()
"""Return image URLs for a listing."""
row = db.execute(text(load_sql("listing_images.sql")), {"gid": global_id}).first()
if not row or not row.photo_urls:
return {"images": []}

View File

@@ -1,9 +1,10 @@
"""Listing endpoints read-only access to Funda data with ELO ratings."""
"""Listing endpoints read-only access to listing data with ELO ratings."""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.config import settings
from app.database import get_db
from app.queries import LISTING_SELECT, row_to_listing
from app.schemas import ListingResponse
@@ -18,11 +19,11 @@ def get_listings(
):
"""Return all listings with their current ELO rating."""
query = LISTING_SELECT
params: dict = {}
params: dict = {"default_elo": settings.DEFAULT_ELO}
if status and status != "all":
query += " WHERE l.status = :status"
query += " where l.status = :status"
params["status"] = status
query += " ORDER BY elo_rating DESC"
query += " order by elo_rating desc"
result = db.execute(text(query), params)
return [row_to_listing(row) for row in result]
@@ -30,8 +31,10 @@ def get_listings(
@router.get("/listings/{global_id}", response_model=ListingResponse)
def get_listing(global_id: str, db: Session = Depends(get_db)):
"""Return a single listing by its global_id."""
query = LISTING_SELECT + " WHERE l.global_id = :global_id"
row = db.execute(text(query), {"global_id": global_id}).first()
query = LISTING_SELECT + " where l.global_id = :global_id"
row = db.execute(
text(query), {"global_id": global_id, "default_elo": settings.DEFAULT_ELO}
).first()
if not row:
raise HTTPException(status_code=404, detail="Listing not found")
return row_to_listing(row)

View File

@@ -11,6 +11,11 @@ from app.schemas import RankingResponse
router = APIRouter()
# Join fragment appended directly to LISTING_SELECT, hence the leading space.
# Only the first literal interpolates a setting; the second is a plain string.
SAMPLE_JOIN = (
    f" inner join {settings.ELO_SCHEMA}.sample_listings as s"
    " on l.global_id = s.global_id"
)
@router.get("/rankings", response_model=list[RankingResponse])
def get_rankings(
@@ -19,21 +24,15 @@ def get_rankings(
offset: int = 0,
db: Session = Depends(get_db),
):
"""Return listings ranked by ELO rating (highest first).
Only listings in the stable sample (elo.sample_listings) are shown.
"""
query = LISTING_SELECT + f"""
INNER JOIN {settings.ELO_SCHEMA}.sample_listings s
ON l.global_id = s.global_id
"""
params: dict = {"limit": limit, "offset": offset}
"""Return listings ranked by ELO rating (highest first)."""
query = LISTING_SELECT + SAMPLE_JOIN
params: dict = {"limit": limit, "offset": offset, "default_elo": settings.DEFAULT_ELO}
if status and status != "all":
query += " WHERE l.status = :status"
query += " where l.status = :status"
params["status"] = status
query += " ORDER BY elo_rating DESC, l.current_price DESC LIMIT :limit OFFSET :offset"
query += " order by elo_rating desc, l.current_price desc limit :limit offset :offset"
result = db.execute(text(query), params)
listings = [row_to_listing(row) for row in result]

View File

@@ -6,7 +6,7 @@ from pydantic import BaseModel
class ListingResponse(BaseModel):
"""Funda listing combined with its ELO rating data."""
"""Listing combined with its ELO rating data."""
global_id: str
tiny_id: str | None = None
@@ -105,6 +105,3 @@ class StatsResponse(BaseModel):
min_elo: float | None
elo_distribution: list[dict]
recent_comparisons: list[ComparisonHistoryItem]

View File

@@ -0,0 +1 @@
select count(*) from {elo_schema}.comparisons

View File

@@ -0,0 +1 @@
select count(*) from {listings_schema}.{listings_table}

View File

@@ -0,0 +1 @@
select count(*) from {elo_schema}.ratings

View File

@@ -0,0 +1,5 @@
-- Average, maximum and minimum ELO rating over all rated listings.
-- The columns are unnamed, so callers read them by position in this order.
select
avg(elo_rating),
max(elo_rating),
min(elo_rating)
from {elo_schema}.ratings

View File

@@ -0,0 +1,6 @@
-- Histogram of ELO ratings in buckets 50 points wide.
-- bucket is the lower bound of each bucket; rows are ordered by bucket ascending.
select
floor(elo_rating / 50) * 50 as bucket,
count(*) as count
from {elo_schema}.ratings
group by bucket
order by bucket

View File

@@ -0,0 +1,14 @@
-- Most recent comparisons, newest first, with the titles of listing A,
-- listing B and the winner looked up in the listings table.
-- Left joins keep a comparison even when a listing has no match there;
-- the corresponding title is then null.
-- The caller must supply the limit bind parameter.
select
c.*,
a.title as listing_a_title,
b.title as listing_b_title,
w.title as winner_title
from {elo_schema}.comparisons as c
left join {listings_schema}.{listings_table} as a
on c.listing_a_id = a.global_id
left join {listings_schema}.{listings_table} as b
on c.listing_b_id = b.global_id
left join {listings_schema}.{listings_table} as w
on c.winner_id = w.global_id
order by c.created_at desc
limit :limit

View File

@@ -0,0 +1,4 @@
-- Photo URLs of a single listing, taken from the photo_urls key of raw_json.
-- The caller must supply the gid bind parameter.
select
raw_json -> 'photo_urls' as photo_urls
from {images_schema}.{images_table}
where global_id = :gid

View File

@@ -0,0 +1,40 @@
-- Base select for listings combined with their ELO rating data.
-- Listings without a ratings row get the default_elo bind parameter as
-- their rating and zero for the comparison, win and loss counters.
-- Callers append further joins and where or order clauses to this text,
-- so it must stay an open-ended statement without a terminator.
select
l.global_id,
l.tiny_id,
l.url,
l.title,
l.city,
l.postcode,
l.province,
l.neighbourhood,
l.municipality,
l.latitude,
l.longitude,
l.object_type,
l.house_type,
l.offering_type,
l.construction_type,
l.construction_year,
l.energy_label,
l.living_area,
l.plot_area,
l.bedrooms,
l.rooms,
l.has_garden,
l.has_balcony,
l.has_solar_panels,
l.has_heat_pump,
l.has_roof_terrace,
l.is_energy_efficient,
l.is_monument,
l.current_price,
l.status,
l.price_per_sqm,
l.publication_date,
coalesce(r.elo_rating, :default_elo) as elo_rating,
coalesce(r.comparison_count, 0) as comparison_count,
coalesce(r.wins, 0) as wins,
coalesce(r.losses, 0) as losses
from {listings_schema}.{listings_table} as l
left join {elo_schema}.ratings as r
on l.global_id = r.global_id

View File

@@ -0,0 +1,6 @@
-- Listing id pairs of the 20 most recent comparisons, newest first.
select
listing_a_id,
listing_b_id
from {elo_schema}.comparisons
order by created_at desc
limit 20