The full implementation
Here is the consolidated module — k-means, exact search, IVF, PQ, and IVFPQ — in
one file, code/ivfpq.py, reproduced in full.
"""
IVF and Product Quantization for approximate nearest-neighbor search,
from scratch in NumPy.
References:
Jégou, Douze, Schmid. "Product Quantization for Nearest Neighbor Search."
IEEE TPAMI, 2011. (introduces PQ and the IVFADC system)
This module implements, in order of dependency:
kmeans(X, k) -> the building block for every codebook here
flat_knn(X, q, k) -> exact search (ground truth for recall)
IVF -> Inverted File index (partition + probe)
PQ -> Product Quantizer (compress vectors to bytes)
IVFPQ -> IVF + PQ on residuals (the FAISS workhorse)
Only NumPy is used.
"""
from __future__ import annotations
import numpy as np
# ===========================================================================
# k-means and exact search
# ===========================================================================
def _assign(X, C):
"""Nearest-centroid index for each row of X (squared L2)."""
d = (X ** 2).sum(1)[:, None] - 2.0 * X @ C.T + (C ** 2).sum(1)[None, :]
return d.argmin(1)
def kmeans(X, k, iters=25, seed=0):
"""Lloyd's k-means. Returns (centroids, assignments)."""
X = np.asarray(X, dtype=np.float64)
n = X.shape[0]
k = min(k, n)
rng = np.random.default_rng(seed)
C = X[rng.choice(n, size=k, replace=False)].copy()
for _ in range(iters):
a = _assign(X, C)
newC = C.copy()
for j in range(k):
members = X[a == j]
if len(members):
newC[j] = members.mean(0)
if np.allclose(newC, C):
C = newC
break
C = newC
return C, _assign(X, C)
def flat_knn(X, q, k=10):
"""Exact k nearest neighbors by brute force. Returns (ids, distances)."""
d = ((X - q) ** 2).sum(1)
idx = np.argsort(d)[:k]
return idx, d[idx]
# ===========================================================================
# IVF — Inverted File index (partition space, search a few partitions)
# ===========================================================================
class IVF:
def __init__(self, nlist=64, nprobe=8, seed=0):
self.nlist = nlist # number of partitions (Voronoi cells)
self.nprobe = nprobe # how many cells to scan per query
self.seed = seed
def train(self, X):
"""Learn the partition centroids (the coarse quantizer)."""
self.centroids, _ = kmeans(X, self.nlist, seed=self.seed)
return self
def add(self, X):
"""Assign every vector to its nearest cell and store the inverted lists."""
self.X = np.asarray(X, dtype=np.float64)
a = _assign(self.X, self.centroids)
self.lists = [np.where(a == c)[0] for c in range(len(self.centroids))]
return self
def search(self, q, k=10, nprobe=None):
nprobe = nprobe or self.nprobe
q = np.asarray(q, dtype=np.float64)
dc = ((self.centroids - q) ** 2).sum(1)
cells = np.argsort(dc)[:nprobe]
cand = np.concatenate([self.lists[c] for c in cells]) if len(cells) else np.array([], int)
if cand.size == 0:
return np.array([], int), np.array([])
d = ((self.X[cand] - q) ** 2).sum(1) # exact distance within probed cells
order = np.argsort(d)[:k]
return cand[order], d[order]
# ===========================================================================
# PQ — Product Quantization (compress each vector to `m` bytes)
# ===========================================================================
class PQ:
def __init__(self, m=8, ksub=256, seed=0):
self.m = m # number of sub-vectors
self.ksub = ksub # centroids per sub-quantizer (256 => 1 byte)
self.seed = seed
def train(self, X):
X = np.asarray(X, dtype=np.float64)
n, D = X.shape
if D % self.m != 0:
raise ValueError(f"dim {D} not divisible by m={self.m}")
self.dsub = D // self.m
self.codebooks = np.zeros((self.m, self.ksub, self.dsub))
for j in range(self.m):
sub = X[:, j * self.dsub:(j + 1) * self.dsub]
C, _ = kmeans(sub, self.ksub, seed=self.seed + j)
self.codebooks[j, :len(C)] = C
return self
def encode(self, X):
"""Map each vector to `m` centroid ids (uint8 codes)."""
X = np.asarray(X, dtype=np.float64)
codes = np.zeros((X.shape[0], self.m), dtype=np.uint8)
for j in range(self.m):
sub = X[:, j * self.dsub:(j + 1) * self.dsub]
codes[:, j] = _assign(sub, self.codebooks[j])
return codes
def distance_table(self, q):
"""Squared distance from q's sub-vectors to every centroid: shape (m, ksub)."""
q = np.asarray(q, dtype=np.float64)
table = np.zeros((self.m, self.ksub))
for j in range(self.m):
qs = q[j * self.dsub:(j + 1) * self.dsub]
table[j] = ((self.codebooks[j] - qs) ** 2).sum(1)
return table
def adc_distances(self, codes, q):
"""Asymmetric distance: approx ||q - x|| using only the codes + table."""
table = self.distance_table(q)
d = np.zeros(len(codes))
for j in range(self.m):
d += table[j, codes[:, j]]
return d
# ===========================================================================
# IVFPQ — IVF + PQ on residuals (compressed AND partitioned)
# ===========================================================================
class IVFPQ:
def __init__(self, nlist=64, nprobe=8, m=8, ksub=256, seed=0):
self.nlist = nlist
self.nprobe = nprobe
self.m = m
self.ksub = ksub
self.seed = seed
def train(self, X):
X = np.asarray(X, dtype=np.float64)
self.centroids, a = kmeans(X, self.nlist, seed=self.seed)
residuals = X - self.centroids[a] # PQ encodes the residual
self.pq = PQ(self.m, self.ksub, seed=self.seed + 1).train(residuals)
return self
def add(self, X):
X = np.asarray(X, dtype=np.float64)
a = _assign(X, self.centroids)
residuals = X - self.centroids[a]
self.codes = self.pq.encode(residuals)
self.lists = [np.where(a == c)[0] for c in range(len(self.centroids))]
return self
def search(self, q, k=10, nprobe=None):
nprobe = nprobe or self.nprobe
q = np.asarray(q, dtype=np.float64)
dc = ((self.centroids - q) ** 2).sum(1)
cells = np.argsort(dc)[:nprobe]
all_d, all_id = [], []
for c in cells:
ids = self.lists[c]
if ids.size == 0:
continue
res_q = q - self.centroids[c] # residual relative to this cell
d = self.pq.adc_distances(self.codes[ids], res_q)
all_d.append(d)
all_id.append(ids)
if not all_id:
return np.array([], int), np.array([])
d = np.concatenate(all_d)
ids = np.concatenate(all_id)
order = np.argsort(d)[:k]
return ids[order], d[order]
__all__ = ["kmeans", "flat_knn", "IVF", "PQ", "IVFPQ"]
The
{{#include}}directive splices in the realcode/ivfpq.pyat build time, so the documentation can never drift from the actual implementation.
API at a glance
| Call | Purpose |
|---|---|
kmeans(X, k) | Learn k centroids + assignments (used by everything). |
flat_knn(X, q, k) | Exact search — the ground truth for recall. |
IVF(nlist, nprobe).train(X).add(X) | Partitioned exact search. |
PQ(m, ksub).train(X) | Compress vectors to m-byte codes; .encode, .adc_distances. |
IVFPQ(nlist, m).train(X).add(X) | Partitioned and compressed search. |
Typical usage
import numpy as np
from ivfpq import IVFPQ
vectors = np.load("embeddings.npy") # shape (n, d), d divisible by m
index = IVFPQ(nlist=256, m=16, ksub=256).train(vectors)
index.add(vectors)
q = vectors[0]
ids, approx_d = index.search(q, k=100, nprobe=16) # shortlist via compressed codes
# re-rank the shortlist exactly for high recall:
exact_d = ((vectors[ids] - q) ** 2).sum(1)
top10 = ids[np.argsort(exact_d)[:10]]
print(top10)
What this is and isn't
This implementation is written for clarity — it shows exactly how IVF, PQ, and
IVFPQ work and produces correct, measurable recall. Production libraries
(FAISS, ScaNN) add the speed and scale: SIMD/AVX distance kernels, optimized
PQ table lookups (PQFastScan), GPU support, multi-threaded training on samples,
on-disk indexes, and combinations like HNSW-as-coarse-quantizer or
OPQ (a learned rotation before PQ that boosts recall). They share these exact
algorithms; they just run them far faster — see the
use-cases chapter, which maps each class here to its FAISS
equivalent.
Next: measure the whole thing. 👉