The full implementation

Here is the consolidated module — k-means, exact search, IVF, PQ, and IVFPQ — in one file, code/ivfpq.py, reproduced in full.

"""
IVF and Product Quantization for approximate nearest-neighbor search,
from scratch in NumPy.

References:
    Jégou, Douze, Schmid. "Product Quantization for Nearest Neighbor Search."
    IEEE TPAMI, 2011.  (introduces PQ and the IVFADC system)

This module implements, in order of dependency:
    kmeans(X, k)        -> the building block for every codebook here
    flat_knn(X, q, k)   -> exact search (ground truth for recall)
    IVF                 -> Inverted File index (partition + probe)
    PQ                  -> Product Quantizer (compress vectors to bytes)
    IVFPQ               -> IVF + PQ on residuals (the FAISS workhorse)

Only NumPy is used.
"""

from __future__ import annotations

import numpy as np


# ===========================================================================
# k-means and exact search
# ===========================================================================
def _assign(X, C):
    """Nearest-centroid index for each row of X (squared L2)."""
    d = (X ** 2).sum(1)[:, None] - 2.0 * X @ C.T + (C ** 2).sum(1)[None, :]
    return d.argmin(1)


def kmeans(X, k, iters=25, seed=0):
    """Lloyd's k-means. Returns (centroids, assignments)."""
    X = np.asarray(X, dtype=np.float64)
    n = X.shape[0]
    k = min(k, n)
    rng = np.random.default_rng(seed)
    C = X[rng.choice(n, size=k, replace=False)].copy()
    for _ in range(iters):
        a = _assign(X, C)
        newC = C.copy()
        for j in range(k):
            members = X[a == j]
            if len(members):
                newC[j] = members.mean(0)
        if np.allclose(newC, C):
            C = newC
            break
        C = newC
    return C, _assign(X, C)


def flat_knn(X, q, k=10):
    """Exact k nearest neighbors by brute force. Returns (ids, distances)."""
    d = ((X - q) ** 2).sum(1)
    idx = np.argsort(d)[:k]
    return idx, d[idx]


# ===========================================================================
# IVF — Inverted File index (partition space, search a few partitions)
# ===========================================================================
class IVF:
    def __init__(self, nlist=64, nprobe=8, seed=0):
        self.nlist = nlist          # number of partitions (Voronoi cells)
        self.nprobe = nprobe        # how many cells to scan per query
        self.seed = seed

    def train(self, X):
        """Learn the partition centroids (the coarse quantizer)."""
        self.centroids, _ = kmeans(X, self.nlist, seed=self.seed)
        return self

    def add(self, X):
        """Assign every vector to its nearest cell and store the inverted lists."""
        self.X = np.asarray(X, dtype=np.float64)
        a = _assign(self.X, self.centroids)
        self.lists = [np.where(a == c)[0] for c in range(len(self.centroids))]
        return self

    def search(self, q, k=10, nprobe=None):
        nprobe = nprobe or self.nprobe
        q = np.asarray(q, dtype=np.float64)
        dc = ((self.centroids - q) ** 2).sum(1)
        cells = np.argsort(dc)[:nprobe]
        cand = np.concatenate([self.lists[c] for c in cells]) if len(cells) else np.array([], int)
        if cand.size == 0:
            return np.array([], int), np.array([])
        d = ((self.X[cand] - q) ** 2).sum(1)     # exact distance within probed cells
        order = np.argsort(d)[:k]
        return cand[order], d[order]


# ===========================================================================
# PQ — Product Quantization (compress each vector to `m` bytes)
# ===========================================================================
class PQ:
    def __init__(self, m=8, ksub=256, seed=0):
        self.m = m                  # number of sub-vectors
        self.ksub = ksub            # centroids per sub-quantizer (256 => 1 byte)
        self.seed = seed

    def train(self, X):
        X = np.asarray(X, dtype=np.float64)
        n, D = X.shape
        if D % self.m != 0:
            raise ValueError(f"dim {D} not divisible by m={self.m}")
        self.dsub = D // self.m
        self.codebooks = np.zeros((self.m, self.ksub, self.dsub))
        for j in range(self.m):
            sub = X[:, j * self.dsub:(j + 1) * self.dsub]
            C, _ = kmeans(sub, self.ksub, seed=self.seed + j)
            self.codebooks[j, :len(C)] = C
        return self

    def encode(self, X):
        """Map each vector to `m` centroid ids (uint8 codes)."""
        X = np.asarray(X, dtype=np.float64)
        codes = np.zeros((X.shape[0], self.m), dtype=np.uint8)
        for j in range(self.m):
            sub = X[:, j * self.dsub:(j + 1) * self.dsub]
            codes[:, j] = _assign(sub, self.codebooks[j])
        return codes

    def distance_table(self, q):
        """Squared distance from q's sub-vectors to every centroid: shape (m, ksub)."""
        q = np.asarray(q, dtype=np.float64)
        table = np.zeros((self.m, self.ksub))
        for j in range(self.m):
            qs = q[j * self.dsub:(j + 1) * self.dsub]
            table[j] = ((self.codebooks[j] - qs) ** 2).sum(1)
        return table

    def adc_distances(self, codes, q):
        """Asymmetric distance: approx ||q - x|| using only the codes + table."""
        table = self.distance_table(q)
        d = np.zeros(len(codes))
        for j in range(self.m):
            d += table[j, codes[:, j]]
        return d


# ===========================================================================
# IVFPQ — IVF + PQ on residuals (compressed AND partitioned)
# ===========================================================================
class IVFPQ:
    def __init__(self, nlist=64, nprobe=8, m=8, ksub=256, seed=0):
        self.nlist = nlist
        self.nprobe = nprobe
        self.m = m
        self.ksub = ksub
        self.seed = seed

    def train(self, X):
        X = np.asarray(X, dtype=np.float64)
        self.centroids, a = kmeans(X, self.nlist, seed=self.seed)
        residuals = X - self.centroids[a]          # PQ encodes the residual
        self.pq = PQ(self.m, self.ksub, seed=self.seed + 1).train(residuals)
        return self

    def add(self, X):
        X = np.asarray(X, dtype=np.float64)
        a = _assign(X, self.centroids)
        residuals = X - self.centroids[a]
        self.codes = self.pq.encode(residuals)
        self.lists = [np.where(a == c)[0] for c in range(len(self.centroids))]
        return self

    def search(self, q, k=10, nprobe=None):
        nprobe = nprobe or self.nprobe
        q = np.asarray(q, dtype=np.float64)
        dc = ((self.centroids - q) ** 2).sum(1)
        cells = np.argsort(dc)[:nprobe]
        all_d, all_id = [], []
        for c in cells:
            ids = self.lists[c]
            if ids.size == 0:
                continue
            res_q = q - self.centroids[c]          # residual relative to this cell
            d = self.pq.adc_distances(self.codes[ids], res_q)
            all_d.append(d)
            all_id.append(ids)
        if not all_id:
            return np.array([], int), np.array([])
        d = np.concatenate(all_d)
        ids = np.concatenate(all_id)
        order = np.argsort(d)[:k]
        return ids[order], d[order]


__all__ = ["kmeans", "flat_knn", "IVF", "PQ", "IVFPQ"]

The {{#include}} directive splices in the real code/ivfpq.py at build time, so the documentation can never drift from the actual implementation.

API at a glance

CallPurpose
kmeans(X, k)Learn k centroids + assignments (used by everything).
flat_knn(X, q, k)Exact search — the ground truth for recall.
IVF(nlist, nprobe).train(X).add(X)Partitioned exact search.
PQ(m, ksub).train(X)Compress vectors to m-byte codes; .encode, .adc_distances.
IVFPQ(nlist, m).train(X).add(X)Partitioned and compressed search.

Typical usage

import numpy as np
from ivfpq import IVFPQ

vectors = np.load("embeddings.npy")        # shape (n, d), d divisible by m

index = IVFPQ(nlist=256, m=16, ksub=256).train(vectors)
index.add(vectors)

q = vectors[0]
ids, approx_d = index.search(q, k=100, nprobe=16)   # shortlist via compressed codes
# re-rank the shortlist exactly for high recall:
exact_d = ((vectors[ids] - q) ** 2).sum(1)
top10 = ids[np.argsort(exact_d)[:10]]
print(top10)

What this is and isn't

This implementation is written for clarity — it shows exactly how IVF, PQ, and IVFPQ work and produces correct, measurable recall. Production libraries (FAISS, ScaNN) add the speed and scale: SIMD/AVX distance kernels, optimized PQ table lookups (PQFastScan), GPU support, multi-threaded training on samples, on-disk indexes, and combinations like HNSW-as-coarse-quantizer or OPQ (a learned rotation before PQ that boosts recall). They share these exact algorithms; they just run them far faster — see the use-cases chapter, which maps each class here to its FAISS equivalent.

Next: measure the whole thing. 👉