The full implementation

Here is the consolidated module that ties Steps 1–4 together. It's the same logic from the previous chapters, with the DP inner loops vectorized with NumPy for speed (the for t loop becomes a slice + argmin), and calc_scatters computed once and reused across every candidate $m$ in cpd_auto.

The runnable file lives at code/kts.py in this book's folder. It is reproduced below in full.

"""
Kernel Temporal Segmentation (KTS) — from scratch in NumPy.

Reference:
    Potapov, Douze, Harchaoui, Schmid.
    "Category-Specific Video Summarization." ECCV 2014.

This module is the consolidated, vectorized version of the step-by-step
derivation in the accompanying mdBook. Public functions:

    build_kernel(X, kind="cosine")  -> kernel/Gram matrix K
    calc_scatters(K)                -> n x n scatter (cost) table J
    cpd_nonlin(K, ncp, ...)         -> optimal segmentation for fixed m
    cpd_auto(K, ncp_max, ...)       -> auto-select the number of segments

All functions use only NumPy.
"""

from __future__ import annotations

import numpy as np


# --------------------------------------------------------------------------- #
# Step 1 — kernels
# --------------------------------------------------------------------------- #
def linear_kernel(X):
    """K[i, j] = <x_i, x_j>; reproduces ordinary Euclidean variance."""
    return X @ X.T


def cosine_kernel(X, eps=1e-8):
    """Scale-invariant kernel; a robust default for deep features."""
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    Xn = X / np.maximum(norms, eps)
    return Xn @ Xn.T


def rbf_kernel(X, sigma=None):
    """Gaussian kernel exp(-||x_i - x_j||^2 / (2 sigma^2))."""
    sq = np.sum(X ** 2, axis=1)
    d2 = sq[:, None] + sq[None, :] - 2.0 * (X @ X.T)
    d2 = np.maximum(d2, 0.0)
    if sigma is None:
        pos = d2[d2 > 0]
        med = np.median(pos) if pos.size else 1.0
        sigma = np.sqrt(0.5 * med) if med > 0 else 1.0
    return np.exp(-d2 / (2.0 * sigma ** 2))


def build_kernel(X, kind="cosine", **kwargs):
    """Build a kernel matrix from an (n, d) feature array."""
    X = np.asarray(X, dtype=np.float64)
    if kind == "linear":
        return linear_kernel(X)
    if kind == "cosine":
        return cosine_kernel(X)
    if kind == "rbf":
        return rbf_kernel(X, **kwargs)
    raise ValueError(f"unknown kernel {kind!r}; use 'linear', 'cosine' or 'rbf'")


# --------------------------------------------------------------------------- #
# Step 2 — scatter (cost) matrix via cumulative sums
# --------------------------------------------------------------------------- #
def calc_scatters(K):
    """
    J[i, j] = within-segment scatter of frames i..j (inclusive):

        v(i, j) = sum_t K[t, t]  -  (1 / L) * sum_{s,t in [i, j]} K[s, t]

    where L = j - i + 1. Built in O(n^2) with prefix sums; entries with
    j < i are left at 0.
    """
    K = np.asarray(K, dtype=np.float64)
    n = K.shape[0]

    dcum = np.concatenate(([0.0], np.cumsum(np.diag(K))))        # diagonal prefix
    Kcum = np.zeros((n + 1, n + 1))
    Kcum[1:, 1:] = np.cumsum(np.cumsum(K, axis=0), axis=1)       # 2-D prefix

    J = np.zeros((n, n))
    for i in range(n):
        # vectorized over j = i .. n-1
        js = np.arange(i, n)
        L = js - i + 1
        diag_sum = dcum[js + 1] - dcum[i]
        block = (Kcum[js + 1, js + 1] - Kcum[i, js + 1]
                 - Kcum[js + 1, i] + Kcum[i, i])
        J[i, i:] = diag_sum - block / L
    return J


# --------------------------------------------------------------------------- #
# Step 3 — dynamic programming for a fixed number of change points
# --------------------------------------------------------------------------- #
def cpd_nonlin(K, ncp, lmin=1, lmax=None, J=None):
    """
    Globally optimal segmentation with exactly `ncp` change points.

    Returns
    -------
    cps  : (ncp,) int array of change points (start frame of each new segment).
    cost : total scatter of the optimal segmentation.
    """
    n = K.shape[0]
    m = int(ncp)
    if lmax is None:
        lmax = n
    if (m + 1) * lmin > n:
        raise ValueError("sequence too short for this many change points")

    if J is None:
        J = calc_scatters(K)

    INF = 1e18
    C = np.full((m + 1, n + 1), INF)         # C[k, l]: best cost, first l frames, k+1 segs
    back = np.zeros((m + 1, n + 1), dtype=int)

    # base row: single segment [0 .. l-1]
    for l in range(lmin, min(lmax, n) + 1):
        C[0, l] = J[0, l - 1]

    for k in range(1, m + 1):
        for l in range((k + 1) * lmin, n + 1):
            t_lo = max(k * lmin, l - lmax)
            t_hi = l - lmin                  # last segment length >= lmin
            if t_lo > t_hi:
                continue
            ts = np.arange(t_lo, t_hi + 1)
            cand = C[k - 1, ts] + J[ts, l - 1]
            idx = int(np.argmin(cand))
            C[k, l] = cand[idx]
            back[k, l] = ts[idx]

    # backtrack
    cps = np.zeros(m, dtype=int)
    l = n
    for k in range(m, 0, -1):
        t = back[k, l]
        cps[k - 1] = t
        l = t
    return cps, float(C[m, n])


# --------------------------------------------------------------------------- #
# Step 4 — automatic selection of the number of segments
# --------------------------------------------------------------------------- #
def cpd_auto(K, ncp_max, vmax=1.0, lmin=1, lmax=None):
    """
    Pick the number of change points by penalized model selection.

    Returns
    -------
    cps    : change points of the selected segmentation.
    scores : dict {m: penalized score} for inspection.
    """
    n = K.shape[0]
    ncp_max = min(ncp_max, n - 1)
    J = calc_scatters(K)                     # compute once, reuse for every m

    best_score = np.inf
    best_cps = np.array([], dtype=int)
    scores = {}

    for m in range(0, ncp_max + 1):
        if (m + 1) * lmin > n:
            break
        cps, J_m = cpd_nonlin(K, m, lmin=lmin, lmax=lmax, J=J)
        # Penalty grows with the *number of change points* m, independent of the
        # scatter. (A scatter-proportional penalty would vanish as J_m -> 0 and
        # never stop adding cuts.) This is the KTS model-selection criterion.
        penalty = 0.0 if m == 0 else (m / (2.0 * n)) * (np.log(n / m) + 1.0)
        score = J_m / n + vmax * penalty
        scores[m] = score
        if score < best_score:
            best_score = score
            best_cps = cps
    return best_cps, scores


__all__ = [
    "build_kernel", "linear_kernel", "cosine_kernel", "rbf_kernel",
    "calc_scatters", "cpd_nonlin", "cpd_auto",
]

The {{#include}} directive above is an mdBook feature: when the book is built, it splices in the real code/kts.py, so the documentation can never drift from the actual implementation.

API at a glance

FunctionPurpose
build_kernel(X, kind)Turn an (n, d) feature array into the kernel matrix K. kind ∈ {linear, cosine, rbf}.
calc_scatters(K)The n × n cost table J[i, j] = v(i, j).
cpd_nonlin(K, ncp, lmin, lmax)Optimal segmentation for a fixed number of change points.
cpd_auto(K, ncp_max, vmax, lmin)Auto-select the number of segments via a penalized score.

Typical usage

import numpy as np
from kts import build_kernel, cpd_auto

features = np.load("frame_features.npy")   # shape (n_frames, d)
K = build_kernel(features, kind="cosine")

# Let KTS decide how many shots there are:
change_points, scores = cpd_auto(K, ncp_max=30, vmax=1.0, lmin=5)

# change_points are start-frame indices of each new segment.
segments = np.split(np.arange(len(features)), change_points)
print(f"{len(segments)} segments")

A complete run, start to finish

This single example uses every function in order — build features, build the kernel, auto-segment, and list the resulting segments:

import numpy as np
from kts import build_kernel, cpd_auto

# 1. Fake "video": 12 frames in 3 groups of 4 (groups differ in direction).
rng = np.random.default_rng(1)
g1 = np.array([1, 0, 0, 0.0]) + rng.normal(scale=0.05, size=(4, 4))
g2 = np.array([0, 1, 0, 0.0]) + rng.normal(scale=0.05, size=(4, 4))
g3 = np.array([0, 0, 1, 0.0]) + rng.normal(scale=0.05, size=(4, 4))
X = np.vstack([g1, g2, g3])                 # shape (12, 4)

# 2. Similarity grid.
K = build_kernel(X, kind="cosine")

# 3. Let KTS choose the cuts.
cps, _ = cpd_auto(K, ncp_max=6, vmax=1.0, lmin=2)

# 4. Turn cuts into actual frame ranges.
segments = np.split(np.arange(len(X)), cps)
print("change points :", cps.tolist())
print("segments       :", [s.tolist() for s in segments])

Output:

change points : [4, 8]
segments       : [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]

KTS recovers the three groups exactly — cuts at frames 4 and 8, splitting the 12 frames into 0–3, 4–7, 8–11. 🎯

In the final chapter we run the larger bundled demo and measure accuracy. 👉