The full implementation
Here is the consolidated module that ties Steps 1–4 together. It's the same logic
from the previous chapters, with the DP inner loops vectorized with NumPy for
speed (the for t loop becomes a slice + argmin), and calc_scatters computed
once and reused across every candidate $m$ in cpd_auto.
The runnable file lives at code/kts.py in this book's
folder. It is reproduced below in full.
"""
Kernel Temporal Segmentation (KTS) — from scratch in NumPy.
Reference:
Potapov, Douze, Harchaoui, Schmid.
"Category-Specific Video Summarization." ECCV 2014.
This module is the consolidated, vectorized version of the step-by-step
derivation in the accompanying mdBook. Public functions:
build_kernel(X, kind="cosine") -> kernel/Gram matrix K
calc_scatters(K) -> n x n scatter (cost) table J
cpd_nonlin(K, ncp, ...) -> optimal segmentation for fixed m
cpd_auto(K, ncp_max, ...) -> auto-select the number of segments
All functions use only NumPy.
"""
from __future__ import annotations
import numpy as np
# --------------------------------------------------------------------------- #
# Step 1 — kernels
# --------------------------------------------------------------------------- #
def linear_kernel(X):
"""K[i, j] = <x_i, x_j>; reproduces ordinary Euclidean variance."""
return X @ X.T
def cosine_kernel(X, eps=1e-8):
"""Scale-invariant kernel; a robust default for deep features."""
norms = np.linalg.norm(X, axis=1, keepdims=True)
Xn = X / np.maximum(norms, eps)
return Xn @ Xn.T
def rbf_kernel(X, sigma=None):
"""Gaussian kernel exp(-||x_i - x_j||^2 / (2 sigma^2))."""
sq = np.sum(X ** 2, axis=1)
d2 = sq[:, None] + sq[None, :] - 2.0 * (X @ X.T)
d2 = np.maximum(d2, 0.0)
if sigma is None:
pos = d2[d2 > 0]
med = np.median(pos) if pos.size else 1.0
sigma = np.sqrt(0.5 * med) if med > 0 else 1.0
return np.exp(-d2 / (2.0 * sigma ** 2))
def build_kernel(X, kind="cosine", **kwargs):
"""Build a kernel matrix from an (n, d) feature array."""
X = np.asarray(X, dtype=np.float64)
if kind == "linear":
return linear_kernel(X)
if kind == "cosine":
return cosine_kernel(X)
if kind == "rbf":
return rbf_kernel(X, **kwargs)
raise ValueError(f"unknown kernel {kind!r}; use 'linear', 'cosine' or 'rbf'")
# --------------------------------------------------------------------------- #
# Step 2 — scatter (cost) matrix via cumulative sums
# --------------------------------------------------------------------------- #
def calc_scatters(K):
"""
J[i, j] = within-segment scatter of frames i..j (inclusive):
v(i, j) = sum_t K[t, t] - (1 / L) * sum_{s,t in [i, j]} K[s, t]
where L = j - i + 1. Built in O(n^2) with prefix sums; entries with
j < i are left at 0.
"""
K = np.asarray(K, dtype=np.float64)
n = K.shape[0]
dcum = np.concatenate(([0.0], np.cumsum(np.diag(K)))) # diagonal prefix
Kcum = np.zeros((n + 1, n + 1))
Kcum[1:, 1:] = np.cumsum(np.cumsum(K, axis=0), axis=1) # 2-D prefix
J = np.zeros((n, n))
for i in range(n):
# vectorized over j = i .. n-1
js = np.arange(i, n)
L = js - i + 1
diag_sum = dcum[js + 1] - dcum[i]
block = (Kcum[js + 1, js + 1] - Kcum[i, js + 1]
- Kcum[js + 1, i] + Kcum[i, i])
J[i, i:] = diag_sum - block / L
return J
# --------------------------------------------------------------------------- #
# Step 3 — dynamic programming for a fixed number of change points
# --------------------------------------------------------------------------- #
def cpd_nonlin(K, ncp, lmin=1, lmax=None, J=None):
"""
Globally optimal segmentation with exactly `ncp` change points.
Returns
-------
cps : (ncp,) int array of change points (start frame of each new segment).
cost : total scatter of the optimal segmentation.
"""
n = K.shape[0]
m = int(ncp)
if lmax is None:
lmax = n
if (m + 1) * lmin > n:
raise ValueError("sequence too short for this many change points")
if J is None:
J = calc_scatters(K)
INF = 1e18
C = np.full((m + 1, n + 1), INF) # C[k, l]: best cost, first l frames, k+1 segs
back = np.zeros((m + 1, n + 1), dtype=int)
# base row: single segment [0 .. l-1]
for l in range(lmin, min(lmax, n) + 1):
C[0, l] = J[0, l - 1]
for k in range(1, m + 1):
for l in range((k + 1) * lmin, n + 1):
t_lo = max(k * lmin, l - lmax)
t_hi = l - lmin # last segment length >= lmin
if t_lo > t_hi:
continue
ts = np.arange(t_lo, t_hi + 1)
cand = C[k - 1, ts] + J[ts, l - 1]
idx = int(np.argmin(cand))
C[k, l] = cand[idx]
back[k, l] = ts[idx]
# backtrack
cps = np.zeros(m, dtype=int)
l = n
for k in range(m, 0, -1):
t = back[k, l]
cps[k - 1] = t
l = t
return cps, float(C[m, n])
# --------------------------------------------------------------------------- #
# Step 4 — automatic selection of the number of segments
# --------------------------------------------------------------------------- #
def cpd_auto(K, ncp_max, vmax=1.0, lmin=1, lmax=None):
"""
Pick the number of change points by penalized model selection.
Returns
-------
cps : change points of the selected segmentation.
scores : dict {m: penalized score} for inspection.
"""
n = K.shape[0]
ncp_max = min(ncp_max, n - 1)
J = calc_scatters(K) # compute once, reuse for every m
best_score = np.inf
best_cps = np.array([], dtype=int)
scores = {}
for m in range(0, ncp_max + 1):
if (m + 1) * lmin > n:
break
cps, J_m = cpd_nonlin(K, m, lmin=lmin, lmax=lmax, J=J)
# Penalty grows with the *number of change points* m, independent of the
# scatter. (A scatter-proportional penalty would vanish as J_m -> 0 and
# never stop adding cuts.) This is the KTS model-selection criterion.
penalty = 0.0 if m == 0 else (m / (2.0 * n)) * (np.log(n / m) + 1.0)
score = J_m / n + vmax * penalty
scores[m] = score
if score < best_score:
best_score = score
best_cps = cps
return best_cps, scores
__all__ = [
"build_kernel", "linear_kernel", "cosine_kernel", "rbf_kernel",
"calc_scatters", "cpd_nonlin", "cpd_auto",
]
The
{{#include}}directive above is an mdBook feature: when the book is built, it splices in the realcode/kts.py, so the documentation can never drift from the actual implementation.
API at a glance
| Function | Purpose |
|---|---|
build_kernel(X, kind) | Turn an (n, d) feature array into the kernel matrix K. kind ∈ {linear, cosine, rbf}. |
calc_scatters(K) | The n × n cost table J[i, j] = v(i, j). |
cpd_nonlin(K, ncp, lmin, lmax) | Optimal segmentation for a fixed number of change points. |
cpd_auto(K, ncp_max, vmax, lmin) | Auto-select the number of segments via a penalized score. |
Typical usage
import numpy as np
from kts import build_kernel, cpd_auto
features = np.load("frame_features.npy") # shape (n_frames, d)
K = build_kernel(features, kind="cosine")
# Let KTS decide how many shots there are:
change_points, scores = cpd_auto(K, ncp_max=30, vmax=1.0, lmin=5)
# change_points are start-frame indices of each new segment.
segments = np.split(np.arange(len(features)), change_points)
print(f"{len(segments)} segments")
A complete run, start to finish
This single example uses every function in order — build features, build the kernel, auto-segment, and list the resulting segments:
import numpy as np
from kts import build_kernel, cpd_auto
# 1. Fake "video": 12 frames in 3 groups of 4 (groups differ in direction).
rng = np.random.default_rng(1)
g1 = np.array([1, 0, 0, 0.0]) + rng.normal(scale=0.05, size=(4, 4))
g2 = np.array([0, 1, 0, 0.0]) + rng.normal(scale=0.05, size=(4, 4))
g3 = np.array([0, 0, 1, 0.0]) + rng.normal(scale=0.05, size=(4, 4))
X = np.vstack([g1, g2, g3]) # shape (12, 4)
# 2. Similarity grid.
K = build_kernel(X, kind="cosine")
# 3. Let KTS choose the cuts.
cps, _ = cpd_auto(K, ncp_max=6, vmax=1.0, lmin=2)
# 4. Turn cuts into actual frame ranges.
segments = np.split(np.arange(len(X)), cps)
print("change points :", cps.tolist())
print("segments :", [s.tolist() for s in segments])
Output:
change points : [4, 8]
segments : [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
KTS recovers the three groups exactly — cuts at frames 4 and 8, splitting the
12 frames into 0–3, 4–7, 8–11. 🎯
In the final chapter we run the larger bundled demo and measure accuracy. 👉