AI Video Editing Automation in Practice
Engineering practices for FFmpeg orchestration, scene detection, subtitle generation, and batch processing
1. Where Automated Video Editing Fits
Traditional video editing relies on GUI tools such as Premiere or DaVinci Resolve, where every operation is performed by hand. In an AI product, video editing is one stage of a pipeline: trimming, concatenation, subtitling, and background music all have to happen programmatically.
FFmpeg is the de facto standard in this space. It is not "just a command-line tool" but a complete multimedia processing framework, and understanding its core concepts is essential for building reliable video pipelines.
Core capability matrix
| Capability | Tool/Technique | Typical time | Complexity |
|---|---|---|---|
| Trim / concat | FFmpeg -ss -t / concat | <5s | Low |
| Scene detection | PySceneDetect / FFmpeg | 10-30s | Medium |
| Subtitle generation | Whisper + FFmpeg ASS | 30-120s | Medium |
| Background music | Loudness normalization | <10s | Low |
| Batch transcoding | FFmpeg + concurrent scheduling | Highly variable | High |
| AI stylization | Stable Diffusion + FFmpeg | Very high | High |
2. FFmpeg Core Concepts and Common Patterns
2.1 Filter Graphs
FFmpeg's power lies in its filter graph system. Understanding the input -> filter -> output pipeline model is the foundation for writing correct commands.
# Basic structure: the complete filter graph is defined inside -filter_complex
ffmpeg -i input.mp4 \
-filter_complex "
[0:v]scale=1920:1080[scaled];
[scaled]drawtext=text='Hello':fontsize=48:x=100:y=100[out]
" \
-map "[out]" -map 0:a \
output.mp4
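When filter graphs are assembled in code rather than typed by hand, string concatenation gets error-prone. A small helper along these lines (illustrative, not an FFmpeg API) keeps the intermediate labels consistent:

```python
def chain_filters(input_label: str, filters: list[str],
                  out_label: str = 'out') -> str:
    """Build a linear filter chain like [0:v]f1[v0];[v0]f2[out]."""
    parts = []
    current = input_label
    for i, f in enumerate(filters):
        # The last filter writes to the final output label
        label = out_label if i == len(filters) - 1 else f'v{i}'
        parts.append(f'[{current}]{f}[{label}]')
        current = label
    return ';'.join(parts)

# Reproduces the graph from the command above
graph = chain_filters('0:v', [
    'scale=1920:1080',
    "drawtext=text='Hello':fontsize=48:x=100:y=100",
])
```

The resulting string can be passed directly as the -filter_complex argument.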
2.2 Python Wrappers for Common Operations
# video_ops.py - FFmpeg operations wrapper
import subprocess
import json
from pathlib import Path
from dataclasses import dataclass
@dataclass
class VideoInfo:
duration: float
width: int
height: int
fps: float
codec: str
bitrate: int
audio_codec: str | None
def probe(path: str) -> VideoInfo:
"""Get video metadata via ffprobe."""
cmd = [
'ffprobe', '-v', 'quiet',
'-print_format', 'json',
'-show_format', '-show_streams',
path
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
    video_stream = next(
        s for s in data['streams'] if s['codec_type'] == 'video'
    )
    audio_stream = next(
        (s for s in data['streams'] if s['codec_type'] == 'audio'), None
    )
    # Parse "30/1"-style rational frame rates without resorting to eval()
    num, den = video_stream['r_frame_rate'].split('/')
    fps = float(num) / float(den) if float(den) else 0.0
    return VideoInfo(
        duration=float(data['format']['duration']),
        width=int(video_stream['width']),
        height=int(video_stream['height']),
        fps=fps,
        codec=video_stream['codec_name'],
        bitrate=int(data['format'].get('bit_rate', 0)),
        audio_codec=audio_stream['codec_name'] if audio_stream else None,
    )
def trim(input_path: str, output_path: str,
start: float, duration: float) -> None:
"""Trim video segment without re-encoding (fast)."""
cmd = [
'ffmpeg', '-y',
'-ss', str(start),
'-i', input_path,
'-t', str(duration),
'-c', 'copy',
'-avoid_negative_ts', 'make_zero',
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
def concat(segments: list[str], output_path: str) -> None:
"""Concatenate video segments using concat demuxer."""
list_file = Path(output_path).with_suffix('.txt')
list_file.write_text(
'\n'.join(f"file '{s}'" for s in segments)
)
cmd = [
'ffmpeg', '-y',
'-f', 'concat', '-safe', '0',
'-i', str(list_file),
'-c', 'copy',
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
list_file.unlink() # cleanup
def add_audio_track(
video_path: str,
audio_path: str,
output_path: str,
video_volume: float = 0.3,
audio_volume: float = 1.0
) -> None:
"""Mix background audio with video's original audio."""
cmd = [
'ffmpeg', '-y',
'-i', video_path,
'-i', audio_path,
'-filter_complex',
f'[0:a]volume={video_volume}[va];'
f'[1:a]volume={audio_volume}[ba];'
f'[va][ba]amix=inputs=2:duration=first[out]',
'-map', '0:v', '-map', '[out]',
'-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k',
'-shortest',
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
3. Scene Detection: From Video to Structured Segments
Scene detection is the first step of automated editing: it splits a continuous video into meaningful scene segments.
3.1 PySceneDetect Integration
# scene_detection.py
import subprocess  # used below for thumbnail extraction
from dataclasses import dataclass

from scenedetect import detect, ContentDetector, AdaptiveDetector
from scenedetect import open_video
@dataclass
class Scene:
index: int
start_time: float # seconds
end_time: float # seconds
duration: float # seconds
def detect_scenes(
video_path: str,
method: str = 'content', # 'content' or 'adaptive'
threshold: float = 27.0,
min_scene_len: float = 2.0, # minimum scene duration in seconds
) -> list[Scene]:
"""
Detect scene boundaries in a video.
'content' detector: compares HSV histograms between consecutive frames.
'adaptive' detector: uses rolling average for varying content.
"""
video = open_video(video_path)
if method == 'content':
detector = ContentDetector(
threshold=threshold,
min_scene_len=int(min_scene_len * video.frame_rate),
)
elif method == 'adaptive':
detector = AdaptiveDetector(
adaptive_threshold=3.0,
min_scene_len=int(min_scene_len * video.frame_rate),
)
else:
raise ValueError(f"Unknown method: {method}")
scene_list = detect(video_path, detector)
scenes = []
for i, (start, end) in enumerate(scene_list):
start_sec = start.get_seconds()
end_sec = end.get_seconds()
scenes.append(Scene(
index=i,
start_time=start_sec,
end_time=end_sec,
duration=end_sec - start_sec,
))
return scenes
def extract_scene_thumbnails(
video_path: str,
scenes: list[Scene],
output_dir: str,
) -> list[str]:
"""Extract a representative frame from each scene."""
thumbnails = []
for scene in scenes:
midpoint = (scene.start_time + scene.end_time) / 2
output_path = f"{output_dir}/scene_{scene.index:03d}.jpg"
cmd = [
'ffmpeg', '-y',
'-ss', str(midpoint),
'-i', video_path,
'-frames:v', '1',
'-q:v', '2',
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
thumbnails.append(output_path)
return thumbnails
3.2 Engineering Choices for Scene Detection
| Content type | Recommended method | Suggested threshold |
|---|---|---|
| Film / documentary | ContentDetector | 27-30 |
| Fast-cut edits / MV | AdaptiveDetector | 3.0 (adaptive) |
| Presentations / tutorials | ContentDetector | 20-25 (more sensitive) |
| Surveillance footage | ContentDetector | 40+ (less sensitive) |
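These recommendations can be captured in code as presets; the names below are illustrative, not part of PySceneDetect:

```python
# (method, threshold) presets matching the table above
SCENE_PRESETS: dict[str, tuple[str, float]] = {
    'film':         ('content', 27.0),   # films / documentaries
    'fast_cut':     ('adaptive', 3.0),   # fast-cut edits / MVs
    'presentation': ('content', 22.0),   # demos / lectures
    'surveillance': ('content', 40.0),   # static cameras
}

def scene_params(kind: str) -> tuple[str, float]:
    """Map a content type to (method, threshold) for detect_scenes()."""
    try:
        return SCENE_PRESETS[kind]
    except KeyError:
        raise ValueError(f"Unknown content type: {kind}") from None
```

A caller can then unpack the preset straight into detect_scenes(video_path, method=m, threshold=t).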
4. Subtitle Generation with Whisper
OpenAI Whisper is among the strongest open-source speech recognition models available. Integrated into a video editing pipeline, it can automatically generate subtitles with accurate timestamps.
4.1 End-to-End Subtitle Generation
# subtitle_gen.py
import subprocess  # used below for audio extraction
from pathlib import Path

import whisper
def generate_subtitles(
video_path: str,
output_path: str,
model_size: str = 'medium', # tiny/base/small/medium/large
language: str = 'zh',
format: str = 'srt', # 'srt' or 'ass'
) -> str:
"""Generate subtitles from video audio using Whisper."""
# Step 1: Extract audio
audio_path = Path(video_path).with_suffix('.wav')
extract_audio(video_path, str(audio_path))
# Step 2: Transcribe with Whisper
model = whisper.load_model(model_size)
result = model.transcribe(
str(audio_path),
language=language,
word_timestamps=True,
verbose=False,
)
# Step 3: Format output
if format == 'srt':
content = segments_to_srt(result['segments'])
elif format == 'ass':
content = segments_to_ass(result['segments'])
else:
raise ValueError(f"Unknown format: {format}")
Path(output_path).write_text(content, encoding='utf-8')
audio_path.unlink() # cleanup
return output_path
def extract_audio(video_path: str, audio_path: str) -> None:
"""Extract audio track as WAV for Whisper processing."""
cmd = [
'ffmpeg', '-y',
'-i', video_path,
'-vn', # no video
'-acodec', 'pcm_s16le', # WAV format
'-ar', '16000', # 16kHz (Whisper optimal)
'-ac', '1', # mono
audio_path
]
subprocess.run(cmd, check=True, capture_output=True)
def segments_to_srt(segments: list[dict]) -> str:
"""Convert Whisper segments to SRT format."""
lines = []
for i, seg in enumerate(segments, 1):
start = format_timestamp_srt(seg['start'])
end = format_timestamp_srt(seg['end'])
text = seg['text'].strip()
lines.append(f"{i}\n{start} --> {end}\n{text}\n")
return '\n'.join(lines)
def format_timestamp_srt(seconds: float) -> str:
"""Format seconds to SRT timestamp: HH:MM:SS,mmm"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = int((seconds % 1) * 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def segments_to_ass(segments: list[dict]) -> str:
"""Convert Whisper segments to ASS format with styling."""
header = """[Script Info]
Title: Auto-generated Subtitles
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,Source Han Sans CN,56,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,2.5,1,2,30,30,40,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
events = []
for seg in segments:
start = format_timestamp_ass(seg['start'])
end = format_timestamp_ass(seg['end'])
text = seg['text'].strip()
events.append(
f"Dialogue: 0,{start},{end},Default,,0,0,0,,{text}"
)
return header + '\n'.join(events)
def format_timestamp_ass(seconds: float) -> str:
"""Format seconds to ASS timestamp: H:MM:SS.cc"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
cs = int((seconds % 1) * 100)
return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
4.2 Burning Subtitles into the Video
def burn_subtitles(
video_path: str,
subtitle_path: str,
output_path: str,
style_override: str | None = None,
) -> None:
"""Burn subtitles into video (hardcoded)."""
sub_filter = f"subtitles={subtitle_path}"
if style_override:
sub_filter += f":force_style='{style_override}'"
cmd = [
'ffmpeg', '-y',
'-i', video_path,
'-vf', sub_filter,
'-c:v', 'libx264', '-preset', 'medium', '-crf', '23',
'-c:a', 'copy',
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
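One pitfall: the subtitles filter parses its argument, so paths containing ':' (Windows drive letters) or quotes break the command. A minimal escaper helps (a sketch; FFmpeg's full filter-quoting rules cover more cases):

```python
def escape_filter_path(path: str) -> str:
    """Escape a file path for use inside an FFmpeg filter argument."""
    return (path.replace('\\', '/')      # forward slashes work on all platforms
                .replace(':', r'\:')     # ':' separates filter options
                .replace("'", r"\'"))    # quotes would end the filter string
```

With it, the filter above becomes f"subtitles={escape_filter_path(subtitle_path)}".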
5. Background Music Automation
5.1 Loudness Normalization and Mixing
Background music should follow the EBU R128 loudness standard so it never drowns out the narration:
def normalize_loudness(
audio_path: str,
output_path: str,
target_lufs: float = -23.0, # EBU R128 standard
) -> None:
"""Normalize audio to target LUFS level."""
# First pass: measure current loudness
    measure_cmd = [
        'ffmpeg', '-i', audio_path,
        '-af', f'loudnorm=I={target_lufs}:print_format=json',
        '-f', 'null', '-'  # discard output; the stats land on stderr
    ]
result = subprocess.run(
measure_cmd, capture_output=True, text=True
)
# Parse measurement from stderr
stats = parse_loudnorm_stats(result.stderr)
# Second pass: apply correction
normalize_cmd = [
'ffmpeg', '-y', '-i', audio_path,
'-af', (
f"loudnorm=I={target_lufs}:"
f"measured_I={stats['input_i']}:"
f"measured_TP={stats['input_tp']}:"
f"measured_LRA={stats['input_lra']}:"
f"measured_thresh={stats['input_thresh']}:"
f"linear=true"
),
'-ar', '48000',
output_path
]
subprocess.run(normalize_cmd, check=True, capture_output=True)
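normalize_loudness above calls parse_loudnorm_stats, which is not defined anywhere in the article. The loudnorm filter prints its measurements as a flat JSON block at the end of stderr, so a sketch of the helper could be:

```python
import json

def parse_loudnorm_stats(stderr: str) -> dict:
    """Extract the JSON stats block that loudnorm prints at the end of stderr."""
    # The block is flat JSON, so the last '{'..'}' pair brackets it
    start = stderr.rfind('{')
    end = stderr.rfind('}')
    if start == -1 or end < start:
        raise ValueError("no loudnorm JSON block found in FFmpeg output")
    return json.loads(stderr[start:end + 1])
```

Note that loudnorm emits the values as strings (e.g. "-27.61"), which is fine here since the second pass interpolates them back into a filter string.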
def auto_mix_bgm(
video_path: str,
bgm_path: str,
output_path: str,
bgm_volume_db: float = -18.0,
ducking: bool = True,
) -> None:
"""
Add background music with optional ducking.
Ducking: lower BGM volume when speech is detected.
"""
    if ducking:
        # An input pad like [0:a] can only be consumed once in a filter
        # graph, so split the voice track: one copy drives the sidechain,
        # the other is mixed into the output.
        filter_complex = (
            f"[0:a]asplit=2[voice][sc];"
            f"[1:a]volume={bgm_volume_db}dB[bgm];"
            f"[bgm][sc]sidechaincompress="
            f"threshold=0.02:ratio=6:attack=200:release=1000[ducked];"
            f"[voice][ducked]amix=inputs=2:duration=first[out]"
        )
    else:
        filter_complex = (
            f"[0:a]volume=1.0[voice];"
            f"[1:a]volume={bgm_volume_db}dB[bgm];"
            f"[voice][bgm]amix=inputs=2:duration=first[out]"
        )
cmd = [
'ffmpeg', '-y',
'-i', video_path,
'-i', bgm_path,
'-filter_complex', filter_complex,
'-map', '0:v', '-map', '[out]',
'-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k',
'-shortest',
output_path
]
subprocess.run(cmd, check=True, capture_output=True)
6. Batch Processing Architecture
When dozens or hundreds of videos need processing, serial single-threaded execution is unacceptable. But FFmpeg is CPU-bound, and spawning processes blindly will overload the machine.
# batch_processor.py
import asyncio
import os
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass

# Wrappers defined earlier in this article
from video_ops import trim
from subtitle_gen import generate_subtitles
# burn_subtitles / auto_mix_bgm come from the snippets in sections 4-5
@dataclass
class BatchJob:
input_path: str
output_path: str
operations: list[str] # e.g., ['trim:0:30', 'subtitle', 'bgm']
status: str = 'pending'
error: str | None = None
class BatchProcessor:
    def __init__(self, max_workers: int | None = None):
        # Default: half the CPU cores (FFmpeg itself is multi-threaded per
        # job); os.cpu_count() may return None, hence the fallback.
        self.max_workers = max_workers or max(1, (os.cpu_count() or 2) // 2)
        self.executor = ProcessPoolExecutor(max_workers=self.max_workers)
    async def process_batch(
        self,
        jobs: list[BatchJob],
        on_progress=None,  # optional callback: (completed, total) -> None
    ) -> list[BatchJob]:
        """Process a batch of video editing jobs."""
        loop = asyncio.get_running_loop()
tasks = []
completed = 0
for job in jobs:
task = loop.run_in_executor(
self.executor,
self._process_single,
job,
)
tasks.append((job, task))
for job, task in tasks:
try:
await task
job.status = 'completed'
except Exception as e:
job.status = 'failed'
job.error = str(e)
completed += 1
if on_progress:
on_progress(completed, len(jobs))
return jobs
@staticmethod
def _process_single(job: BatchJob) -> None:
"""Process a single video job (runs in subprocess)."""
current_path = job.input_path
for op in job.operations:
op_name, *args = op.split(':')
if op_name == 'trim':
start, duration = float(args[0]), float(args[1])
next_path = current_path + '.trimmed.mp4'
trim(current_path, next_path, start, duration)
current_path = next_path
elif op_name == 'subtitle':
sub_path = current_path + '.srt'
generate_subtitles(current_path, sub_path)
next_path = current_path + '.subtitled.mp4'
burn_subtitles(current_path, sub_path, next_path)
current_path = next_path
elif op_name == 'bgm':
bgm_file = args[0] if args else 'default_bgm.mp3'
next_path = current_path + '.bgm.mp4'
auto_mix_bgm(current_path, bgm_file, next_path)
current_path = next_path
# Move final result to output path
os.rename(current_path, job.output_path)
# Usage
async def main():
processor = BatchProcessor(max_workers=4)
jobs = [
BatchJob(
input_path=f'raw/video_{i}.mp4',
output_path=f'processed/video_{i}.mp4',
operations=['trim:0:60', 'subtitle', 'bgm:music/ambient.mp3'],
)
for i in range(20)
]
results = await processor.process_batch(
jobs,
on_progress=lambda done, total: print(f"Progress: {done}/{total}")
)
    failed = [j for j in results if j.status == 'failed']
    print(f"Completed: {len(results) - len(failed)}/{len(results)}")
    for j in failed:
        print(f"  FAILED: {j.input_path} -> {j.error}")

if __name__ == '__main__':
    asyncio.run(main())
7. A Complete Editing Pipeline
Combining all of the modules above into a single automated editing pipeline:
Input video
|
v
[Scene detection] -> split into N scenes
|
v
[Whisper transcription] -> generate SRT/ASS subtitles
|
v
[Subtitle burn-in] -> subtitles embedded into the video
|
v
[BGM mixing] -> add background music + ducking
|
v
[Normalized output] -> H.264 + AAC, 1080p, standard loudness
|
v
Output video
async def auto_edit_pipeline(
input_path: str,
output_path: str,
bgm_path: str | None = None,
language: str = 'zh',
) -> dict:
"""Complete automated video editing pipeline."""
info = probe(input_path)
print(f"Input: {info.width}x{info.height}, {info.duration:.1f}s")
# Step 1: Scene detection
scenes = detect_scenes(input_path, method='content')
print(f"Detected {len(scenes)} scenes")
# Step 2: Generate subtitles
srt_path = output_path.replace('.mp4', '.srt')
generate_subtitles(input_path, srt_path, language=language)
print(f"Subtitles generated: {srt_path}")
# Step 3: Burn subtitles
subtitled_path = output_path.replace('.mp4', '.sub.mp4')
burn_subtitles(input_path, srt_path, subtitled_path)
# Step 4: Add BGM (optional)
if bgm_path:
auto_mix_bgm(subtitled_path, bgm_path, output_path)
else:
os.rename(subtitled_path, output_path)
result_info = probe(output_path)
return {
'scenes': len(scenes),
'duration': result_info.duration,
'size_mb': os.path.getsize(output_path) / (1024 * 1024),
'subtitles': srt_path,
}
8. Production Considerations
Memory management
FFmpeg's memory consumption is significant for high-resolution video, especially with complex filter graphs or 4K footage:
- A single FFmpeg process can peak at 500MB-1GB
- Running 4 concurrent FFmpeg processes needs 4GB+ of memory headroom
- PM2 / Docker memory limits must leave ample margin
Temporary file cleanup
Pipelines produce many temporary files (intermediate frames, temporary audio, subtitle files). Cleanup must be guaranteed both on normal completion and on abnormal exit:
import tempfile
import shutil
from contextlib import contextmanager
@contextmanager
def temp_workspace():
"""Create and auto-cleanup a temporary workspace."""
tmpdir = tempfile.mkdtemp(prefix='video_edit_')
try:
yield tmpdir
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
FFmpeg version compatibility
FFmpeg versions differ across systems, and options are not always compatible. Recommendations:
- Pin a minimum FFmpeg version for the project (e.g., 5.0+)
- Detect the FFmpeg version at startup and record it in the logs
- Version-check critical features (such as loudnorm)
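A startup check along these lines covers the last two points; it assumes the banner format current builds print, "ffmpeg version X.Y...", sometimes with an "n" prefix in distro builds:

```python
import re
import subprocess

MIN_FFMPEG = (5, 0)

def parse_ffmpeg_version(banner: str) -> tuple[int, int]:
    """Pull (major, minor) out of the `ffmpeg -version` banner."""
    m = re.search(r'ffmpeg version n?(\d+)\.(\d+)', banner)
    if not m:
        raise RuntimeError(f"cannot parse FFmpeg version from {banner[:60]!r}")
    return int(m.group(1)), int(m.group(2))

def check_ffmpeg() -> tuple[int, int]:
    """Verify FFmpeg is installed and recent enough; return its version."""
    out = subprocess.run(
        ['ffmpeg', '-version'], capture_output=True, text=True, check=True
    ).stdout
    version = parse_ffmpeg_version(out)
    if version < MIN_FFMPEG:
        raise RuntimeError(f"FFmpeg >= {MIN_FFMPEG} required, found {version}")
    return version
```

Calling check_ffmpeg() once at service start and logging the result makes "works on my machine" version drift visible immediately.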
Maurice | maurice_wen@proton.me