The FastAPI service

Models are useless until something can call them. api.py wraps the recommender and RAG assistant in a FastAPI service — the online half of the system.

The endpoints

Method & path                                 Purpose
GET  /health                                  liveness + which model/embedder/RAG mode is loaded
GET  /recommend/{user_id}?k=                  personalized recommendations (two-stage)
GET  /search?q=&k=                            content/vector search over the catalog
POST /ask {query, k}                          the RAG news assistant
POST /feedback {user_id, news_id, event}      log an interaction (online update)

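On startup the service loads models/newsreco.pkl if present (written by training) and otherwise trains a fresh model, so it always comes up ready. If NEWSRECO_USE_REGISTRY=1 is set, it first tries the production model in the MLflow registry and falls back to the local artifact if that fails.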

"""FastAPI service exposing the recommender + RAG assistant.

Endpoints:
  GET  /health
  GET  /recommend/{user_id}?k=10        -> personalized recommendations
  GET  /search?q=...&k=10               -> content search over the catalog
  POST /ask         {query, k}          -> RAG news assistant (Claude or offline)
  POST /feedback    {user_id, news_id, event}  -> log an interaction (online update)

Loads a trained model from models/newsreco.pkl if present; otherwise it trains a
fresh one from the data on startup. Run:

    uvicorn newsreco.api:app --reload --port 8000
"""

from __future__ import annotations

import os
import pickle

from .config import Config
from .data import load_all
from .recommender import NewsRecommender
from .rag import NewsAssistant

try:
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
except Exception as e:                  # pragma: no cover
    raise SystemExit("FastAPI not installed. pip install fastapi uvicorn") from e


cfg = Config()


def _load_or_train():
    # 1. If asked, load the current 'production' model from the MLflow registry.
    if os.environ.get("NEWSRECO_USE_REGISTRY") == "1":
        try:
            from .registry import load_production
            bundle = load_production()
            if bundle:
                return bundle["recommender"], bundle.get("ranker")
        except Exception:
            pass  # fall through to local artifact / fresh train
    # 2. Otherwise load the locally trained artifact.
    path = os.path.join("models", "newsreco.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            bundle = pickle.load(f)
        return bundle["recommender"], bundle.get("ranker")
    # 3. Last resort: train a fresh model on startup.
    rec = NewsRecommender(embedder=cfg.embedder, half_life_hours=cfg.half_life_hours)
    rec.fit(load_all(cfg))
    return rec, None


app = FastAPI(title="News Recommender", version="1.0")
rec, ranker = _load_or_train()
assistant = NewsAssistant(rec, api_key=cfg.anthropic_api_key, model=cfg.llm_model)


def _article_dict(nid, score=None):
    a = rec.data.articles[nid]
    d = {"id": nid, "title": a.title, "abstract": a.abstract,
         "category": a.category, "subcategory": a.subcategory}
    if score is not None:
        d["score"] = round(float(score), 4)
    return d


class AskRequest(BaseModel):
    query: str
    k: int = 5


class FeedbackRequest(BaseModel):
    user_id: str
    news_id: str
    event: str = "click"


@app.get("/health")
def health():
    return {"status": "ok", "articles": len(rec.data.articles),
            "embedder": rec.embedder.name, "has_ranker": ranker is not None,
            "rag_mode": "claude" if cfg.anthropic_api_key else "extractive"}


@app.get("/recommend/{user_id}")
def recommend(user_id: str, k: int = 10):
    ids = rec.recommend(user_id, k=k, ranker=ranker)
    cold = rec.profile(user_id) is None
    return {"user_id": user_id, "cold_start": cold,
            "recommendations": [_article_dict(nid) for nid in ids]}


@app.get("/search")
def search(q: str, k: int = 10):
    qv = rec.embedder.transform([q])[0]
    hits = rec.index.search(qv, k)
    return {"query": q, "results": [_article_dict(nid, s) for nid, s in hits]}


@app.post("/ask")
def ask(req: AskRequest):
    return assistant.ask(req.query, k=req.k)


@app.post("/feedback")
def feedback(req: FeedbackRequest):
    if req.news_id not in rec.data.articles:
        raise HTTPException(status_code=404, detail="unknown news_id")
    # online update: append to the user's history so the next request reflects it.
    # NOTE: req.event is accepted but not used here; every event is stored like a click.
    rec.history.setdefault(req.user_id, []).append((req.news_id, rec.now))
    return {"status": "recorded", "user_id": req.user_id, "news_id": req.news_id}

Verified responses

These are real responses from the running app (via FastAPI's TestClient):

GET /health
{'status': 'ok', 'articles': 300, 'embedder': 'tfidf',
 'has_ranker': True, 'rag_mode': 'extractive'}

GET /recommend/U106?k=3   (cold_start=False)
  soccer | Erling Haaland wins Ballon d'Or after stellar football season
  soccer | Jude Bellingham wins Ballon d'Or after stellar football season
  soccer | Bukayo Saka wins Ballon d'Or after stellar football season

GET /search?q=world cup final winner
  0.542 | Croatia reach the World Cup final after dramatic win
  0.535 | Morocco reach the World Cup quarter-final after dramatic win
  0.535 | Portugal reach the World Cup quarter-final after dramatic win

POST /ask {"query":"Who scored in the Champions League?","k":2}   (mode=extractive)
  sources: ['Chelsea beat Juventus 2-0 in the Europa League',
            'Liverpool beat Barcelona 3-0 in the Premier League']

POST /feedback {"user_id":"U106","news_id":"N2"}
  {'status': 'recorded', 'user_id': 'U106', 'news_id': 'N2'}

GET /recommend/UNEW   ->  cold_start = True   (new user → trending fallback)

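The endpoints behave as designed: soccer recommendations for a soccer reader, ranked vector search, RAG answers that list their sources, feedback logging, and a cold_start flag that flips to True for an unknown user. One caveat on quality: with the tfidf embedder, search matches on shared words rather than meaning, so retrieval is only as good as the word overlap. In the sample above, the /ask query about the Champions League retrieved Europa League and Premier League articles.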

Running it

uvicorn newsreco.api:app --reload --port 8000

Then explore the auto-generated API docs at http://localhost:8000/docs (FastAPI builds an interactive Swagger UI for free), or curl it:

curl localhost:8000/health
curl "localhost:8000/recommend/U106?k=5"
curl -X POST localhost:8000/ask -H 'content-type: application/json' \
     -d '{"query":"Who won the Champions League?","k":3}'
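The same POST /ask call can be made from Python with only the standard library. This is a sketch: BASE_URL assumes the uvicorn command above, and build_ask_request and send are hypothetical helper names, not part of the newsreco package.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: matches the uvicorn command above


def build_ask_request(query: str, k: int = 3, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) the POST /ask request."""
    body = json.dumps({"query": query, "k": k}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/ask",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req: urllib.request.Request, timeout: float = 10.0) -> dict:
    """Send a request and decode the JSON response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


req = build_ask_request("Who won the Champions League?", k=3)
print(req.get_method(), req.full_url)
# With the server running: print(send(req))
```

Building the request separately from sending it keeps the payload easy to inspect without a live server.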

The online-feedback loop

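POST /feedback appends the click to the user's history in memory, so the next /recommend call reflects it immediately. This is a minimal version of real-time personalization. Two caveats in the current code: the history lives in process memory, so it is lost on restart and not shared between worker processes, and the event field is accepted but not used, so every event is stored like a click. In production you'd write feedback to a stream/store and refresh the user vector from it, but the principle is the same: the system learns from behavior as it happens.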
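To make the effect of an appended click concrete, here is a toy sketch of a recency-weighted user vector. How NewsRecommender actually builds its profile is not shown in this chapter, so everything here is an assumption for illustration: the 2-d ARTICLE_VECS, the user_vector helper, and the exponential half-life weighting (suggested only by the half_life_hours config name).

```python
from __future__ import annotations

# Hypothetical 2-d article vectors: axis 0 ~ "soccer", axis 1 ~ "politics".
ARTICLE_VECS = {"N1": (1.0, 0.0), "N2": (0.0, 1.0)}
HALF_LIFE_HOURS = 24.0  # assumption; mirrors the half_life_hours config name


def user_vector(history, now):
    """Recency-weighted mean of clicked article vectors.

    history: list of (news_id, timestamp_in_hours); now: current time in hours.
    A click loses half its weight every HALF_LIFE_HOURS.
    """
    total_w = 0.0
    acc = [0.0, 0.0]
    for news_id, ts in history:
        w = 0.5 ** ((now - ts) / HALF_LIFE_HOURS)
        vec = ARTICLE_VECS[news_id]
        acc[0] += w * vec[0]
        acc[1] += w * vec[1]
        total_w += w
    if total_w == 0.0:
        return None  # no history: the cold-start case
    return (acc[0] / total_w, acc[1] / total_w)


history = {"U106": [("N1", 0.0)]}  # one soccer click, made at hour 0
now = 24.0                         # 24 hours later

before = user_vector(history["U106"], now)

# What POST /feedback does in spirit: append (news_id, now) to the history.
history.setdefault("U106", []).append(("N2", now))
after = user_vector(history["U106"], now)

print(before)  # (1.0, 0.0): only the old soccer click
print(after)   # roughly (0.333, 0.667): the fresh click now dominates
```

The day-old click carries weight 0.5 and the new one weight 1.0, so a single fresh interaction shifts the profile by two thirds toward the new article.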

Production hardening (checklist)

The code is clean and works for a demo; before real traffic you'd add:

  • CORS middleware for the browser, auth on write endpoints, and request validation/limits.
  • Async + batching for the model calls; load the model once per worker.
  • Caching of per-user candidate lists; rate limiting.
  • Observability — structured logs, latency/error metrics, tracing.
  • Load the Production model from the MLflow registry rather than a local pickle (the code already supports this when NEWSRECO_USE_REGISTRY=1 is set).
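As one illustration of the caching item, here is a small sketch of a per-user cache with a time-to-live that the feedback path can invalidate. TTLCache and slow_candidates are hypothetical names, the 60-second TTL is arbitrary, and the class is not thread-safe; a real deployment would more likely use a shared store such as Redis.

```python
import time


class TTLCache:
    """Tiny per-user cache with expiry; a sketch, not thread-safe."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock   # injectable so expiry can be tested
        self._store = {}     # user_id -> (expires_at, value)

    def get_or_compute(self, user_id, compute):
        entry = self._store.get(user_id)
        now = self.clock()
        if entry is not None and entry[0] > now:
            return entry[1]  # still fresh: reuse the cached value
        value = compute(user_id)
        self._store[user_id] = (now + self.ttl, value)
        return value

    def invalidate(self, user_id):
        """Call from the feedback path so a new click is not hidden by the cache."""
        self._store.pop(user_id, None)


calls = []


def slow_candidates(user_id):  # hypothetical stand-in for candidate retrieval
    calls.append(user_id)
    return ["N1", "N2", "N3"]


fake_now = [0.0]  # fake clock so the example does not need to sleep
cache = TTLCache(ttl_seconds=60, clock=lambda: fake_now[0])

cache.get_or_compute("U106", slow_candidates)  # computed
cache.get_or_compute("U106", slow_candidates)  # served from cache
fake_now[0] = 61.0
cache.get_or_compute("U106", slow_candidates)  # expired, recomputed
cache.invalidate("U106")
cache.get_or_compute("U106", slow_candidates)  # invalidated, recomputed
print(len(calls))  # prints 3
```

Invalidating on feedback matters here: without it, a cached candidate list would mask the immediate update that POST /feedback is meant to provide.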

Next, the user interface. 👉