The full implementation

Here is the consolidated module that ties Steps 1–4 together — distances, the graph, single-layer greedy search, multi-layer search, and insertion with the neighbor heuristic. It also includes brute_force_knn so we can measure accuracy.

The runnable file lives at code/hnsw.py in this book's folder, reproduced below in full.

"""
HNSW (Hierarchical Navigable Small World) — from scratch in Python + NumPy.

Reference:
    Yu. A. Malkov, D. A. Yashunin.
    "Efficient and robust approximate nearest neighbor search using
     Hierarchical Navigable Small World graphs." IEEE TPAMI, 2018.
    (arXiv:1603.09320)

This is the consolidated, readable version of the step-by-step derivation in the
accompanying mdBook. It implements:

    HNSW.add(vector)            -> insert a point, returns its id
    HNSW.search(query, k)       -> approximate k nearest neighbors

Distances supported: "euclidean" (squared) and "cosine" (vectors auto-normalized).
Only NumPy + the standard library are used.
"""

from __future__ import annotations

import heapq
import math
import random

import numpy as np


class HNSW:
    def __init__(self, dim, M=16, ef_construction=200, ef=50,
                 distance="euclidean", seed=42):
        self.dim = dim
        self.M = M                       # target neighbors per node
        self.Mmax = M                    # cap per node above layer 0
        self.Mmax0 = 2 * M               # cap per node at layer 0 (denser)
        self.ef_construction = ef_construction
        self.ef = ef                     # default search breadth
        self.mL = 1.0 / math.log(M)      # level-generation normalizer
        self.rng = random.Random(seed)
        self.distance_name = distance

        self.data = []                   # data[id] = vector
        self.graph = []                  # graph[level][id] = list of neighbor ids
        self.entry_point = None          # id of the current top-layer entry
        self.max_level = -1
        self.dist_count = 0              # distance evaluations (for analysis)

    # ----------------------------------------------------------------- distance
    def _dist(self, a, b):
        self.dist_count += 1
        if self.distance_name == "cosine":
            return 1.0 - float(np.dot(a, b))     # vectors are pre-normalized
        d = a - b
        return float(np.dot(d, d))               # squared Euclidean (monotonic)

    def _prepare(self, v):
        v = np.asarray(v, dtype=np.float64)
        if self.distance_name == "cosine":
            n = np.linalg.norm(v)
            if n > 0:
                v = v / n
        return v

    # ------------------------------------------------------------ search a layer
    def _search_layer(self, q, entry_ids, ef, level):
        """Greedy best-first search within a single layer.

        Returns up to `ef` nearest found, as a list of (distance, id).
        """
        visited = set(entry_ids)
        candidates = []                  # min-heap by distance (explore nearest)
        results = []                     # max-heap by distance (-d) of the best ef
        for e in entry_ids:
            d = self._dist(q, self.data[e])
            heapq.heappush(candidates, (d, e))
            heapq.heappush(results, (-d, e))

        while candidates:
            d_c, c = heapq.heappop(candidates)
            if d_c > -results[0][0]:      # nearest candidate worse than worst result
                break
            for nb in self.graph[level].get(c, []):
                if nb in visited:
                    continue
                visited.add(nb)
                d_nb = self._dist(q, self.data[nb])
                if d_nb < -results[0][0] or len(results) < ef:
                    heapq.heappush(candidates, (d_nb, nb))
                    heapq.heappush(results, (-d_nb, nb))
                    if len(results) > ef:
                        heapq.heappop(results)
        return [(-d, n) for d, n in results]

    # -------------------------------------------------- neighbor selection heuristic
    def _select_neighbors(self, base_vec, candidates, M):
        """Algorithm 4 (heuristic): pick diverse neighbors, not just the closest.

        Keep candidate e only if it is closer to the base point than to any
        neighbor already chosen — this avoids clustering all links in one
        direction and keeps the graph well-connected.
        """
        result = []
        for d_e, e in sorted(candidates):            # nearest first
            keep = True
            for r in result:
                if self._dist(self.data[e], self.data[r]) < d_e:
                    keep = False
                    break
            if keep:
                result.append(e)
            if len(result) >= M:
                break
        return result

    # ----------------------------------------------------------------- insertion
    def add(self, vector):
        v = self._prepare(vector)
        node = len(self.data)
        self.data.append(v)

        # random level: geometric-like, so few nodes reach the high layers
        level = int(-math.log(self.rng.random()) * self.mL)
        while len(self.graph) <= level:
            self.graph.append({})
        for l in range(level + 1):
            self.graph[l].setdefault(node, [])

        if self.entry_point is None:                 # first ever point
            self.entry_point = node
            self.max_level = level
            return node

        ep = [self.entry_point]
        L = self.max_level

        # Phase 1: descend from the top to just above the new node's level (ef=1).
        for l in range(L, level, -1):
            W = self._search_layer(v, ep, 1, l)
            ep = [min(W)[1]]

        # Phase 2: from the new node's level down to 0, connect it in.
        for l in range(min(L, level), -1, -1):
            W = self._search_layer(v, ep, self.ef_construction, l)
            neighbors = self._select_neighbors(v, W, self.M)
            self.graph[l][node] = list(neighbors)

            Mmax = self.Mmax0 if l == 0 else self.Mmax
            for nb in neighbors:                     # links are bidirectional
                self.graph[l].setdefault(nb, []).append(node)
                if len(self.graph[l][nb]) > Mmax:    # prune over-full neighbor
                    nb_vec = self.data[nb]
                    cand = [(self._dist(nb_vec, self.data[x]), x)
                            for x in self.graph[l][nb]]
                    self.graph[l][nb] = self._select_neighbors(nb_vec, cand, Mmax)
            ep = [n for _, n in W]

        if level > self.max_level:                   # new top of the hierarchy
            self.max_level = level
            self.entry_point = node
        return node

    # -------------------------------------------------------------------- search
    def search(self, query, k=5, ef=None):
        """Return the approximate k nearest neighbors as [(id, distance), ...]."""
        if self.entry_point is None:
            return []
        ef = ef or max(self.ef, k)
        q = self._prepare(query)

        ep = [self.entry_point]
        for l in range(self.max_level, 0, -1):       # greedily descend the hierarchy
            W = self._search_layer(q, ep, 1, l)
            ep = [min(W)[1]]
        W = self._search_layer(q, ep, ef, 0)         # wide search at the base layer
        W.sort()
        return [(n, d) for d, n in W[:k]]

    def __len__(self):
        return len(self.data)


# ------------------------------------------------------------------ brute force
def brute_force_knn(data, query, k=5, distance="euclidean"):
    """Exact nearest neighbors, for measuring recall against HNSW."""
    X = np.asarray(data, dtype=np.float64)
    q = np.asarray(query, dtype=np.float64)
    if distance == "cosine":
        X = X / np.maximum(np.linalg.norm(X, axis=1, keepdims=True), 1e-12)
        q = q / max(np.linalg.norm(q), 1e-12)
        d = 1.0 - X @ q
    else:
        d = np.sum((X - q) ** 2, axis=1)
    idx = np.argsort(d)[:k]
    return [(int(i), float(d[i])) for i in idx]


__all__ = ["HNSW", "brute_force_knn"]

The {{#include}} directive splices in the real code/hnsw.py at build time, so the documentation can never drift from the actual implementation.

API at a glance

CallPurpose
HNSW(dim, M, ef_construction, ef, distance, seed)Create an empty index.
index.add(vector)Insert one vector; returns its integer id.
index.search(query, k, ef)Approximate k nearest neighbors as [(id, distance), …].
brute_force_knn(data, query, k, distance)Exact answer, for measuring recall.

Typical usage

import numpy as np
from hnsw import HNSW

vectors = np.load("embeddings.npy")      # shape (n, d)

index = HNSW(dim=vectors.shape[1], M=16, ef_construction=200, distance="cosine")
for v in vectors:
    index.add(v)

query = vectors[0]
for rank, (i, dist) in enumerate(index.search(query, k=5, ef=64), 1):
    print(rank, i, round(dist, 4))

A note on dist_count

The class increments self.dist_count on every distance computation. Reset it before a query (index.dist_count = 0) to measure how much work a search did — we use this in the demo to compare HNSW's effort against brute force in a way that isn't skewed by NumPy's vectorization.

What this is and isn't

This implementation is written for clarity: it shows exactly how HNSW works and produces correct, high-recall results. Production libraries (FAISS, hnswlib, Lucene) add heavy optimization — C/C++ inner loops, SIMD distance kernels, memory-packed graphs, multi-threaded builds, deletions, and filtered search. They share this exact algorithm; they just run it far faster. (See the use-cases chapter.)

Next: let's measure how well it actually works. 👉