The FastAPI service
Models are useless until something can call them. api.py wraps the recommender
and RAG assistant in a FastAPI service — the online half of the system.
The endpoints
| Method & path | Purpose |
|---|---|
| GET /health | liveness + which model/embedder/RAG mode is loaded |
| GET /recommend/{user_id}?k= | personalized recommendations (two-stage) |
| GET /search?q=&k= | content/vector search over the catalog |
| POST /ask {query, k} | the RAG news assistant |
| POST /feedback {user_id, news_id, event} | log an interaction (online update) |
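On startup the service loads models/newsreco.pkl if present (written by training);
otherwise it trains a fresh model, so it always comes up ready. If
NEWSRECO_USE_REGISTRY=1 is set, it first tries the production model in the MLflow
registry and falls back to the local artifact if that fails.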
"""FastAPI service exposing the recommender + RAG assistant.
Endpoints:
GET /health
GET /recommend/{user_id}?k=10 -> personalized recommendations
GET /search?q=...&k=10 -> content search over the catalog
POST /ask {query, k} -> RAG news assistant (Claude or offline)
POST /feedback {user_id, news_id, event} -> log an interaction (online update)
Loads a trained model from models/newsreco.pkl if present; otherwise it trains a
fresh one from the data on startup. Run:
uvicorn newsreco.api:app --reload --port 8000
"""
from __future__ import annotations
import os
import pickle
from .config import Config
from .data import load_all
from .recommender import NewsRecommender
from .rag import NewsAssistant
try:
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
except Exception as e:  # pragma: no cover
    raise SystemExit("FastAPI not installed. pip install fastapi uvicorn") from e
cfg = Config()
def _load_or_train():
    # 1. If asked, load the current 'production' model from the MLflow registry.
    if os.environ.get("NEWSRECO_USE_REGISTRY") == "1":
        try:
            from .registry import load_production
            bundle = load_production()
            if bundle:
                return bundle["recommender"], bundle.get("ranker")
        except Exception:
            pass  # fall through to local artifact / fresh train
    # 2. Otherwise load the locally trained artifact.
    path = os.path.join("models", "newsreco.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            bundle = pickle.load(f)
        return bundle["recommender"], bundle.get("ranker")
    # 3. Last resort: train a fresh model on startup.
    rec = NewsRecommender(embedder=cfg.embedder, half_life_hours=cfg.half_life_hours)
    rec.fit(load_all(cfg))
    return rec, None
app = FastAPI(title="News Recommender", version="1.0")
rec, ranker = _load_or_train()
assistant = NewsAssistant(rec, api_key=cfg.anthropic_api_key, model=cfg.llm_model)
def _article_dict(nid, score=None):
    a = rec.data.articles[nid]
    d = {"id": nid, "title": a.title, "abstract": a.abstract,
         "category": a.category, "subcategory": a.subcategory}
    if score is not None:
        d["score"] = round(float(score), 4)
    return d

class AskRequest(BaseModel):
    query: str
    k: int = 5

class FeedbackRequest(BaseModel):
    user_id: str
    news_id: str
    event: str = "click"
@app.get("/health")
def health():
    return {"status": "ok", "articles": len(rec.data.articles),
            "embedder": rec.embedder.name, "has_ranker": ranker is not None,
            "rag_mode": "claude" if cfg.anthropic_api_key else "extractive"}

@app.get("/recommend/{user_id}")
def recommend(user_id: str, k: int = 10):
    ids = rec.recommend(user_id, k=k, ranker=ranker)
    cold = rec.profile(user_id) is None
    return {"user_id": user_id, "cold_start": cold,
            "recommendations": [_article_dict(nid) for nid in ids]}

@app.get("/search")
def search(q: str, k: int = 10):
    qv = rec.embedder.transform([q])[0]
    hits = rec.index.search(qv, k)
    return {"query": q, "results": [_article_dict(nid, s) for nid, s in hits]}

@app.post("/ask")
def ask(req: AskRequest):
    return assistant.ask(req.query, k=req.k)

@app.post("/feedback")
def feedback(req: FeedbackRequest):
    if req.news_id not in rec.data.articles:
        raise HTTPException(status_code=404, detail="unknown news_id")
    # online update: append to the user's history so the next request reflects it.
    rec.history.setdefault(req.user_id, []).append((req.news_id, rec.now))
    return {"status": "recorded", "user_id": req.user_id, "news_id": req.news_id}
Verified responses
These are real responses from the running app (via FastAPI's TestClient):
GET /health
{'status': 'ok', 'articles': 300, 'embedder': 'tfidf',
'has_ranker': True, 'rag_mode': 'extractive'}
GET /recommend/U106?k=3 (cold_start=False)
soccer | Erling Haaland wins Ballon d'Or after stellar football season
soccer | Jude Bellingham wins Ballon d'Or after stellar football season
soccer | Bukayo Saka wins Ballon d'Or after stellar football season
GET /search?q=world cup final winner
0.542 | Croatia reach the World Cup final after dramatic win
0.535 | Morocco reach the World Cup quarter-final after dramatic win
0.535 | Portugal reach the World Cup quarter-final after dramatic win
POST /ask {"query":"Who scored in the Champions League?","k":2} (mode=extractive)
sources: ['Chelsea beat Juventus 2-0 in the Europa League',
'Liverpool beat Barcelona 3-0 in the Premier League']
POST /feedback {"user_id":"U106","news_id":"N2"}
{'status': 'recorded', 'user_id': 'U106', 'news_id': 'N2'}
GET /recommend/UNEW -> cold_start = True (new user → trending fallback)
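Everything works as designed: a soccer reader gets personalized soccer recs, vector
search returns on-topic matches (with the TF-IDF embedder this is lexical rather than
truly semantic), the RAG assistant cites the sources it retrieved, feedback is
logged, and the cold-start flag flips to True for an unknown user.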
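The cold-start behavior can be illustrated with a short sketch. This is not the project's `rec.recommend`, whose internals are not shown here: `recommend_with_fallback`, `click_counts`, and `personalized` are hypothetical names, and "trending" is assumed to mean the most-clicked articles overall.

```python
from collections import Counter

def recommend_with_fallback(user_id, history, click_counts, personalized, k=3):
    """Return (ids, cold_start). Hypothetical helper, not the newsreco API."""
    seen = history.get(user_id)
    if not seen:
        # Unknown user: fall back to the k most-clicked articles (assumed "trending").
        trending = [nid for nid, _ in click_counts.most_common(k)]
        return trending, True
    # Known user: use the personalized ranking, skipping articles already read.
    already_read = set(seen)
    ids = [nid for nid in personalized(user_id) if nid not in already_read]
    return ids[:k], False

# Toy data for illustration only.
history = {"U106": ["N1"]}
click_counts = Counter({"N1": 5, "N2": 9, "N3": 2})
personalized = lambda uid: ["N1", "N3", "N2"]

print(recommend_with_fallback("U106", history, click_counts, personalized))
print(recommend_with_fallback("UNEW", history, click_counts, personalized))
```

The second call takes the fallback branch because `UNEW` has no history, which is the same condition that makes the API report `cold_start: True`.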
Running it
uvicorn newsreco.api:app --reload --port 8000
Then explore the auto-generated API docs at http://localhost:8000/docs
(FastAPI builds an interactive Swagger UI for free), or curl it:
curl localhost:8000/health
curl "localhost:8000/recommend/U106?k=5"
curl -X POST localhost:8000/ask -H 'content-type: application/json' \
-d '{"query":"Who won the Champions League?","k":3}'
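The same POST can be made from Python using only the standard library. This is a minimal sketch: `build_ask_request` and `ask` are hypothetical helper names, and the base URL assumes the uvicorn command above is running on port 8000.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: local server from the uvicorn command

def build_ask_request(query, k=3, base_url=BASE_URL):
    """Build (but do not send) the POST /ask request. Hypothetical helper."""
    body = json.dumps({"query": query, "k": k}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/ask",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def ask(query, k=3, timeout=10.0):
    """Send the request and decode the JSON reply; requires the server to be up."""
    with urllib.request.urlopen(build_ask_request(query, k), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

# Building the request needs no network, so it is safe to inspect offline.
req = build_ask_request("Who won the Champions League?", k=3)
print(req.get_method(), req.full_url)
```

Calling `ask("Who won the Champions League?")` sends the same payload as the curl command; it is left uncalled here because it needs a live server.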
The online-feedback loop
POST /feedback appends the click to the user's history in memory, so the next
/recommend call reflects it immediately: a minimal version of real-time
personalization. The update lives only in the serving process, so it is lost on
restart and not shared across workers, and the event field is accepted but not yet
used. In production you'd write feedback to a stream/store and refresh the user
vector from it, but the principle is the same: the system learns from behavior as
it happens.
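One way to move toward the "stream/store" idea is an append-only log that can be replayed into the in-memory history. The sketch below is an assumption-laden illustration, not part of api.py: `log_feedback`, `replay_history`, and the JSON Lines file are hypothetical, and the rebuilt dict only mirrors the `(news_id, timestamp)` tuples that the /feedback handler appends.

```python
import json
import os
import tempfile
import time

def log_feedback(path, user_id, news_id, event="click", ts=None):
    """Append one interaction as a JSON line (a stand-in for a durable store)."""
    record = {"user_id": user_id, "news_id": news_id, "event": event,
              "ts": time.time() if ts is None else ts}
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

def replay_history(path):
    """Rebuild {user_id: [(news_id, ts), ...]} from the log, e.g. after a restart."""
    history = {}
    if not os.path.exists(path):
        return history
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                r = json.loads(line)
                history.setdefault(r["user_id"], []).append((r["news_id"], r["ts"]))
    return history

# Toy usage with fixed timestamps so the result is deterministic.
log_path = os.path.join(tempfile.mkdtemp(), "feedback.jsonl")
log_feedback(log_path, "U106", "N2", ts=1.0)
log_feedback(log_path, "U106", "N7", ts=2.0)
print(replay_history(log_path))
```

A single local file only survives restarts of one process; sharing feedback across workers would still need a real queue or database.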
Production hardening (checklist)
The code is deliberately minimal and fine for a demo; before real traffic you'd add:
- CORS middleware for the browser, auth on write endpoints, and request validation/limits.
- Async + batching for the model calls; load the model once per worker.
- Caching of per-user candidate lists; rate limiting.
- Observability — structured logs, latency/error metrics, tracing.
- Make the MLflow registry the default source for the Production model (today it is opt-in via NEWSRECO_USE_REGISTRY=1) rather than a local pickle.
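As one illustration of the caching item in the checklist, here is a minimal sketch of a per-user cache with expiry. `TTLCache` and `candidates_for_u106` are hypothetical names, not part of newsreco, and the cache is single-process only; with several workers you would likely want a shared cache instead.

```python
import time

class TTLCache:
    """Tiny per-key cache with expiry. Hypothetical helper, not part of newsreco."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store = {}

    def get_or_compute(self, key, compute):
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]  # still fresh: skip the expensive call
        value = compute()
        self._store[key] = (now, value)
        return value

    def invalidate(self, key):
        # Intended to be called on feedback so a new click is not hidden by a stale list.
        self._store.pop(key, None)

calls = []

def candidates_for_u106():
    calls.append(1)  # count how often the "expensive" step actually runs
    return ["N3", "N2"]

cache = TTLCache(ttl_seconds=60)
cache.get_or_compute("U106", candidates_for_u106)
cache.get_or_compute("U106", candidates_for_u106)  # served from cache
cache.invalidate("U106")
cache.get_or_compute("U106", candidates_for_u106)  # recomputed after invalidation
print(len(calls))
```

Invalidating on feedback keeps the cache compatible with the online-feedback loop: a cached candidate list would otherwise delay the effect of a click until the entry expires.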
Next, the user interface. 👉