One-file CLI: KTS on a real video

Everything in this book, distilled into one self-contained script that runs on an actual video file and does a useful job: automatic shot detection + storyboard extraction (one keyframe per shot). The KTS core is plain NumPy, written from scratch; only video decoding uses OpenCV.

The steps

  1. Sample frames. Decode the video and keep ~1 frame per second.
  2. Describe each frame. Compute an HSV color histogram — a model-free numeric fingerprint of the frame.
  3. Build the kernel. Make the n × n similarity matrix K between frames (cosine by default).
  4. Cost table. Precompute every segment's within-segment scatter J[i,j] using cumulative sums (instant lookups).
  5. Segment. Dynamic programming (cpd_nonlin) finds the optimal cuts for a given count; cpd_auto adds a penalty and chooses the count for you.
  6. Map back & report. Convert sampled-frame cuts to real timestamps, print a shot table, and save one keyframe per shot.

Install & run

pip install numpy opencv-python

# auto-detect shots and save a keyframe per shot into ./keyframes/
python kts_video.py myvideo.mp4

# sample 2 fps, fewer shots (higher penalty), custom output dir
python kts_video.py myvideo.mp4 --fps 2 --vmax 1.5 --outdir shots

# force an exact number of shots
python kts_video.py myvideo.mp4 --num-shots 8

# just print the shot table, no images
python kts_video.py myvideo.mp4 --no-keyframes

What it prints

[1/4] decoding & sampling myvideo.mp4 at 1.0 fps ...
      183 frames sampled (4575 total, 25.00 fps source)
[2/4] building cosine kernel matrix (183x183) ...
[3/4] auto-selecting shots (vmax=1.0) ...
[4/4] found 7 shots:

  shot       start         end      dur
  --------------------------------------
     0     0:00:00.0     0:00:21.0    21.0s
     1     0:00:21.0     0:00:48.0    27.0s
     2     0:00:48.0     0:01:33.0    45.0s
     ...
saved 7 keyframes to ./keyframes/

You get a timestamped shot list plus keyframes/shot_000.jpg, shot_001.jpg, … — a ready-made storyboard, and exactly the preprocessing step video-summarization pipelines need.

The exact numbers above are illustrative (they depend on your video). The shot count adapts to content via the penalty; tune it with --vmax (higher → fewer shots) or pin it with --num-shots.

The complete script

#!/usr/bin/env python3
"""
kts_video.py — Kernel Temporal Segmentation on a real video, from scratch.

A single-file command-line tool that:
  1. decodes a video and samples frames (e.g. 1 per second),
  2. describes each sampled frame with a color histogram (no deep model needed),
  3. builds a kernel (similarity) matrix between frames,
  4. runs Kernel Temporal Segmentation to find shot boundaries,
  5. prints a shot table (start/end timestamps) and saves one keyframe per shot
     — i.e. an automatic storyboard / video-summary preprocessing step.

The KTS core (scatter table + dynamic programming + auto model selection) is
implemented here in plain NumPy. Only video decoding uses OpenCV.

Usage:
    python kts_video.py path/to/video.mp4
    python kts_video.py video.mp4 --fps 2 --vmax 1.0 --outdir shots
    python kts_video.py video.mp4 --num-shots 8        # force exactly 8 shots

Requirements:
    pip install numpy opencv-python
"""

from __future__ import annotations

import argparse
import os
import sys

import numpy as np


# ===========================================================================
# 1. Feature extraction — turn each sampled frame into a number vector
# ===========================================================================
def color_histogram(frame_bgr, bins=8):
    """
    Describe a frame by an HSV color histogram (a robust, model-free fingerprint).

    Returns a 1-D feature vector of length 3*bins, normalized to sum to 1.
    """
    import cv2

    hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)
    h = np.histogram(hsv[:, :, 0], bins=bins, range=(0, 180))[0]
    s = np.histogram(hsv[:, :, 1], bins=bins, range=(0, 256))[0]
    v = np.histogram(hsv[:, :, 2], bins=bins, range=(0, 256))[0]
    feat = np.concatenate([h, s, v]).astype(np.float64)
    total = feat.sum()
    return feat / total if total > 0 else feat


def extract_features(path, target_fps=1.0, bins=8):
    """
    Decode `path`, sample ~target_fps frames per second, and return:
        X            : (n_sampled, 3*bins) feature matrix
        frame_index  : original frame number of each sampled frame
        src_fps      : the video's real frame rate
        n_total      : total number of frames in the video
    """
    import cv2

    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise SystemExit(f"error: could not open video {path!r}")

    src_fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    n_total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
    step = max(1, int(round(src_fps / max(target_fps, 1e-6))))

    feats, frame_index = [], []
    i = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if i % step == 0:
            feats.append(color_histogram(frame, bins=bins))
            frame_index.append(i)
        i += 1
    cap.release()

    if len(feats) < 2:
        raise SystemExit("error: video too short / too few sampled frames")
    return np.array(feats), np.array(frame_index), src_fps, (n_total or i)


# ===========================================================================
# 2. Kernel (similarity) matrix
# ===========================================================================
def build_kernel(X, kind="cosine", eps=1e-8):
    """Similarity grid K[i, j] between every pair of frame fingerprints."""
    X = np.asarray(X, dtype=np.float64)
    if kind == "linear":
        return X @ X.T
    if kind == "cosine":
        norms = np.linalg.norm(X, axis=1, keepdims=True)
        Xn = X / np.maximum(norms, eps)
        return Xn @ Xn.T
    if kind == "rbf":
        sq = np.sum(X ** 2, axis=1)
        d2 = np.maximum(sq[:, None] + sq[None, :] - 2.0 * (X @ X.T), 0.0)
        pos = d2[d2 > 0]
        sigma = np.sqrt(0.5 * np.median(pos)) if pos.size else 1.0
        return np.exp(-d2 / (2.0 * max(sigma, eps) ** 2))
    raise ValueError(f"unknown kernel {kind!r}")


# ===========================================================================
# 3. Scatter (cost) table — within-segment variance for every segment, fast
# ===========================================================================
def calc_scatters(K):
    """J[i, j] = within-segment scatter of frames i..j, via prefix sums."""
    K = np.asarray(K, dtype=np.float64)
    n = K.shape[0]
    dcum = np.concatenate(([0.0], np.cumsum(np.diag(K))))
    Kcum = np.zeros((n + 1, n + 1))
    Kcum[1:, 1:] = np.cumsum(np.cumsum(K, axis=0), axis=1)

    J = np.zeros((n, n))
    for i in range(n):
        js = np.arange(i, n)
        L = js - i + 1
        diag_sum = dcum[js + 1] - dcum[i]
        block = (Kcum[js + 1, js + 1] - Kcum[i, js + 1]
                 - Kcum[js + 1, i] + Kcum[i, i])
        J[i, i:] = diag_sum - block / L
    return J


# ===========================================================================
# 4. Dynamic programming — optimal cuts for a fixed number of change points
# ===========================================================================
def cpd_nonlin(K, ncp, lmin=1, lmax=None, J=None):
    """Globally optimal segmentation with exactly `ncp` change points."""
    n = K.shape[0]
    m = int(ncp)
    if lmax is None:
        lmax = n
    if (m + 1) * lmin > n:
        raise ValueError("sequence too short for this many change points")
    if J is None:
        J = calc_scatters(K)

    INF = 1e18
    C = np.full((m + 1, n + 1), INF)
    back = np.zeros((m + 1, n + 1), dtype=int)
    for l in range(lmin, min(lmax, n) + 1):
        C[0, l] = J[0, l - 1]
    for k in range(1, m + 1):
        for l in range((k + 1) * lmin, n + 1):
            t_lo = max(k * lmin, l - lmax)
            t_hi = l - lmin
            if t_lo > t_hi:
                continue
            ts = np.arange(t_lo, t_hi + 1)
            cand = C[k - 1, ts] + J[ts, l - 1]
            idx = int(np.argmin(cand))
            C[k, l] = cand[idx]
            back[k, l] = ts[idx]

    cps = np.zeros(m, dtype=int)
    l = n
    for k in range(m, 0, -1):
        t = back[k, l]
        cps[k - 1] = t
        l = t
    return cps, float(C[m, n])


# ===========================================================================
# 5. Automatic model selection — let KTS choose the number of shots
# ===========================================================================
def cpd_auto(K, ncp_max, vmax=1.0, lmin=1, lmax=None):
    """Pick the number of change points by a penalized score; return cuts."""
    n = K.shape[0]
    ncp_max = min(ncp_max, n - 1)
    J = calc_scatters(K)
    best_score, best_cps = np.inf, np.array([], dtype=int)
    for m in range(0, ncp_max + 1):
        if (m + 1) * lmin > n:
            break
        cps, J_m = cpd_nonlin(K, m, lmin=lmin, lmax=lmax, J=J)
        penalty = 0.0 if m == 0 else (m / (2.0 * n)) * (np.log(n / m) + 1.0)
        score = J_m / n + vmax * penalty
        if score < best_score:
            best_score, best_cps = score, cps
    return best_cps


# ===========================================================================
# Helpers: segments, timestamps, keyframes
# ===========================================================================
def segments_from_cps(cps, n):
    """Turn change points into (start, end) index pairs covering 0..n-1."""
    bounds = [0] + list(cps) + [n]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(len(bounds) - 1)]


def fmt_time(seconds):
    """Seconds -> H:MM:SS.s timestamp string."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = seconds % 60
    return f"{h}:{m:02d}:{s:04.1f}"


def save_keyframes(path, segments, frame_index, outdir, max_side=480):
    """Save one representative frame (segment midpoint) per shot into outdir."""
    import cv2

    os.makedirs(outdir, exist_ok=True)
    cap = cv2.VideoCapture(path)
    saved = []
    for k, (a, b) in enumerate(segments):
        mid_sampled = (a + b) // 2
        src_frame = int(frame_index[mid_sampled])
        cap.set(cv2.CAP_PROP_POS_FRAMES, src_frame)
        ok, frame = cap.read()
        if not ok:
            continue
        h, w = frame.shape[:2]
        if max(h, w) > max_side:
            scale = max_side / max(h, w)
            frame = cv2.resize(frame, (int(w * scale), int(h * scale)))
        out = os.path.join(outdir, f"shot_{k:03d}.jpg")
        cv2.imwrite(out, frame)
        saved.append(out)
    cap.release()
    return saved


# ===========================================================================
# Command-line interface
# ===========================================================================
def main(argv=None):
    p = argparse.ArgumentParser(
        description="Detect shots in a video with Kernel Temporal Segmentation "
                    "and save one keyframe per shot (a storyboard).")
    p.add_argument("video", help="path to the input video file")
    p.add_argument("--fps", type=float, default=1.0,
                   help="frames sampled per second (default: 1)")
    p.add_argument("--vmax", type=float, default=1.0,
                   help="penalty weight; larger = fewer shots (default: 1.0)")
    p.add_argument("--num-shots", type=int, default=None,
                   help="force an exact number of shots (skips auto-selection)")
    p.add_argument("--max-shots", type=int, default=100,
                   help="upper bound for auto-selection (default: 100)")
    p.add_argument("--min-shot-sec", type=float, default=1.0,
                   help="minimum shot length in seconds (default: 1.0)")
    p.add_argument("--kernel", choices=["cosine", "linear", "rbf"],
                   default="cosine", help="similarity kernel (default: cosine)")
    p.add_argument("--bins", type=int, default=8,
                   help="histogram bins per channel (default: 8)")
    p.add_argument("--outdir", default="keyframes",
                   help="directory for keyframe images (default: keyframes)")
    p.add_argument("--no-keyframes", action="store_true",
                   help="only print the shot table; do not save images")
    args = p.parse_args(argv)

    # Step 1–2: features + kernel
    print(f"[1/4] decoding & sampling {args.video} at {args.fps} fps ...")
    X, frame_index, src_fps, n_total = extract_features(
        args.video, target_fps=args.fps, bins=args.bins)
    print(f"      {len(X)} frames sampled ({n_total} total, {src_fps:.2f} fps source)")

    print(f"[2/4] building {args.kernel} kernel matrix ({len(X)}x{len(X)}) ...")
    K = build_kernel(X, kind=args.kernel)

    # Step 3–5: segmentation
    sample_dt = (frame_index[1] - frame_index[0]) / src_fps if len(frame_index) > 1 else 1.0
    lmin = max(1, int(round(args.min_shot_sec / max(sample_dt, 1e-6))))
    if args.num_shots:
        print(f"[3/4] segmenting into exactly {args.num_shots} shots ...")
        cps, _ = cpd_nonlin(K, args.num_shots - 1, lmin=lmin)
    else:
        print(f"[3/4] auto-selecting shots (vmax={args.vmax}) ...")
        cps = cpd_auto(K, args.max_shots, vmax=args.vmax, lmin=lmin)

    segments = segments_from_cps(cps, len(X))

    # Report
    print(f"[4/4] found {len(segments)} shots:\n")
    print(f"  {'shot':>4}  {'start':>10}  {'end':>10}  {'dur':>7}")
    print("  " + "-" * 38)
    for k, (a, b) in enumerate(segments):
        t0 = frame_index[a] / src_fps
        t1 = (frame_index[b] / src_fps) + sample_dt
        print(f"  {k:>4}  {fmt_time(t0):>10}  {fmt_time(t1):>10}  {t1 - t0:>6.1f}s")

    if not args.no_keyframes:
        saved = save_keyframes(args.video, segments, frame_index, args.outdir)
        print(f"\nsaved {len(saved)} keyframes to ./{args.outdir}/")


if __name__ == "__main__":
    main()

That's the whole tool: decode → fingerprint → kernel → DP → timestamps, in one file you can read top to bottom.