The recommender & two-stage ranker

This is the heart of the capstone: the two-stage design from Chapter 9, built for real. Stage 1 generates candidates fast with embeddings + a time-decayed profile + ANN search. Stage 2 re-ranks them with a learned click model.

Stage 1 — candidate generation

recommender.py embeds every article (title + abstract), builds a vector index, computes a time-decayed profile for the user, and retrieves the nearest unseen articles. It also computes trending (time-decayed popularity) as a fallback for cold-start users, and skips repeated titles so the feed isn't ten copies of one story.

"""The recommender: content embeddings + time-decayed user profile + ANN
candidate generation, with a trending fallback for cold-start users.

This is stage 1 (candidate generation). The LogisticRanker (ranker.py) is the
optional stage 2 that re-scores candidates with extra features.
"""

from __future__ import annotations

import math

import numpy as np

from .ann import VectorIndex
from .embeddings import make_embedder


class NewsRecommender:
    def __init__(self, embedder="tfidf", half_life_hours=72.0):
        self.embedder = make_embedder(embedder) if isinstance(embedder, str) else embedder
        self.half_life = half_life_hours * 3600.0      # to seconds (timestamps are seconds)
        self.index = VectorIndex()

    # ---------------------------------------------------------------- fitting
    def fit(self, data):
        self.data = data
        self.ids = data.article_ids
        self.id_to_row = {nid: r for r, nid in enumerate(self.ids)}
        texts = [data.articles[nid].text for nid in self.ids]
        self.embedder.fit(texts)
        self.emb = self.embedder.transform(texts)      # L2-normalized rows
        self.index.build(self.emb, self.ids)

        # popularity + time-decayed "trending" from clicks
        self.now = max((t for _, _, t in data.interactions), default=0.0)
        lam = math.log(2) / self.half_life
        self.pop = np.zeros(len(self.ids))
        self.trend = np.zeros(len(self.ids))
        for _, nid, t in data.interactions:
            r = self.id_to_row.get(nid)
            if r is not None:
                self.pop[r] += 1.0
                self.trend[r] += math.exp(-lam * (self.now - t))

        # per-user click history (item, time)
        self.history = {u: list(h) for u, h in data.user_history.items()}
        # per-user preferred categories (for the ranker's category-match feature)
        self.user_cats = {}
        for u, hist in self.history.items():
            cats = {}
            for nid, _ in hist:
                a = data.articles.get(nid)
                if a:
                    cats[a.subcategory] = cats.get(a.subcategory, 0) + 1
            self.user_cats[u] = cats
        return self

    # ---------------------------------------------------------------- profile
    def profile(self, user):
        hist = self.history.get(user)
        if not hist:
            return None
        lam = math.log(2) / self.half_life
        rows, weights = [], []
        for nid, t in hist:
            r = self.id_to_row.get(nid)
            if r is not None:
                rows.append(r)
                weights.append(math.exp(-lam * (self.now - t)))
        if not rows:
            return None
        w = np.asarray(weights)
        p = (w[:, None] * self.emb[rows]).sum(0) / w.sum()
        n = np.linalg.norm(p)
        return p / n if n > 0 else None

    def seen(self, user):
        return {nid for nid, _ in self.history.get(user, [])}

    # ------------------------------------------------------- recommendation
    def trending(self, k=10, exclude=()):
        order = np.argsort(-self.trend)
        out = [self.ids[r] for r in order if self.ids[r] not in exclude]
        return out[:k]

    def candidates(self, user, k=100, dedup=True):
        """Stage-1 candidate generation. Returns [(news_id, content_score)].

        Skips any article whose lowercased title has already been emitted (a
        basic diversity guard: exact repeats only, so the feed isn't ten copies
        of one story). Cold-start users get trending items with score 0.0.
        """
        p = self.profile(user)
        seen = self.seen(user)
        if p is None:
            return [(nid, 0.0) for nid in self.trending(k, seen)]
        # over-fetch, because seen items and repeated titles are dropped below
        hits = self.index.search(p, (k + len(seen)) * 3)
        out, titles = [], set()
        for nid, s in hits:
            if nid in seen:
                continue
            title = self.data.articles[nid].title.lower() if dedup else nid
            if title in titles:
                continue
            titles.add(title)
            out.append((nid, s))
            if len(out) >= k:
                break
        return out

    def recommend(self, user, k=10, ranker=None):
        """Full pipeline: candidate generation, then optional ranking."""
        cands = self.candidates(user, k=max(k * 10, 50))
        if not cands:
            return self.trending(k, self.seen(user))
        if ranker is None:
            return [nid for nid, _ in cands[:k]]
        feats = np.array([self.features(user, nid, content) for nid, content in cands])
        scores = ranker.predict(feats)
        order = np.argsort(-scores)
        return [cands[i][0] for i in order[:k]]

    # --------------------------------------------------- ranker features
    def features(self, user, news_id, content_score=None):
        """Feature vector for a (user, candidate) pair (used by the ranker)."""
        r = self.id_to_row.get(news_id)
        if content_score is None:
            p = self.profile(user)
            content_score = float(self.emb[r] @ p) if (p is not None and r is not None) else 0.0
        pop = math.log1p(self.pop[r]) if r is not None else 0.0
        trend = self.trend[r] if r is not None else 0.0
        a = self.data.articles.get(news_id)
        cat_match = float(self.user_cats.get(user, {}).get(a.subcategory, 0) > 0) if a else 0.0
        return [content_score, pop, trend, cat_match, 1.0]   # last = bias term

    n_features = 5

Key pieces:

  • fit embeds articles via the pluggable embedder (TF-IDF by default, sentence-transformers in production) and builds the index. The VectorIndex here does exact cosine search, but the recommender only calls build and search on it, so an approximate index (HNSW, FAISS) can be swapped in behind the same two methods.
  • profile is the time-decayed average of the user's clicked-article embeddings, re-normalized to unit length. Recent reads weigh most: a click one half-life old (72 hours by default) counts half as much as a click made now.
  • candidates runs the index search from the profile, filters seen items, and skips articles whose lowercased title has already appeared.
  • recommend returns candidates directly, or — if given a ranker — re-scores them (stage 2).
  • features builds the (user, candidate) feature vector the ranker consumes.
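
To make the half-life knob concrete, here is a small sketch of the weighting that profile applies. The three click ages and the 2-d "embeddings" are invented for illustration; only the formula and the 72-hour default come from the listing above.

```python
import math

import numpy as np

HALF_LIFE_HOURS = 72.0                              # default in NewsRecommender.__init__
lam = math.log(2) / (HALF_LIFE_HOURS * 3600.0)      # decay rate per second


def decay_weight(age_seconds):
    """Weight of a click that happened `age_seconds` before `now`."""
    return math.exp(-lam * age_seconds)


# A click loses half its weight every 72 hours.
ages_hours = [0, 72, 144]
weights = np.array([decay_weight(h * 3600.0) for h in ages_hours])
print(weights.round(3))          # 1.0, 0.5, 0.25

# Toy embeddings (illustrative only): the newest click points along x,
# the two older clicks point along y.
emb = np.array([[1.0, 0.0],
                [0.0, 1.0],
                [0.0, 1.0]])
profile = (weights[:, None] * emb).sum(0) / weights.sum()
profile /= np.linalg.norm(profile)
print(profile.round(3))          # direction (0.8, 0.6)
```

The two older clicks outnumber the fresh one, yet the profile still leans toward the fresh click's direction. A shorter half-life would tilt it further; a very long one would approach a plain average.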

Stage 2 — the learned ranker

Stage 1 optimizes recall (don't miss good items); stage 2 optimizes precision (order them well). ranker.py is a from-scratch logistic-regression click model trained on the impression labels. It is a small, transparent version of the logistic-regression click-through-rate models long used in production systems.

"""Stage-2 ranker: a from-scratch logistic-regression click model.

It re-scores the candidates from stage 1 using features (content similarity,
popularity, trending, category match). Trained on the impression labels in
behaviors.tsv: the same family of click-through-rate model used in production
systems, just small and transparent.
"""

from __future__ import annotations

import numpy as np


class LogisticRanker:
    def __init__(self, lr=0.1, reg=1e-4, epochs=200, seed=0):
        self.lr, self.reg, self.epochs, self.seed = lr, reg, epochs, seed

    def fit(self, X, y):
        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y, dtype=np.float64)
        # standardize features (except the bias column, last) for stable training
        self.mean = X.mean(0)
        self.mean[-1] = 0.0
        self.std = X.std(0)
        self.std[self.std < 1e-9] = 1.0
        self.std[-1] = 1.0
        Xs = (X - self.mean) / self.std
        rng = np.random.default_rng(self.seed)
        self.w = rng.normal(scale=0.01, size=Xs.shape[1])
        n = len(y)
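        for _ in range(self.epochs):
            # full-batch gradient descent on mean log loss + (reg / 2) * ||w||^2
            z = Xs @ self.w
            p = 1.0 / (1.0 + np.exp(-z))                     # predicted click probability
            grad = Xs.T @ (p - y) / n + self.reg * self.w    # penalty also covers the bias weight
            self.w -= self.lr * grad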
        return self

    def predict(self, X):
        Xs = (np.asarray(X, dtype=np.float64) - self.mean) / self.std
        return 1.0 / (1.0 + np.exp(-(Xs @ self.w)))


def build_training_samples(recommender, impressions, max_impressions=None):
    """
    Turn impression logs into (X, y) for the click model: each shown candidate
    becomes one row, labelled 1 if clicked.
    """
    X, y = [], []
    imps = impressions if max_impressions is None else impressions[:max_impressions]
    for imp in imps:
        u = imp["user"]
        for nid, label in imp["candidates"]:
            X.append(recommender.features(u, nid))
            y.append(label)
    return np.array(X), np.array(y)

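Each (user, candidate) pair gets five features: content similarity, log popularity, trending score, category match, and a constant bias term. Training learns one weight per feature so that the weighted sum, passed through a sigmoid, predicts the probability of a click.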
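
The update inside fit is gradient descent on the mean log loss plus an L2 penalty of (reg / 2) * ||w||^2. One way to convince yourself of that is a finite-difference check. The sketch below uses random synthetic data (not the impression logs), and the helper names loss and grad are introduced here for illustration only.

```python
import numpy as np


def loss(w, X, y, reg):
    """Mean log loss plus an L2 penalty of (reg / 2) * ||w||^2."""
    z = X @ w
    # log(1 + exp(z)) - y * z is the log loss in a numerically stable form
    return np.mean(np.logaddexp(0.0, z) - y * z) + 0.5 * reg * (w @ w)


def grad(w, X, y, reg):
    """The same expression LogisticRanker.fit uses for its update."""
    p = 1.0 / (1.0 + np.exp(-(X @ w)))
    return X.T @ (p - y) / len(y) + reg * w


rng = np.random.default_rng(0)
X = rng.normal(size=(50, 5))
X[:, -1] = 1.0                                  # bias column, as in features()
y = (rng.random(50) < 0.3).astype(float)        # synthetic click labels
w = rng.normal(scale=0.5, size=5)
reg = 1e-4

# Central differences along each coordinate axis.
eps = 1e-6
numeric = np.array([
    (loss(w + eps * e, X, y, reg) - loss(w - eps * e, X, y, reg)) / (2 * eps)
    for e in np.eye(5)
])
max_gap = np.max(np.abs(numeric - grad(w, X, y, reg)))
print(f"largest gap between analytic and numeric gradient: {max_gap:.2e}")
```

If the two gradients agree to within numerical noise, the hand-written update is descending the loss it claims to.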

See it run

Recommending for a user whose history is mostly soccer, then training the ranker:

user U106 top-5 (candidate gen only):
  soccer | Bayern Munich and Chelsea play out thrilling 2-2 draw
  soccer | Barcelona and Inter Milan play out thrilling 4-4 draw
  soccer | Bukayo Saka wins Ballon d'Or after stellar football season
  soccer | Manchester City and Paris Saint-Germain play out thrilling 4-4 draw
  soccer | Kylian Mbappe wins Ballon d'Or after stellar football season

ranker trained on 22009 samples; click-AUC = 0.925
ranker weights [content, pop, trend, cat_match, bias]: [0.93, 0.118, 0.021, 0.68, -1.92]

Two things worth noting:

  • The candidates are all soccer (the user's taste) and no title repeats, which is exactly what we want.
  • The ranker reaches AUC 0.925 at predicting clicks, and its learned weights are interpretable. Because the non-bias features are standardized before training, their weights are directly comparable: content similarity (0.93) and category match (0.68) drive clicks most, popularity (0.118) and trending (0.021) add little, and the negative bias (-1.92) reflects that most shown items aren't clicked. That's a sensible, debuggable model, not a black box.
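
The output above reports click-AUC without showing how it is computed. One common definition, sketched here as an assumption about what the number means, is the probability that a randomly chosen clicked row scores higher than a randomly chosen unclicked row, with ties counted as half. The function name auc and the toy labels and scores are illustrative.

```python
import numpy as np


def auc(y_true, scores):
    """Probability that a random clicked row outscores a random unclicked row.

    Ties count as half. The pairwise comparison costs O(P * N) memory and
    time, which is fine for a sketch but heavy for very large eval sets.
    """
    y_true = np.asarray(y_true)
    scores = np.asarray(scores, dtype=np.float64)
    pos = scores[y_true == 1]
    neg = scores[y_true == 0]
    if len(pos) == 0 or len(neg) == 0:
        raise ValueError("AUC needs at least one clicked and one unclicked row")
    diff = pos[:, None] - neg[None, :]
    return float((diff > 0).mean() + 0.5 * (diff == 0).mean())


y = [1, 0, 1, 0, 0]
s = [0.9, 0.8, 0.4, 0.3, 0.1]
print(auc(y, s))     # 5 of the 6 clicked/unclicked pairs are ordered correctly
```

An AUC of 0.5 means the scores order clicked and unclicked rows no better than chance; 1.0 means every clicked row outranks every unclicked one.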

Why two stages (recap)

Scoring all 300 articles with the logistic ranker on every request would be wasteful, and at millions of articles it would be far too slow. Stage 1's ANN search cheaply narrows millions to a few hundred; stage 2 spends its effort only there. This is the architecture behind most large-scale recommenders, and here it is in ~150 lines you can read end to end.
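
The listing imports VectorIndex from ann.py without showing it. As a rough picture of what stage 1 does, here is a hypothetical stand-in with the same two calls the recommender makes, build(vectors, ids) and search(query, k). The class name ExactCosineIndex and its internals are assumptions, not the capstone's actual code.

```python
import numpy as np


class ExactCosineIndex:
    """Hypothetical stand-in for VectorIndex: brute-force cosine search."""

    def build(self, vectors, ids):
        # Rows are assumed to be L2-normalized, so a dot product is a cosine.
        self.vectors = np.asarray(vectors, dtype=np.float64)
        self.ids = list(ids)
        return self

    def search(self, query, k):
        """Return up to k (id, score) pairs, best first."""
        scores = self.vectors @ np.asarray(query, dtype=np.float64)
        k = min(k, len(self.ids))        # callers may ask for more than exist
        if k <= 0:
            return []
        top = np.argpartition(-scores, k - 1)[:k]    # top-k without a full sort
        top = top[np.argsort(-scores[top])]          # order just those k
        return [(self.ids[i], float(scores[i])) for i in top]


index = ExactCosineIndex().build(
    np.array([[1.0, 0.0], [0.6, 0.8], [0.0, 1.0]]), ["N1", "N2", "N3"]
)
hits = index.search(np.array([1.0, 0.0]), k=2)
print(hits)          # N1 first (cosine 1.0), then N2 (cosine 0.6)
```

This version computes one dot product per article on every query, so its cost grows linearly with the catalog. An approximate index trades a little recall for sub-linear search, which is what makes the "millions to a few hundred" step cheap.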

Next: track training runs so you can compare models and ship the best one.