AI 视频编辑自动化实战

FFmpeg 编排、场景检测、字幕生成与批量处理的工程化实践


一、自动化视频编辑的工程定位

传统视频编辑依赖 Premiere / DaVinci Resolve 等 GUI 工具,每个操作都需要人工执行。而在 AI 产品中,视频编辑是流水线的一环——需要程序化地完成裁剪、拼接、配字幕、加背景音乐等操作。

FFmpeg 是这个领域的事实标准。它不是"简单的命令行工具",而是一个完整的多媒体处理框架,理解它的核心概念对于构建可靠的视频 Pipeline 至关重要。

核心能力矩阵

能力 工具/技术 典型耗时 复杂度
视频裁剪/拼接 FFmpeg -ss -t / concat <5s
场景检测 PySceneDetect / FFmpeg 10-30s
字幕生成 Whisper + FFmpeg ASS 30-120s
背景音乐 Loudness normalization <10s
批量转码 FFmpeg + 并发调度 变化大
AI 风格化 Stable Diffusion + FFmpeg 极高

二、FFmpeg 核心概念与常用模式

2.1 滤镜图(Filter Graph)

FFmpeg 的强大之处在于其 filter graph 系统。理解 input -> filter -> output 的管道模型是写出正确命令的基础。

# 基本结构:-filter_complex 内定义完整的滤镜图
ffmpeg -i input.mp4 \
  -filter_complex "
    [0:v]scale=1920:1080[scaled];
    [scaled]drawtext=text='Hello':fontsize=48:x=100:y=100[out]
  " \
  -map "[out]" -map 0:a \
  output.mp4

2.2 常用操作的 Python 封装

# video_ops.py - FFmpeg operations wrapper
import subprocess
import json
from pathlib import Path
from dataclasses import dataclass


@dataclass
class VideoInfo:
    # Metadata snapshot of a media file, as reported by ffprobe (see probe()).
    duration: float          # container duration in seconds
    width: int               # video frame width in pixels
    height: int              # video frame height in pixels
    fps: float               # frames per second (parsed from r_frame_rate)
    codec: str               # video codec name, e.g. 'h264'
    bitrate: int             # overall container bitrate in bits/s (0 if unknown)
    audio_codec: str | None  # audio codec name, or None if there is no audio stream


def probe(path: str) -> VideoInfo:
    """Return container/stream metadata for *path* via ffprobe.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits non-zero.
        StopIteration: if the file contains no video stream.
    """
    cmd = [
        'ffprobe', '-v', 'quiet',
        '-print_format', 'json',
        '-show_format', '-show_streams',
        path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    data = json.loads(result.stdout)

    video_stream = next(
        s for s in data['streams'] if s['codec_type'] == 'video'
    )
    audio_stream = next(
        (s for s in data['streams'] if s['codec_type'] == 'audio'), None
    )

    # r_frame_rate is a rational string like "30/1" or "30000/1001".
    # Parse it explicitly: eval() on external-tool output is a code
    # injection hazard and fails on any unexpected value.
    rate = video_stream['r_frame_rate']
    if '/' in rate:
        num, den = rate.split('/', 1)
        fps = float(num) / float(den) if float(den) else 0.0
    else:
        fps = float(rate)

    return VideoInfo(
        duration=float(data['format']['duration']),
        width=int(video_stream['width']),
        height=int(video_stream['height']),
        fps=fps,
        codec=video_stream['codec_name'],
        bitrate=int(data['format'].get('bit_rate', 0)),
        audio_codec=audio_stream['codec_name'] if audio_stream else None,
    )


def trim(input_path: str, output_path: str,
         start: float, duration: float) -> None:
    """Cut the segment [start, start+duration) out of a video.

    Uses stream copy (no re-encode), so the cut is near-instant but
    snaps to the nearest keyframe before *start*.
    """
    args = [
        'ffmpeg', '-y',
        '-ss', str(start),                    # seek before -i: fast input seeking
        '-i', input_path,
        '-t', str(duration),
        '-c', 'copy',                         # copy streams, no re-encode
        '-avoid_negative_ts', 'make_zero',    # keep timestamps sane after the cut
        output_path,
    ]
    subprocess.run(args, check=True, capture_output=True)


def concat(segments: list[str], output_path: str) -> None:
    """Concatenate video segments losslessly via the concat demuxer.

    All segments must share codec/parameters for '-c copy' to work.
    The temporary list file is removed even when ffmpeg fails.
    """
    list_file = Path(output_path).with_suffix('.txt')
    # The concat demuxer quotes paths with single quotes; an embedded
    # single quote must be escaped as '\'' or the list file breaks.
    escaped = (s.replace("'", "'\\''") for s in segments)
    list_file.write_text(
        '\n'.join(f"file '{s}'" for s in escaped)
    )
    cmd = [
        'ffmpeg', '-y',
        '-f', 'concat', '-safe', '0',
        '-i', str(list_file),
        '-c', 'copy',
        output_path
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    finally:
        list_file.unlink()  # clean up even if ffmpeg raised


def add_audio_track(
    video_path: str,
    audio_path: str,
    output_path: str,
    video_volume: float = 0.3,
    audio_volume: float = 1.0
) -> None:
    """Mix an external audio file into a video's existing audio track.

    The video stream is copied untouched; only audio is re-encoded (AAC).
    """
    mix_graph = ';'.join([
        f'[0:a]volume={video_volume}[va]',
        f'[1:a]volume={audio_volume}[ba]',
        '[va][ba]amix=inputs=2:duration=first[out]',
    ])
    subprocess.run(
        [
            'ffmpeg', '-y',
            '-i', video_path,
            '-i', audio_path,
            '-filter_complex', mix_graph,
            '-map', '0:v', '-map', '[out]',
            '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k',
            '-shortest',
            output_path,
        ],
        check=True, capture_output=True,
    )

三、场景检测:从视频到结构化片段

场景检测(Scene Detection)是自动化编辑的第一步——把一段连续视频切分为有意义的场景片段。

3.1 PySceneDetect 集成

# scene_detection.py
from scenedetect import detect, ContentDetector, AdaptiveDetector
from scenedetect import open_video
from dataclasses import dataclass


@dataclass
class Scene:
    # One detected scene, expressed in seconds from the start of the video.
    index: int           # 0-based position of the scene within the video
    start_time: float    # seconds
    end_time: float      # seconds
    duration: float      # seconds (end_time - start_time)


def detect_scenes(
    video_path: str,
    method: str = 'content',  # 'content' or 'adaptive'
    threshold: float = 27.0,
    min_scene_len: float = 2.0,  # minimum scene duration in seconds
) -> list[Scene]:
    """
    Split a video into scenes and return their boundaries.

    'content' compares HSV histograms between consecutive frames;
    'adaptive' uses a rolling average, which copes better with
    gradually varying content.
    """
    video = open_video(video_path)
    # The detectors expect the minimum scene length in frames, not seconds.
    min_frames = int(min_scene_len * video.frame_rate)

    if method == 'content':
        detector = ContentDetector(
            threshold=threshold,
            min_scene_len=min_frames,
        )
    elif method == 'adaptive':
        detector = AdaptiveDetector(
            adaptive_threshold=3.0,
            min_scene_len=min_frames,
        )
    else:
        raise ValueError(f"Unknown method: {method}")

    boundaries = detect(video_path, detector)

    return [
        Scene(
            index=i,
            start_time=start.get_seconds(),
            end_time=end.get_seconds(),
            duration=end.get_seconds() - start.get_seconds(),
        )
        for i, (start, end) in enumerate(boundaries)
    ]


def extract_scene_thumbnails(
    video_path: str,
    scenes: list[Scene],
    output_dir: str,
) -> list[str]:
    """Extract one representative JPEG frame (the midpoint) per scene.

    Returns the thumbnail paths in scene order.
    """
    # Bug fix: this module (scene_detection.py) never imported subprocess,
    # so the original code raised NameError at runtime.
    import subprocess

    thumbnails = []
    for scene in scenes:
        # The temporal midpoint is a cheap, reasonable "representative" frame.
        midpoint = (scene.start_time + scene.end_time) / 2
        output_path = f"{output_dir}/scene_{scene.index:03d}.jpg"

        cmd = [
            'ffmpeg', '-y',
            '-ss', str(midpoint),
            '-i', video_path,
            '-frames:v', '1',   # grab exactly one frame
            '-q:v', '2',        # JPEG quality 1-31, lower is better
            output_path
        ]
        subprocess.run(cmd, check=True, capture_output=True)
        thumbnails.append(output_path)

    return thumbnails

3.2 场景检测的工程选择

场景 推荐方法 阈值建议
电影/纪录片 ContentDetector 27-30
快节奏剪辑/MV AdaptiveDetector 3.0 (adaptive)
演示/教学视频 ContentDetector 20-25 (更敏感)
监控画面 ContentDetector 40+ (低灵敏)

四、Whisper 字幕生成

OpenAI Whisper 是当前最强的开源语音识别模型。将其集成到视频编辑 Pipeline,可以自动生成精确时间戳的字幕。

4.1 完整字幕生成流程

# subtitle_gen.py
import whisper
import json
from pathlib import Path


def generate_subtitles(
    video_path: str,
    output_path: str,
    model_size: str = 'medium',  # tiny/base/small/medium/large
    language: str = 'zh',
    format: str = 'srt',  # 'srt' or 'ass'
) -> str:
    """Generate a subtitle file from a video's audio track using Whisper.

    Returns *output_path*. The temporary WAV extracted for Whisper is
    always removed, even if transcription raises.

    Raises:
        ValueError: for an unsupported *format* -- checked up front, before
            spending minutes on model loading and transcription.
    """
    # NOTE: the parameter name 'format' shadows the builtin; it is kept
    # for backward compatibility with existing callers.
    if format not in ('srt', 'ass'):
        raise ValueError(f"Unknown format: {format}")

    # Step 1: Extract audio (16 kHz mono WAV, Whisper's expected input)
    audio_path = Path(video_path).with_suffix('.wav')
    extract_audio(video_path, str(audio_path))

    try:
        # Step 2: Transcribe with Whisper
        model = whisper.load_model(model_size)
        result = model.transcribe(
            str(audio_path),
            language=language,
            word_timestamps=True,
            verbose=False,
        )
    finally:
        # The original code leaked the WAV whenever transcription raised.
        audio_path.unlink(missing_ok=True)

    # Step 3: Format output
    converter = segments_to_srt if format == 'srt' else segments_to_ass
    content = converter(result['segments'])

    Path(output_path).write_text(content, encoding='utf-8')
    return output_path


def extract_audio(video_path: str, audio_path: str) -> None:
    """Dump the audio track as 16 kHz mono PCM WAV (Whisper's sweet spot)."""
    subprocess.run(
        [
            'ffmpeg', '-y',
            '-i', video_path,
            '-vn',                   # drop the video stream
            '-acodec', 'pcm_s16le',  # uncompressed 16-bit PCM (WAV)
            '-ar', '16000',          # Whisper models are trained on 16 kHz
            '-ac', '1',              # downmix to mono
            audio_path,
        ],
        check=True, capture_output=True,
    )


def segments_to_srt(segments: list[dict]) -> str:
    """Render Whisper segments as an SRT document (index, range, text)."""
    blocks = [
        f"{idx}\n"
        f"{format_timestamp_srt(seg['start'])} --> "
        f"{format_timestamp_srt(seg['end'])}\n"
        f"{seg['text'].strip()}\n"
        for idx, seg in enumerate(segments, 1)
    ]
    return '\n'.join(blocks)


def format_timestamp_srt(seconds: float) -> str:
    """Format seconds as an SRT timestamp: HH:MM:SS,mmm.

    Works in integer milliseconds to avoid float-truncation artifacts:
    e.g. 0.29 * 1000 is 289.999... in binary floating point, so the
    original int() truncation rendered it as ',289' instead of ',290'.
    Rounding carries correctly (59.9996 -> 00:01:00,000).
    """
    total_ms = round(seconds * 1000)
    s, ms = divmod(total_ms, 1000)
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def segments_to_ass(segments: list[dict]) -> str:
    """Render Whisper segments as an ASS document with a default style."""
    header = """[Script Info]
Title: Auto-generated Subtitles
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,Source Han Sans CN,56,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,2.5,1,2,30,30,40,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    dialogue_lines = [
        "Dialogue: 0,{},{},Default,,0,0,0,,{}".format(
            format_timestamp_ass(seg['start']),
            format_timestamp_ass(seg['end']),
            seg['text'].strip(),
        )
        for seg in segments
    ]
    return header + '\n'.join(dialogue_lines)


def format_timestamp_ass(seconds: float) -> str:
    """Format seconds as an ASS timestamp: H:MM:SS.cc (centiseconds).

    Works in integer centiseconds to avoid float-truncation artifacts:
    e.g. 0.29 * 100 is 28.999... in binary floating point, so the
    original int() truncation rendered '.28' instead of '.29'.
    """
    total_cs = round(seconds * 100)
    s, cs = divmod(total_cs, 100)
    m, s = divmod(s, 60)
    h, m = divmod(m, 60)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"

4.2 烧录字幕到视频

def burn_subtitles(
    video_path: str,
    subtitle_path: str,
    output_path: str,
    style_override: str | None = None,
) -> None:
    """Hardcode (burn) a subtitle file into the video stream.

    Re-encodes video with x264 at CRF 23; audio is passed through.
    """
    vf = f"subtitles={subtitle_path}"
    if style_override:
        vf = f"{vf}:force_style='{style_override}'"

    subprocess.run(
        [
            'ffmpeg', '-y',
            '-i', video_path,
            '-vf', vf,
            '-c:v', 'libx264', '-preset', 'medium', '-crf', '23',
            '-c:a', 'copy',
            output_path,
        ],
        check=True, capture_output=True,
    )

五、背景音乐自动化

5.1 音量标准化与混音

背景音乐需要遵循 EBU R128 响度标准,避免盖过旁白:

def normalize_loudness(
    audio_path: str,
    output_path: str,
    target_lufs: float = -23.0,  # EBU R128 broadcast standard
) -> None:
    """Two-pass EBU R128 loudness normalization to *target_lufs*.

    Pass 1 measures the input's loudness stats; pass 2 applies a linear
    correction based on those measurements, which is more accurate than
    loudnorm's single-pass dynamic mode.
    """
    # First pass: measure only. '-f null -' discards output portably;
    # the original hard-coded '/dev/null' breaks on Windows.
    measure_cmd = [
        'ffmpeg', '-i', audio_path,
        '-af', f'loudnorm=I={target_lufs}:print_format=json',
        '-f', 'null', '-'
    ]
    # check=True: a failed measurement pass previously went unnoticed and
    # only surfaced later as a confusing parse error.
    result = subprocess.run(
        measure_cmd, capture_output=True, text=True, check=True
    )

    # loudnorm prints its JSON measurement block to stderr.
    stats = parse_loudnorm_stats(result.stderr)

    # Second pass: apply a linear correction using the measured values.
    normalize_cmd = [
        'ffmpeg', '-y', '-i', audio_path,
        '-af', (
            f"loudnorm=I={target_lufs}:"
            f"measured_I={stats['input_i']}:"
            f"measured_TP={stats['input_tp']}:"
            f"measured_LRA={stats['input_lra']}:"
            f"measured_thresh={stats['input_thresh']}:"
            f"linear=true"
        ),
        '-ar', '48000',  # loudnorm resamples internally; pin the output rate
        output_path
    ]
    subprocess.run(normalize_cmd, check=True, capture_output=True)


def auto_mix_bgm(
    video_path: str,
    bgm_path: str,
    output_path: str,
    bgm_volume_db: float = -18.0,
    ducking: bool = True,
) -> None:
    """
    Mix background music under a video's existing audio.

    When *ducking* is on, a sidechain compressor lowers the BGM whenever
    the original (voice) track is active.
    """
    if ducking:
        graph_parts = [
            f"[1:a]volume={bgm_volume_db}dB[bgm]",
            "[bgm][0:a]sidechaincompress="
            "threshold=0.02:ratio=6:attack=200:release=1000[ducked]",
            "[0:a][ducked]amix=inputs=2:duration=first[out]",
        ]
    else:
        graph_parts = [
            "[0:a]volume=1.0[voice]",
            f"[1:a]volume={bgm_volume_db}dB[bgm]",
            "[voice][bgm]amix=inputs=2:duration=first[out]",
        ]
    filter_complex = ';'.join(graph_parts)

    subprocess.run(
        [
            'ffmpeg', '-y',
            '-i', video_path,
            '-i', bgm_path,
            '-filter_complex', filter_complex,
            '-map', '0:v', '-map', '[out]',
            '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k',
            '-shortest',
            output_path,
        ],
        check=True, capture_output=True,
    )

六、批量处理架构

当需要处理数十上百个视频时,单线程串行不可接受。但 FFmpeg 本身是 CPU 密集型的,盲目多进程会导致系统过载。

# batch_processor.py
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from typing import Callable


@dataclass
class BatchJob:
    # A single video-processing work item consumed by BatchProcessor.
    input_path: str
    output_path: str
    operations: list[str]  # ordered pipeline steps, e.g. ['trim:0:30', 'subtitle', 'bgm']
    status: str = 'pending'  # 'pending' -> 'completed' | 'failed'
    error: str | None = None  # failure message when status == 'failed'


class BatchProcessor:
    """Run video-editing jobs concurrently in a process pool.

    FFmpeg is CPU-bound and already multi-threaded internally, so the
    default worker count is half the CPU cores to avoid oversubscription.
    """

    def __init__(self, max_workers: int | None = None):
        # os.cpu_count() can return None on some platforms; the original
        # expression `os.cpu_count() // 2` crashed with TypeError there.
        self.max_workers = max_workers or max(1, (os.cpu_count() or 2) // 2)
        self.executor = ProcessPoolExecutor(max_workers=self.max_workers)

    async def process_batch(
        self,
        jobs: list[BatchJob],
        # The original annotation `callable | None` raised TypeError at
        # definition time (builtins.callable does not support `|`).
        on_progress: Callable[[int, int], None] | None = None,
    ) -> list[BatchJob]:
        """Process all *jobs*, mutating each job's status/error in place.

        *on_progress*, if given, is called as on_progress(done, total)
        after every job finishes, successfully or not.
        """
        # get_running_loop() is the supported API inside a coroutine;
        # get_event_loop() is deprecated for this use since Python 3.10.
        loop = asyncio.get_running_loop()
        tasks = [
            (job, loop.run_in_executor(self.executor, self._process_single, job))
            for job in jobs
        ]

        completed = 0
        for job, task in tasks:
            try:
                await task
                job.status = 'completed'
            except Exception as e:
                # Record the failure on the job rather than aborting the batch.
                job.status = 'failed'
                job.error = str(e)

            completed += 1
            if on_progress:
                on_progress(completed, len(jobs))

        return jobs

    @staticmethod
    def _process_single(job: BatchJob) -> None:
        """Run one job's operation chain (executes in a worker process).

        NOTE(review): relies on trim / generate_subtitles / burn_subtitles /
        auto_mix_bgm being importable in the worker (video_ops, subtitle_gen).
        """
        current_path = job.input_path
        intermediates: list[str] = []  # every temp file except input and final output

        try:
            for op in job.operations:
                op_name, *args = op.split(':')

                if op_name == 'trim':
                    start, duration = float(args[0]), float(args[1])
                    next_path = current_path + '.trimmed.mp4'
                    trim(current_path, next_path, start, duration)
                    intermediates.append(next_path)
                    current_path = next_path

                elif op_name == 'subtitle':
                    sub_path = current_path + '.srt'
                    generate_subtitles(current_path, sub_path)
                    intermediates.append(sub_path)
                    next_path = current_path + '.subtitled.mp4'
                    burn_subtitles(current_path, sub_path, next_path)
                    intermediates.append(next_path)
                    current_path = next_path

                elif op_name == 'bgm':
                    bgm_file = args[0] if args else 'default_bgm.mp3'
                    next_path = current_path + '.bgm.mp4'
                    auto_mix_bgm(current_path, bgm_file, next_path)
                    intermediates.append(next_path)
                    current_path = next_path

            # Move the final result into place; it must not be deleted below.
            os.rename(current_path, job.output_path)
            intermediates = [p for p in intermediates if p != current_path]
        finally:
            # Original code leaked every intermediate file (.trimmed.mp4,
            # .srt, ...). Clean them up on success AND on failure.
            for path in intermediates:
                try:
                    os.remove(path)
                except OSError:
                    pass  # best-effort cleanup


# Usage
async def main():
    """Demo: batch-edit 20 raw videos with trim + subtitles + BGM."""
    processor = BatchProcessor(max_workers=4)

    jobs = []
    for i in range(20):
        jobs.append(BatchJob(
            input_path=f'raw/video_{i}.mp4',
            output_path=f'processed/video_{i}.mp4',
            operations=['trim:0:60', 'subtitle', 'bgm:music/ambient.mp3'],
        ))

    results = await processor.process_batch(
        jobs,
        on_progress=lambda done, total: print(f"Progress: {done}/{total}")
    )

    succeeded = sum(1 for j in results if j.status != 'failed')
    print(f"Completed: {succeeded}/{len(results)}")
    for j in results:
        if j.status == 'failed':
            print(f"  FAILED: {j.input_path} -> {j.error}")

七、完整编辑 Pipeline 示例

将以上所有模块组合成一条完整的自动化编辑 Pipeline:

输入视频
  |
  v
[场景检测] -> 切分为 N 个场景
  |
  v
[Whisper 转写] -> 生成 SRT/ASS 字幕
  |
  v
[字幕烧录] -> 字幕嵌入视频
  |
  v
[BGM 混音] -> 添加背景音乐 + ducking
  |
  v
[标准化输出] -> H.264 + AAC, 1080p, 标准响度
  |
  v
输出视频
async def auto_edit_pipeline(
    input_path: str,
    output_path: str,
    bgm_path: str | None = None,
    language: str = 'zh',
) -> dict:
    """Full automated edit: scene detection -> subtitles -> burn -> BGM.

    Returns a summary dict: scene count, output duration, output size
    in MB, and the subtitle file path.
    """
    info = probe(input_path)
    print(f"Input: {info.width}x{info.height}, {info.duration:.1f}s")

    # Derive sibling paths from the extension instead of str.replace():
    # replace() substitutes the FIRST '.mp4' anywhere in the path, which
    # corrupts names like 'clips.mp4.backup/out.mp4'.
    stem, _ext = os.path.splitext(output_path)

    # Step 1: Scene detection (reported in the summary; the video is not
    # re-cut by scene here).
    scenes = detect_scenes(input_path, method='content')
    print(f"Detected {len(scenes)} scenes")

    # Step 2: Generate subtitles
    srt_path = stem + '.srt'
    generate_subtitles(input_path, srt_path, language=language)
    print(f"Subtitles generated: {srt_path}")

    # Step 3: Burn subtitles
    subtitled_path = stem + '.sub.mp4'
    burn_subtitles(input_path, srt_path, subtitled_path)

    # Step 4: Add BGM (optional)
    if bgm_path:
        auto_mix_bgm(subtitled_path, bgm_path, output_path)
        os.remove(subtitled_path)  # original code leaked this intermediate
    else:
        os.rename(subtitled_path, output_path)

    result_info = probe(output_path)
    return {
        'scenes': len(scenes),
        'duration': result_info.duration,
        'size_mb': os.path.getsize(output_path) / (1024 * 1024),
        'subtitles': srt_path,
    }

八、生产环境注意事项

内存管理

FFmpeg 在处理高分辨率视频时内存消耗可观。特别是使用复杂 filter graph 或处理 4K 素材时:

  • 单个 FFmpeg 进程峰值可达 500MB-1GB
  • 并发 4 个 FFmpeg 进程需要预留 4GB+ 内存
  • PM2 / Docker 的内存限制必须留足余量

临时文件清理

Pipeline 中会产生大量临时文件(中间帧、临时音频、字幕文件)。必须在 Pipeline 正常完成和异常退出两种情况下都确保清理:

import tempfile
import shutil
from contextlib import contextmanager

@contextmanager
def temp_workspace():
    """Yield a fresh temporary directory, removed on exit (even on error)."""
    workspace = tempfile.mkdtemp(prefix='video_edit_')
    try:
        yield workspace
    finally:
        # ignore_errors: cleanup must never mask the original exception
        shutil.rmtree(workspace, ignore_errors=True)

FFmpeg 版本兼容

不同系统的 FFmpeg 版本差异可能导致参数不兼容。建议:

  • 项目中固定 FFmpeg 最低版本要求(如 5.0+)
  • 启动时检测 FFmpeg 版本并在日志中记录
  • 对关键特性(如 loudnorm)做版本检查

Maurice | maurice_wen@proton.me