The full implementation
Here is the consolidated module that ties Steps 1–4 together — distances, the
graph, single-layer greedy search, multi-layer search, and insertion with the
neighbor heuristic. It also includes brute_force_knn so we can measure accuracy.
The runnable file lives at code/hnsw.py in this book's folder, reproduced below
in full.
"""
HNSW (Hierarchical Navigable Small World) — from scratch in Python + NumPy.
Reference:
Yu. A. Malkov, D. A. Yashunin.
"Efficient and robust approximate nearest neighbor search using
Hierarchical Navigable Small World graphs." IEEE TPAMI, 2018.
(arXiv:1603.09320)
This is the consolidated, readable version of the step-by-step derivation in the
accompanying mdBook. It implements:
HNSW.add(vector) -> insert a point, returns its id
HNSW.search(query, k) -> approximate k nearest neighbors
Distances supported: "euclidean" (squared) and "cosine" (vectors auto-normalized).
Only NumPy + the standard library are used.
"""
from __future__ import annotations
import heapq
import math
import random
import numpy as np
class HNSW:
def __init__(self, dim, M=16, ef_construction=200, ef=50,
distance="euclidean", seed=42):
self.dim = dim
self.M = M # target neighbors per node
self.Mmax = M # cap per node above layer 0
self.Mmax0 = 2 * M # cap per node at layer 0 (denser)
self.ef_construction = ef_construction
self.ef = ef # default search breadth
self.mL = 1.0 / math.log(M) # level-generation normalizer
self.rng = random.Random(seed)
self.distance_name = distance
self.data = [] # data[id] = vector
self.graph = [] # graph[level][id] = list of neighbor ids
self.entry_point = None # id of the current top-layer entry
self.max_level = -1
self.dist_count = 0 # distance evaluations (for analysis)
# ----------------------------------------------------------------- distance
def _dist(self, a, b):
self.dist_count += 1
if self.distance_name == "cosine":
return 1.0 - float(np.dot(a, b)) # vectors are pre-normalized
d = a - b
return float(np.dot(d, d)) # squared Euclidean (monotonic)
def _prepare(self, v):
v = np.asarray(v, dtype=np.float64)
if self.distance_name == "cosine":
n = np.linalg.norm(v)
if n > 0:
v = v / n
return v
# ------------------------------------------------------------ search a layer
def _search_layer(self, q, entry_ids, ef, level):
"""Greedy best-first search within a single layer.
Returns up to `ef` nearest found, as a list of (distance, id).
"""
visited = set(entry_ids)
candidates = [] # min-heap by distance (explore nearest)
results = [] # max-heap by distance (-d) of the best ef
for e in entry_ids:
d = self._dist(q, self.data[e])
heapq.heappush(candidates, (d, e))
heapq.heappush(results, (-d, e))
while candidates:
d_c, c = heapq.heappop(candidates)
if d_c > -results[0][0]: # nearest candidate worse than worst result
break
for nb in self.graph[level].get(c, []):
if nb in visited:
continue
visited.add(nb)
d_nb = self._dist(q, self.data[nb])
if d_nb < -results[0][0] or len(results) < ef:
heapq.heappush(candidates, (d_nb, nb))
heapq.heappush(results, (-d_nb, nb))
if len(results) > ef:
heapq.heappop(results)
return [(-d, n) for d, n in results]
# -------------------------------------------------- neighbor selection heuristic
def _select_neighbors(self, base_vec, candidates, M):
"""Algorithm 4 (heuristic): pick diverse neighbors, not just the closest.
Keep candidate e only if it is closer to the base point than to any
neighbor already chosen — this avoids clustering all links in one
direction and keeps the graph well-connected.
"""
result = []
for d_e, e in sorted(candidates): # nearest first
keep = True
for r in result:
if self._dist(self.data[e], self.data[r]) < d_e:
keep = False
break
if keep:
result.append(e)
if len(result) >= M:
break
return result
# ----------------------------------------------------------------- insertion
def add(self, vector):
v = self._prepare(vector)
node = len(self.data)
self.data.append(v)
# random level: geometric-like, so few nodes reach the high layers
level = int(-math.log(self.rng.random()) * self.mL)
while len(self.graph) <= level:
self.graph.append({})
for l in range(level + 1):
self.graph[l].setdefault(node, [])
if self.entry_point is None: # first ever point
self.entry_point = node
self.max_level = level
return node
ep = [self.entry_point]
L = self.max_level
# Phase 1: descend from the top to just above the new node's level (ef=1).
for l in range(L, level, -1):
W = self._search_layer(v, ep, 1, l)
ep = [min(W)[1]]
# Phase 2: from the new node's level down to 0, connect it in.
for l in range(min(L, level), -1, -1):
W = self._search_layer(v, ep, self.ef_construction, l)
neighbors = self._select_neighbors(v, W, self.M)
self.graph[l][node] = list(neighbors)
Mmax = self.Mmax0 if l == 0 else self.Mmax
for nb in neighbors: # links are bidirectional
self.graph[l].setdefault(nb, []).append(node)
if len(self.graph[l][nb]) > Mmax: # prune over-full neighbor
nb_vec = self.data[nb]
cand = [(self._dist(nb_vec, self.data[x]), x)
for x in self.graph[l][nb]]
self.graph[l][nb] = self._select_neighbors(nb_vec, cand, Mmax)
ep = [n for _, n in W]
if level > self.max_level: # new top of the hierarchy
self.max_level = level
self.entry_point = node
return node
# -------------------------------------------------------------------- search
def search(self, query, k=5, ef=None):
"""Return the approximate k nearest neighbors as [(id, distance), ...]."""
if self.entry_point is None:
return []
ef = ef or max(self.ef, k)
q = self._prepare(query)
ep = [self.entry_point]
for l in range(self.max_level, 0, -1): # greedily descend the hierarchy
W = self._search_layer(q, ep, 1, l)
ep = [min(W)[1]]
W = self._search_layer(q, ep, ef, 0) # wide search at the base layer
W.sort()
return [(n, d) for d, n in W[:k]]
def __len__(self):
return len(self.data)
# ------------------------------------------------------------------ brute force
def brute_force_knn(data, query, k=5, distance="euclidean"):
"""Exact nearest neighbors, for measuring recall against HNSW."""
X = np.asarray(data, dtype=np.float64)
q = np.asarray(query, dtype=np.float64)
if distance == "cosine":
X = X / np.maximum(np.linalg.norm(X, axis=1, keepdims=True), 1e-12)
q = q / max(np.linalg.norm(q), 1e-12)
d = 1.0 - X @ q
else:
d = np.sum((X - q) ** 2, axis=1)
idx = np.argsort(d)[:k]
return [(int(i), float(d[i])) for i in idx]
__all__ = ["HNSW", "brute_force_knn"]
The
{{#include}}directive splices in the realcode/hnsw.pyat build time, so the documentation can never drift from the actual implementation.
API at a glance
| Call | Purpose |
|---|---|
HNSW(dim, M, ef_construction, ef, distance, seed) | Create an empty index. |
index.add(vector) | Insert one vector; returns its integer id. |
index.search(query, k, ef) | Approximate k nearest neighbors as [(id, distance), …]. |
brute_force_knn(data, query, k, distance) | Exact answer, for measuring recall. |
Typical usage
import numpy as np
from hnsw import HNSW
vectors = np.load("embeddings.npy") # shape (n, d)
index = HNSW(dim=vectors.shape[1], M=16, ef_construction=200, distance="cosine")
for v in vectors:
index.add(v)
query = vectors[0]
for rank, (i, dist) in enumerate(index.search(query, k=5, ef=64), 1):
print(rank, i, round(dist, 4))
A note on dist_count
The class increments self.dist_count on every distance computation. Reset it
before a query (index.dist_count = 0) to measure how much work a search did —
we use this in the demo to compare HNSW's effort against brute
force in a way that isn't skewed by NumPy's vectorization.
What this is and isn't
This implementation is written for clarity: it shows exactly how HNSW works
and produces correct, high-recall results. Production libraries (FAISS,
hnswlib, Lucene) add heavy optimization — C/C++ inner loops, SIMD distance
kernels, memory-packed graphs, multi-threaded builds, deletions, and filtered
search. They share this exact algorithm; they just run it far faster. (See the
use-cases chapter.)
Next: let's measure how well it actually works. 👉