One-file CLI: KTS on a real video
Everything in this book, distilled into one self-contained script that runs on an actual video file and does a useful job: automatic shot detection + storyboard extraction (one keyframe per shot). The KTS core is plain NumPy, written from scratch; only video decoding uses OpenCV.
The steps
- Sample frames. Decode the video and keep ~1 frame per second.
- Describe each frame. Compute an HSV color histogram — a model-free numeric fingerprint of the frame.
- Build the kernel. Make the
n × nsimilarity matrixKbetween frames (cosine by default). - Cost table. Precompute every segment's within-segment scatter
J[i,j]using cumulative sums (instant lookups). - Segment. Dynamic programming (
cpd_nonlin) finds the optimal cuts for a given count;cpd_autoadds a penalty and chooses the count for you. - Map back & report. Convert sampled-frame cuts to real timestamps, print a shot table, and save one keyframe per shot.
Install & run
pip install numpy opencv-python
# auto-detect shots and save a keyframe per shot into ./keyframes/
python kts_video.py myvideo.mp4
# sample 2 fps, fewer shots (higher penalty), custom output dir
python kts_video.py myvideo.mp4 --fps 2 --vmax 1.5 --outdir shots
# force an exact number of shots
python kts_video.py myvideo.mp4 --num-shots 8
# just print the shot table, no images
python kts_video.py myvideo.mp4 --no-keyframes
What it prints
[1/4] decoding & sampling myvideo.mp4 at 1.0 fps ...
183 frames sampled (4575 total, 25.00 fps source)
[2/4] building cosine kernel matrix (183x183) ...
[3/4] auto-selecting shots (vmax=1.0) ...
[4/4] found 7 shots:
shot start end dur
--------------------------------------
0 0:00:00.0 0:00:21.0 21.0s
1 0:00:21.0 0:00:48.0 27.0s
2 0:00:48.0 0:01:33.0 45.0s
...
saved 7 keyframes to ./keyframes/
You get a timestamped shot list plus keyframes/shot_000.jpg, shot_001.jpg, …
— a ready-made storyboard, and exactly the preprocessing step video-summarization
pipelines need.
The exact numbers above are illustrative (they depend on your video). The shot count adapts to content via the penalty; tune it with
--vmax(higher → fewer shots) or pin it with--num-shots.
The complete script
#!/usr/bin/env python3
"""
kts_video.py — Kernel Temporal Segmentation on a real video, from scratch.
A single-file command-line tool that:
1. decodes a video and samples frames (e.g. 1 per second),
2. describes each sampled frame with a color histogram (no deep model needed),
3. builds a kernel (similarity) matrix between frames,
4. runs Kernel Temporal Segmentation to find shot boundaries,
5. prints a shot table (start/end timestamps) and saves one keyframe per shot
— i.e. an automatic storyboard / video-summary preprocessing step.
The KTS core (scatter table + dynamic programming + auto model selection) is
implemented here in plain NumPy. Only video decoding uses OpenCV.
Usage:
python kts_video.py path/to/video.mp4
python kts_video.py video.mp4 --fps 2 --vmax 1.0 --outdir shots
python kts_video.py video.mp4 --num-shots 8 # force exactly 8 shots
Requirements:
pip install numpy opencv-python
"""
from __future__ import annotations
import argparse
import os
import sys
import numpy as np
# ===========================================================================
# 1. Feature extraction — turn each sampled frame into a number vector
# ===========================================================================
def color_histogram(frame_bgr, bins=8):
"""
Describe a frame by an HSV color histogram (a robust, model-free fingerprint).
Returns a 1-D feature vector of length 3*bins, normalized to sum to 1.
"""
import cv2
hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)
h = np.histogram(hsv[:, :, 0], bins=bins, range=(0, 180))[0]
s = np.histogram(hsv[:, :, 1], bins=bins, range=(0, 256))[0]
v = np.histogram(hsv[:, :, 2], bins=bins, range=(0, 256))[0]
feat = np.concatenate([h, s, v]).astype(np.float64)
total = feat.sum()
return feat / total if total > 0 else feat
def extract_features(path, target_fps=1.0, bins=8):
"""
Decode `path`, sample ~target_fps frames per second, and return:
X : (n_sampled, 3*bins) feature matrix
frame_index : original frame number of each sampled frame
src_fps : the video's real frame rate
n_total : total number of frames in the video
"""
import cv2
cap = cv2.VideoCapture(path)
if not cap.isOpened():
raise SystemExit(f"error: could not open video {path!r}")
src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
n_total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
step = max(1, int(round(src_fps / max(target_fps, 1e-6))))
feats, frame_index = [], []
i = 0
while True:
ok, frame = cap.read()
if not ok:
break
if i % step == 0:
feats.append(color_histogram(frame, bins=bins))
frame_index.append(i)
i += 1
cap.release()
if len(feats) < 2:
raise SystemExit("error: video too short / too few sampled frames")
return np.array(feats), np.array(frame_index), src_fps, (n_total or i)
# ===========================================================================
# 2. Kernel (similarity) matrix
# ===========================================================================
def build_kernel(X, kind="cosine", eps=1e-8):
"""Similarity grid K[i, j] between every pair of frame fingerprints."""
X = np.asarray(X, dtype=np.float64)
if kind == "linear":
return X @ X.T
if kind == "cosine":
norms = np.linalg.norm(X, axis=1, keepdims=True)
Xn = X / np.maximum(norms, eps)
return Xn @ Xn.T
if kind == "rbf":
sq = np.sum(X ** 2, axis=1)
d2 = np.maximum(sq[:, None] + sq[None, :] - 2.0 * (X @ X.T), 0.0)
pos = d2[d2 > 0]
sigma = np.sqrt(0.5 * np.median(pos)) if pos.size else 1.0
return np.exp(-d2 / (2.0 * max(sigma, eps) ** 2))
raise ValueError(f"unknown kernel {kind!r}")
# ===========================================================================
# 3. Scatter (cost) table — within-segment variance for every segment, fast
# ===========================================================================
def calc_scatters(K):
"""J[i, j] = within-segment scatter of frames i..j, via prefix sums."""
K = np.asarray(K, dtype=np.float64)
n = K.shape[0]
dcum = np.concatenate(([0.0], np.cumsum(np.diag(K))))
Kcum = np.zeros((n + 1, n + 1))
Kcum[1:, 1:] = np.cumsum(np.cumsum(K, axis=0), axis=1)
J = np.zeros((n, n))
for i in range(n):
js = np.arange(i, n)
L = js - i + 1
diag_sum = dcum[js + 1] - dcum[i]
block = (Kcum[js + 1, js + 1] - Kcum[i, js + 1]
- Kcum[js + 1, i] + Kcum[i, i])
J[i, i:] = diag_sum - block / L
return J
# ===========================================================================
# 4. Dynamic programming — optimal cuts for a fixed number of change points
# ===========================================================================
def cpd_nonlin(K, ncp, lmin=1, lmax=None, J=None):
"""Globally optimal segmentation with exactly `ncp` change points."""
n = K.shape[0]
m = int(ncp)
if lmax is None:
lmax = n
if (m + 1) * lmin > n:
raise ValueError("sequence too short for this many change points")
if J is None:
J = calc_scatters(K)
INF = 1e18
C = np.full((m + 1, n + 1), INF)
back = np.zeros((m + 1, n + 1), dtype=int)
for l in range(lmin, min(lmax, n) + 1):
C[0, l] = J[0, l - 1]
for k in range(1, m + 1):
for l in range((k + 1) * lmin, n + 1):
t_lo = max(k * lmin, l - lmax)
t_hi = l - lmin
if t_lo > t_hi:
continue
ts = np.arange(t_lo, t_hi + 1)
cand = C[k - 1, ts] + J[ts, l - 1]
idx = int(np.argmin(cand))
C[k, l] = cand[idx]
back[k, l] = ts[idx]
cps = np.zeros(m, dtype=int)
l = n
for k in range(m, 0, -1):
t = back[k, l]
cps[k - 1] = t
l = t
return cps, float(C[m, n])
# ===========================================================================
# 5. Automatic model selection — let KTS choose the number of shots
# ===========================================================================
def cpd_auto(K, ncp_max, vmax=1.0, lmin=1, lmax=None):
"""Pick the number of change points by a penalized score; return cuts."""
n = K.shape[0]
ncp_max = min(ncp_max, n - 1)
J = calc_scatters(K)
best_score, best_cps = np.inf, np.array([], dtype=int)
for m in range(0, ncp_max + 1):
if (m + 1) * lmin > n:
break
cps, J_m = cpd_nonlin(K, m, lmin=lmin, lmax=lmax, J=J)
penalty = 0.0 if m == 0 else (m / (2.0 * n)) * (np.log(n / m) + 1.0)
score = J_m / n + vmax * penalty
if score < best_score:
best_score, best_cps = score, cps
return best_cps
# ===========================================================================
# Helpers: segments, timestamps, keyframes
# ===========================================================================
def segments_from_cps(cps, n):
"""Turn change points into (start, end) index pairs covering 0..n-1."""
bounds = [0] + list(cps) + [n]
return [(bounds[i], bounds[i + 1] - 1) for i in range(len(bounds) - 1)]
def fmt_time(seconds):
"""Seconds -> H:MM:SS.s timestamp string."""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = seconds % 60
return f"{h}:{m:02d}:{s:04.1f}"
def save_keyframes(path, segments, frame_index, outdir, max_side=480):
"""Save one representative frame (segment midpoint) per shot into outdir."""
import cv2
os.makedirs(outdir, exist_ok=True)
cap = cv2.VideoCapture(path)
saved = []
for k, (a, b) in enumerate(segments):
mid_sampled = (a + b) // 2
src_frame = int(frame_index[mid_sampled])
cap.set(cv2.CAP_PROP_POS_FRAMES, src_frame)
ok, frame = cap.read()
if not ok:
continue
h, w = frame.shape[:2]
if max(h, w) > max_side:
scale = max_side / max(h, w)
frame = cv2.resize(frame, (int(w * scale), int(h * scale)))
out = os.path.join(outdir, f"shot_{k:03d}.jpg")
cv2.imwrite(out, frame)
saved.append(out)
cap.release()
return saved
# ===========================================================================
# Command-line interface
# ===========================================================================
def main(argv=None):
p = argparse.ArgumentParser(
description="Detect shots in a video with Kernel Temporal Segmentation "
"and save one keyframe per shot (a storyboard).")
p.add_argument("video", help="path to the input video file")
p.add_argument("--fps", type=float, default=1.0,
help="frames sampled per second (default: 1)")
p.add_argument("--vmax", type=float, default=1.0,
help="penalty weight; larger = fewer shots (default: 1.0)")
p.add_argument("--num-shots", type=int, default=None,
help="force an exact number of shots (skips auto-selection)")
p.add_argument("--max-shots", type=int, default=100,
help="upper bound for auto-selection (default: 100)")
p.add_argument("--min-shot-sec", type=float, default=1.0,
help="minimum shot length in seconds (default: 1.0)")
p.add_argument("--kernel", choices=["cosine", "linear", "rbf"],
default="cosine", help="similarity kernel (default: cosine)")
p.add_argument("--bins", type=int, default=8,
help="histogram bins per channel (default: 8)")
p.add_argument("--outdir", default="keyframes",
help="directory for keyframe images (default: keyframes)")
p.add_argument("--no-keyframes", action="store_true",
help="only print the shot table; do not save images")
args = p.parse_args(argv)
# Step 1–2: features + kernel
print(f"[1/4] decoding & sampling {args.video} at {args.fps} fps ...")
X, frame_index, src_fps, n_total = extract_features(
args.video, target_fps=args.fps, bins=args.bins)
print(f" {len(X)} frames sampled ({n_total} total, {src_fps:.2f} fps source)")
print(f"[2/4] building {args.kernel} kernel matrix ({len(X)}x{len(X)}) ...")
K = build_kernel(X, kind=args.kernel)
# Step 3–5: segmentation
sample_dt = (frame_index[1] - frame_index[0]) / src_fps if len(frame_index) > 1 else 1.0
lmin = max(1, int(round(args.min_shot_sec / max(sample_dt, 1e-6))))
if args.num_shots:
print(f"[3/4] segmenting into exactly {args.num_shots} shots ...")
cps, _ = cpd_nonlin(K, args.num_shots - 1, lmin=lmin)
else:
print(f"[3/4] auto-selecting shots (vmax={args.vmax}) ...")
cps = cpd_auto(K, args.max_shots, vmax=args.vmax, lmin=lmin)
segments = segments_from_cps(cps, len(X))
# Report
print(f"[4/4] found {len(segments)} shots:\n")
print(f" {'shot':>4} {'start':>10} {'end':>10} {'dur':>7}")
print(" " + "-" * 38)
for k, (a, b) in enumerate(segments):
t0 = frame_index[a] / src_fps
t1 = (frame_index[b] / src_fps) + sample_dt
print(f" {k:>4} {fmt_time(t0):>10} {fmt_time(t1):>10} {t1 - t0:>6.1f}s")
if not args.no_keyframes:
saved = save_keyframes(args.video, segments, frame_index, args.outdir)
print(f"\nsaved {len(saved)} keyframes to ./{args.outdir}/")
if __name__ == "__main__":
main()
That's the whole tool: decode → fingerprint → kernel → DP → timestamps, in one file you can read top to bottom.