AI 视频生成工作流设计
从脚本到成片的端到端 Pipeline 架构:异步编排、进度追踪与质量门禁
一、问题本质:为什么视频生成需要工作流引擎
视频生成不是一次 API 调用,而是一条多阶段、多模型、长耗时的异步流水线。一段 60 秒的 AI 视频,背后可能涉及:
- LLM 生成脚本(5-15 秒)
- TTS 语音合成(10-30 秒)
- 图像/视频片段生成(30-180 秒/帧)
- FFmpeg 合成渲染(20-60 秒)
任何一个环节失败,都需要从断点恢复而非从头重来。这要求我们把"生成视频"从一个黑盒函数,拆解为一条可观测、可恢复、可扩展的工作流。
核心设计原则
| 原则 | 说明 | 反模式 |
|---|---|---|
| 阶段隔离 | 每个阶段独立运行、独立失败 | 所有逻辑写在一个 async 函数里 |
| 断点续传 | 已完成阶段的产物持久化,失败后可从中间恢复 | 每次失败从头开始 |
| 进度可观测 | 每个阶段实时推送进度到前端 | 用户只看到一个转圈动画 |
| 质量门禁 | 阶段间设置校验关卡,不合格不进入下一阶段 | 垃圾输入一路传递到最终渲染 |
二、四阶段 Pipeline 架构
┌─────────────────────────────────────────────────────────────────┐
│ Video Generation Pipeline │
├──────────┬──────────┬──────────────────┬───────────────────────┤
│ Stage 1 │ Stage 2 │ Stage 3 │ Stage 4 │
│ Script │ Voice │ Storyboard │ Render │
│ │ │ │ │
│ LLM │ TTS API │ Image Gen x N │ FFmpeg Compose │
│ ~10s │ ~20s │ ~30-180s/frame │ ~30-60s │
│ │ │ (concurrent) │ │
├──────────┴──────────┴──────────────────┴───────────────────────┤
│ SSE Progress Stream │
│ event: progress | frame | complete | error │
└─────────────────────────────────────────────────────────────────┘
2.1 Stage 1:脚本生成
脚本是整条 Pipeline 的源头。LLM 根据用户输入(主题、风格、时长)生成结构化脚本:
// types/video-pipeline.ts
interface VideoScript {
title: string;
totalDuration: number; // seconds
scenes: SceneScript[];
}
interface SceneScript {
index: number;
narration: string; // TTS input
visualPrompt: string; // Image generation prompt
duration: number; // seconds
transition: 'fade' | 'cut' | 'dissolve';
}
// Stage 1: Script Generation
async function generateScript(
topic: string,
style: string,
targetDuration: number
): Promise<VideoScript> {
const prompt = buildScriptPrompt(topic, style, targetDuration);
const response = await llm.chat({
model: 'gemini-2.5-flash',
messages: [{ role: 'user', content: prompt }],
responseFormat: { type: 'json_object' },
});
const script = JSON.parse(response.content) as VideoScript;
// Quality gate: validate structure
validateScript(script, targetDuration);
return script;
}
function validateScript(script: VideoScript, targetDuration: number): void {
  // Sum of per-scene durations, not narration length
  const totalDuration = script.scenes.reduce((s, sc) => s + sc.duration, 0);
  if (Math.abs(totalDuration - targetDuration) > 10) {
    throw new ScriptValidationError(
      `Duration mismatch: got ${totalDuration}s, expected ~${targetDuration}s`
    );
  }
  for (const scene of script.scenes) {
    if (!scene.narration || scene.narration.length < 10) {
      throw new ScriptValidationError(
        `Scene ${scene.index}: narration too short`
      );
    }
  }
}
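上文的 buildScriptPrompt 只被引用、未给出实现。下面是一个最小示意(提示词措辞与场景数上下限均为假设,实际应按产品需求调整),核心是把 VideoScript 的 JSON 结构写进提示词,配合 `responseFormat: { type: 'json_object' }` 约束输出:

```typescript
// 示意实现:构造要求 LLM 输出结构化脚本的提示词
function buildScriptPrompt(
  topic: string,
  style: string,
  targetDuration: number
): string {
  return [
    `You are a video script writer. Create a script about "${topic}" in a ${style} style.`,
    `Target total duration: ${targetDuration} seconds, split into 3-8 scenes.`,
    `Return JSON only, matching this exact shape:`,
    `{ "title": string, "totalDuration": number, "scenes": [` +
      `{ "index": number, "narration": string, "visualPrompt": string, ` +
      `"duration": number, "transition": "fade" | "cut" | "dissolve" }] }`,
  ].join('\n');
}
```

把目标结构直接写进提示词,可以显著降低 JSON.parse 失败率,但仍应保留 validateScript 这道门禁兜底。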
2.2 Stage 2:语音合成
将脚本的每个场景旁白转为音频,同时获取精确时长用于后续渲染对齐:
// Stage 2: Voice Synthesis
async function generateVoices(
scenes: SceneScript[],
voiceConfig: VoiceConfig,
onProgress: (index: number, total: number) => void
): Promise<VoiceResult[]> {
const results: VoiceResult[] = [];
for (const scene of scenes) {
const audio = await tts.synthesize({
text: scene.narration,
voice: voiceConfig.voiceId, // e.g., 'alloy', 'nova'
model: 'tts-1-hd',
speed: voiceConfig.speed ?? 1.0,
});
// Get precise duration from audio buffer
const duration = await getAudioDuration(audio.buffer);
results.push({
sceneIndex: scene.index,
audioBuffer: audio.buffer,
audioUrl: await uploadToR2(audio.buffer, `voice-${scene.index}.mp3`),
duration,
});
onProgress(scene.index + 1, scenes.length);
}
return results;
}
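上文的 getAudioDuration 未给出实现。精确解析 MP3 时长通常需要 music-metadata 这类库;若让 TTS 返回 PCM WAV,则可以直接从文件头算出时长。下面是一个仅适用于标准 44 字节头 WAV 的纯函数示意(假设 fmt 块后紧跟 data 块,即最常见布局):

```typescript
// 示意:从标准 44 字节头的 PCM WAV buffer 中读出时长(秒)
// 仅覆盖最常见布局;MP3 等压缩格式请改用解析库
function getWavDuration(buffer: Buffer): number {
  const byteRate = buffer.readUInt32LE(28); // 每秒字节数 = 采样率 x 声道数 x 位深/8
  const dataSize = buffer.readUInt32LE(40); // 'data' 块有效载荷大小
  return dataSize / byteRate;
}
```

拿到精确时长后写入 VoiceResult.duration,Stage 4 的图像展示时长(`-t`)才能与配音严格对齐。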
2.3 Stage 3:故事板(图像生成)
这是耗时最长的阶段。关键决策:并发控制和 Fallback 链。
// Stage 3: Storyboard Generation
async function generateStoryboard(
scenes: SceneScript[],
style: string,
onFrame: (index: number, result: FrameResult) => void
): Promise<FrameResult[]> {
const concurrency = 2; // Avoid burst quota exhaustion
const frames: FrameResult[] = new Array(scenes.length);
// Process in batches of `concurrency`
for (let i = 0; i < scenes.length; i += concurrency) {
const batch = scenes.slice(i, i + concurrency);
const batchResults = await Promise.allSettled(
batch.map(scene => generateFrameWithFallback(scene, style))
);
for (let j = 0; j < batchResults.length; j++) {
const result = batchResults[j];
const sceneIndex = i + j;
if (result.status === 'fulfilled') {
frames[sceneIndex] = result.value;
onFrame(sceneIndex, result.value);
} else {
// Record failure but continue other frames
frames[sceneIndex] = {
sceneIndex,
status: 'failed',
error: result.reason.message,
};
}
}
}
// Quality gate: require >= 80% frames succeeded
const successCount = frames.filter(f => f.status === 'success').length;
if (successCount / scenes.length < 0.8) {
throw new StoryboardError(
`Only ${successCount}/${scenes.length} frames generated`
);
}
return frames;
}
// Fallback chain: Primary -> Secondary -> Placeholder
async function generateFrameWithFallback(
scene: SceneScript,
style: string
): Promise<FrameResult> {
const providers = [
{ name: 'google-imagen', fn: () => googleImagen(scene.visualPrompt, style) },
{ name: 'poe-flux', fn: () => poeFlux(scene.visualPrompt, style) },
];
for (const provider of providers) {
try {
const image = await withTimeout(provider.fn(), 90_000);
return {
sceneIndex: scene.index,
status: 'success',
imageUrl: image.url,
provider: provider.name,
};
} catch (err) {
console.warn(`Frame ${scene.index}: ${provider.name} failed`, err);
continue;
}
}
throw new Error(`All providers failed for frame ${scene.index}`);
}
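Fallback 链里用到的 withTimeout 未给出实现,这里是一个最小示意:超时即 reject,让当前 provider 失败并落入下一个 provider:

```typescript
// 示意:给任意 Promise 加上超时;超时后以 Error reject
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${ms}ms`)),
      ms
    );
    promise.then(
      value => { clearTimeout(timer); resolve(value); },
      err => { clearTimeout(timer); reject(err); }
    );
  });
}
```

注意这种写法只是"放弃等待",底层 HTTP 请求并不会被取消;若 provider SDK 支持 AbortSignal,传入 signal 取消请求会更彻底。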
2.4 Stage 4:FFmpeg 渲染合成
将音频和图像帧合成为最终视频:
// Stage 4: Video Rendering
async function renderVideo(
frames: FrameResult[],
voices: VoiceResult[],
scenes: SceneScript[],
outputPath: string
): Promise<RenderResult> {
// Build FFmpeg filter graph
const inputs: string[] = [];
const filterParts: string[] = [];
for (let i = 0; i < frames.length; i++) {
const frame = frames[i];
const voice = voices[i];
const scene = scenes[i];
if (frame.status !== 'success') continue; // 注意:跳过失败帧会使后续输入序号错位,生产中应以占位图补齐
// Download frame image and write it to a temp file for FFmpeg input
const imageBase64 = await downloadAsBase64(frame.imageUrl);
const imagePath = path.join(tmpDir, `frame-${i}.png`);
await fs.writeFile(imagePath, Buffer.from(imageBase64, 'base64'));
inputs.push(`-loop 1 -t ${voice.duration} -i ${imagePath}`);
inputs.push(`-i ${voice.audioUrl}`); // 若 FFmpeg 带网络支持可直读 URL,否则应先下载音频到本地
const ffmpegCmd = [
'ffmpeg -y',
...inputs,
'-filter_complex', buildFilterGraph(frames, voices, scenes),
'-c:v libx264 -preset medium -crf 23',
'-c:a aac -b:a 192k',
'-movflags +faststart',
outputPath,
].join(' ');
await execAsync(ffmpegCmd, { timeout: 120_000 });
const stats = await fs.stat(outputPath);
return {
path: outputPath,
size: stats.size,
duration: voices.reduce((s, v) => s + v.duration, 0),
};
}
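上文的 buildFilterGraph 未给出实现。完整版需要按 SceneScript.transition 插入 xfade/淡入淡出,这里只给一个最小示意:假设输入按 [图像0, 音频0, 图像1, 音频1, ...] 交替排列,用 concat 滤镜顺序硬切拼接(函数名与省略转场均为示意性简化):

```typescript
// 示意:生成顺序拼接 N 个场景的 concat filter graph
// 每个场景占两个输入:2i 为循环图像,2i+1 为对应配音
function buildConcatFilterGraph(sceneCount: number): string {
  const parts: string[] = [];
  for (let i = 0; i < sceneCount; i++) {
    parts.push(`[${2 * i}:v][${2 * i + 1}:a]`);
  }
  // n=场景数, v=1/a=1 表示各输出一路视频流和音频流
  return `${parts.join('')}concat=n=${sceneCount}:v=1:a=1[outv][outa]`;
}
```

生成的字符串作为 `-filter_complex` 参数传入,再用 `-map "[outv]" -map "[outa]"` 选中输出流。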
三、异步编排:SSE 进度推送
前端不能轮询,必须用 Server-Sent Events 实时推送每个阶段的进度:
// app/api/video-generate-sse/route.ts
export async function POST(req: Request): Promise<Response> {
  const { topic, style, duration, voiceConfig } = await req.json();
  const stream = new ReadableStream({
    async start(controller) {
      // Response 的流式 body 需要字节块,字符串要先编码
      const encoder = new TextEncoder();
      const send = (event: string, data: unknown) => {
        controller.enqueue(
          encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
        );
      };
      // Heartbeat to keep Cloudflare alive
      const heartbeat = setInterval(() => {
        send('ping', { ts: Date.now() });
      }, 25_000);
      // Track the active stage so the catch block can report where it failed
      let currentStage: 'script' | 'voice' | 'storyboard' | 'render' = 'script';
      try {
        // Stage 1: Script
        send('progress', { stage: 'script', status: 'running' });
        const script = await generateScript(topic, style, duration);
        send('progress', { stage: 'script', status: 'done', data: script });
        // Stage 2: Voice
        currentStage = 'voice';
        send('progress', { stage: 'voice', status: 'running' });
        const voices = await generateVoices(
          script.scenes,
          voiceConfig,
          (current, total) => {
            send('progress', { stage: 'voice', status: 'running', current, total });
          }
        );
        send('progress', { stage: 'voice', status: 'done' });
        // Stage 3: Storyboard
        currentStage = 'storyboard';
        send('progress', { stage: 'storyboard', status: 'running' });
        const frames = await generateStoryboard(
          script.scenes,
          style,
          (index, frame) => {
            send('frame', { index, frame });
          }
        );
        send('progress', { stage: 'storyboard', status: 'done' });
        // Stage 4: Render
        currentStage = 'render';
        send('progress', { stage: 'render', status: 'running' });
        const outputPath = path.join(os.tmpdir(), `video-${Date.now()}.mp4`);
        const result = await renderVideo(frames, voices, script.scenes, outputPath);
        // renderVideo 返回本地路径,上传后再向前端下发可访问的 URL
        const videoUrl = await uploadToR2(
          await fs.readFile(result.path),
          `video-${Date.now()}.mp4`
        );
        send('complete', { videoUrl, duration: result.duration });
      } catch (err) {
        const message = err instanceof Error ? err.message : String(err);
        send('error', { message, stage: currentStage });
      } finally {
        clearInterval(heartbeat);
        controller.close();
      }
    },
  });
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
}
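消费端也值得一提:浏览器原生 EventSource 只支持 GET,而这个接口是 POST,所以前端需要用 fetch 读流并手动解析 SSE 帧。下面是一个最小的帧解析函数示意(假设每次传入的是以 `\n\n` 结尾的完整帧块):

```typescript
// 示意:把一段 SSE 文本解析为 { event, data } 列表
interface SSEFrame {
  event: string;
  data: unknown;
}

function parseSSEChunk(chunk: string): SSEFrame[] {
  return chunk
    .split('\n\n') // SSE 帧之间以空行分隔
    .filter(block => block.trim().length > 0)
    .map(block => {
      let event = 'message'; // SSE 默认事件类型
      let data = '';
      for (const line of block.split('\n')) {
        if (line.startsWith('event: ')) event = line.slice(7);
        else if (line.startsWith('data: ')) data = line.slice(6);
      }
      return { event, data: data ? JSON.parse(data) : null };
    });
}
```

真实网络分块可能在任意位置切断一个帧,生产实现应把收到的字节累积进缓冲区,只在出现 `\n\n` 时切出完整帧再解析。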
四、断点续传与错误恢复
长流水线最怕中途失败全部重来。核心思路:每个阶段完成后将产物持久化。
interface PipelineState {
jobId: string;
status: 'pending' | 'running' | 'completed' | 'failed';
currentStage: 'script' | 'voice' | 'storyboard' | 'render';
artifacts: {
script?: VideoScript;
voices?: VoiceResult[];
frames?: FrameResult[];
video?: RenderResult;
};
error?: { stage: string; message: string; retryCount: number };
createdAt: Date;
updatedAt: Date;
}
// Resume from last successful stage
const STAGES = ['script', 'voice', 'storyboard', 'render'] as const;
type Stage = (typeof STAGES)[number];

// 阶段名与产物字段名并不一致(voice -> voices 等),必须显式映射,
// 否则 state.artifacts[stage] 永远取不到已完成阶段的产物
const ARTIFACT_KEY: Record<Stage, keyof PipelineState['artifacts']> = {
  script: 'script',
  voice: 'voices',
  storyboard: 'frames',
  render: 'video',
};

async function resumePipeline(state: PipelineState): Promise<void> {
  const startIndex = STAGES.indexOf(state.currentStage);
  for (let i = startIndex; i < STAGES.length; i++) {
    const stage = STAGES[i];
    // Skip already completed stages
    if (state.artifacts[ARTIFACT_KEY[stage]]) continue;
    try {
      const result = await executeStage(stage, state);
      (state.artifacts as Record<string, unknown>)[ARTIFACT_KEY[stage]] = result;
      if (i + 1 < STAGES.length) state.currentStage = STAGES[i + 1];
      await persistState(state); // Save checkpoint
    } catch (err) {
      state.status = 'failed';
      state.error = {
        stage,
        message: err instanceof Error ? err.message : String(err),
        retryCount: (state.error?.retryCount ?? 0) + 1,
      };
      await persistState(state);
      // Auto-retry with exponential backoff (max 3 times)
      if (state.error.retryCount < 3) {
        const delay = Math.pow(2, state.error.retryCount) * 1000;
        await sleep(delay);
        return resumePipeline(state);
      }
      throw err;
    }
  }
  state.status = 'completed';
  await persistState(state);
}
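persistState 的存储介质没有在上文给出。下面是一个写本地 JSON 文件的最小示意(假设的存储方式,生产中可替换为 Redis / Postgres / D1;目录名与函数签名均为示意):

```typescript
import { promises as fs } from 'fs';
import * as os from 'os';
import * as path from 'path';

// 示意:把 Pipeline 检查点写成本地 JSON 文件
const CHECKPOINT_DIR = path.join(os.tmpdir(), 'video-jobs');

async function persistState(
  state: { jobId: string } & Record<string, unknown>
): Promise<void> {
  await fs.mkdir(CHECKPOINT_DIR, { recursive: true });
  const file = path.join(CHECKPOINT_DIR, `${state.jobId}.json`);
  // 先写临时文件再原子改名,避免进程中途被杀留下半截检查点
  await fs.writeFile(`${file}.tmp`, JSON.stringify(state), 'utf8');
  await fs.rename(`${file}.tmp`, file);
}

async function loadState(jobId: string): Promise<Record<string, unknown> | null> {
  try {
    const raw = await fs.readFile(
      path.join(CHECKPOINT_DIR, `${jobId}.json`),
      'utf8'
    );
    return JSON.parse(raw) as Record<string, unknown>;
  } catch {
    return null; // 没有检查点:从头开始
  }
}
```

先写 `.tmp` 再 rename 这一步很重要:检查点本身就是用来对抗进程崩溃的,写入过程必须原子,否则恢复时可能读到损坏的 JSON。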
五、质量门禁设计
每个阶段出口设置校验,防止劣质中间产物传递:
Stage 1 (Script)
Gate: scenes.length >= 3 && total_duration within +-10s of target
narration.length >= 10 per scene
Stage 2 (Voice)
Gate: all audio files playable && duration > 0
total voice duration matches script expectation
Stage 3 (Storyboard)
Gate: success_rate >= 80%
image dimensions match spec (e.g., 1920x1080)
no NSFW content (optional moderation check)
Stage 4 (Render)
Gate: output file exists && size > 100KB
video duration matches sum of voice durations
codec validation (H.264 + AAC)
// Quality gate example
interface QualityGate<T> {
  name: string;
  check: (artifact: T) => boolean;
  severity: 'block' | 'warn';
}

const storyboardGates: QualityGate<FrameResult[]>[] = [
  {
    name: 'min-success-rate',
    check: frames => {
      const successRate =
        frames.filter(f => f.status === 'success').length / frames.length;
      return successRate >= 0.8;
    },
    severity: 'block',
  },
  {
    // 假设 FrameResult 上带有 width / height 元数据(前文定义未列出)
    name: 'resolution-check',
    check: frames =>
      frames.every(f => (f.width ?? 0) >= 1920 && (f.height ?? 0) >= 1080),
    severity: 'warn',
  },
];

function runGates<T>(gates: QualityGate<T>[], artifact: T): void {
  for (const gate of gates) {
    const passed = gate.check(artifact);
    if (!passed && gate.severity === 'block') {
      throw new QualityGateError(`Gate "${gate.name}" failed`);
    }
    if (!passed && gate.severity === 'warn') {
      console.warn(`Quality warning: gate "${gate.name}" did not pass`);
    }
  }
}
六、生产经验与陷阱
6.1 Cloudflare 代理超时
Cloudflare 免费版对 origin 连接有 100 秒超时。多阶段 Pipeline 总耗时远超此限制。
解决方案:
- SSE 心跳每 25 秒发送一次(`event: ping` 事件,或 `: heartbeat` 注释行)
- LLM 调用使用 Flash 模型(2-5 秒/调用),避免 Pro 模型(20-30 秒/调用)
- 在极端情况下,拆分为"提交任务 + 异步轮询"两步
6.2 图像生成并发配额
Google Imagen / DALL-E 等服务有 burst quota 限制。4 帧并发极易触发限流。
解决方案:
- 默认并发控制为 2
- 遇到 429 / quota error 时自动降级到串行
- 设置 provider fallback 链(Google -> Poe -> Placeholder)
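"遇到限流自动降级到串行"可以落成两个小函数的示意(错误匹配规则与函数名均为假设,应按实际 provider 返回的错误格式调整):

```typescript
// 示意:判断错误是否为配额/限流类错误
function isQuotaError(err: unknown): boolean {
  const msg = err instanceof Error ? err.message : String(err);
  return /\b429\b|quota|rate.?limit|resource.?exhausted/i.test(msg);
}

// 示意:分批执行任务,一旦某批出现限流错误,后续批次降为串行
async function runWithAdaptiveConcurrency<T>(
  tasks: (() => Promise<T>)[],
  initialConcurrency = 2
): Promise<PromiseSettledResult<T>[]> {
  let concurrency = initialConcurrency;
  const results: PromiseSettledResult<T>[] = [];
  for (let i = 0; i < tasks.length; ) {
    const batchSize = concurrency; // 固定本批大小,避免中途改并发导致跳任务
    const settled = await Promise.allSettled(
      tasks.slice(i, i + batchSize).map(t => t())
    );
    results.push(...settled);
    i += batchSize;
    if (settled.some(r => r.status === 'rejected' && isQuotaError(r.reason))) {
      concurrency = 1; // 触发限流:降级为串行
    }
  }
  return results;
}
```

相比固定并发,这种做法在配额充足时保持吞吐,在被限流后自动退让,避免连环 429。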
6.3 FFmpeg 内存峰值
渲染阶段 FFmpeg 处理 base64 帧时内存可达 600-700MB,PM2 的 max_memory_restart 设置过低会导致进程被杀。
解决方案:
- PM2 内存限制设为 768MB 以上
- 渲染前释放不再需要的中间数据(script / voice buffers)
- 考虑将渲染拆到独立 Worker 进程
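对应的 PM2 配置可以这样写(ecosystem.config.js 片段示意,应用名与入口文件为假设):

```javascript
// 示意:ecosystem.config.js,为渲染进程预留足够内存
module.exports = {
  apps: [
    {
      name: 'video-pipeline',
      script: 'server.js',
      // 低于 FFmpeg 内存峰值(~700MB)会导致渲染中途被 PM2 杀掉
      max_memory_restart: '768M',
      env: { NODE_ENV: 'production' },
    },
  ],
};
```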
6.4 SSE 事件类型设计
| 事件类型 | 数据内容 | 用途 |
|---|---|---|
| progress | `{ stage, status, current?, total? }` | 阶段状态更新 |
| frame | `{ index, frame: { imageUrl, provider } }` | 单帧完成通知 |
| complete | `{ videoUrl, duration, size }` | 最终成片 |
| error | `{ stage, message, retryable }` | 错误通知 |
| ping | `{ ts }` | 心跳保活 |
不要在 progress 事件中传递帧数据,不要在 frame 事件中传递阶段状态。混淆事件语义是前端解析 Bug 的主要来源。
七、前端进度 UI 设计
┌──────────────────────────────────────────────────────┐
│ AI Video Generator │
│ │
│ [1. Script] ──> [2. Voice] ──> [3. Frames] ──> [4. Render] │
│ [done] [done] [3/5] [waiting] │
│ │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ F1 │ │ F2 │ │ F3 │ │ F4 │ │ F5 │ │
│ │ OK │ │ OK │ │ OK │ │ ... │ │ ... │ │
│ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │
│ │
│ Elapsed: 1m 23s | Estimated: ~2m remaining │
└──────────────────────────────────────────────────────┘
关键交互要求:
- 每个阶段有明确的状态指示(waiting / running / done / failed)
- 故事板阶段实时展示已生成的帧预览
- 失败时显示具体阶段和可操作的错误信息
- 提供"从失败处重试"按钮,而非只有"重新生成"
八、架构总结
一条可靠的 AI 视频生成 Pipeline 的核心不在于调用哪个模型,而在于:
- 阶段隔离:每个阶段独立运行、独立失败、独立重试
- 产物持久化:阶段完成后立即保存,支持断点续传
- 进度可观测:SSE 实时推送,前端可精确展示每一步状态
- 质量门禁:阶段间校验,防止垃圾传播
- 容错设计:Fallback 链、并发控制、超时管理、自动重试
这不是过度工程,而是对"一次视频生成可能花费用户 3-5 分钟等待时间"的尊重。
Maurice | maurice_wen@proton.me