AI 视频生成工作流设计

从脚本到成片的端到端 Pipeline 架构:异步编排、进度追踪与质量门禁


一、问题本质:为什么视频生成需要工作流引擎

视频生成不是一次 API 调用,而是一条多阶段、多模型、长耗时的异步流水线。一段 60 秒的 AI 视频,背后可能涉及:

  • LLM 生成脚本(5-15 秒)
  • TTS 语音合成(10-30 秒)
  • 图像/视频片段生成(30-180 秒/帧)
  • FFmpeg 合成渲染(20-60 秒)

任何一个环节失败,都需要从断点恢复而非从头重来。这要求我们把"生成视频"从一个黑盒函数,拆解为一条可观测、可恢复、可扩展的工作流。

核心设计原则

  • 阶段隔离:每个阶段独立运行、独立失败(反模式:所有逻辑写在一个 async 函数里)
  • 断点续传:已完成阶段的产物持久化,失败后可从中间恢复(反模式:每次失败从头开始)
  • 进度可观测:每个阶段实时推送进度到前端(反模式:用户只看到一个转圈动画)
  • 质量门禁:阶段间设置校验关卡,不合格不进入下一阶段(反模式:垃圾输入一路传递到最终渲染)
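
上述原则可以收敛为一个最小的阶段契约。以下为示意代码(StageDef、runStage 均为假设性命名,具体阶段实现见第二节):runner 在每个阶段完成后先过质量门禁、再持久化产物,进度通过回调上报。

```typescript
// 一个阶段 = 执行函数 + 质量门禁(示意)
interface StageDef<In, Out> {
  name: string;
  run: (input: In, onProgress: (pct: number) => void) => Promise<Out>;
  gate: (output: Out) => boolean; // 质量门禁:不合格不进入下一阶段
}

// runner:执行 -> 门禁 -> 持久化(断点续传的前提),进度随时上报
async function runStage<In, Out>(
  stage: StageDef<In, Out>,
  input: In,
  save: (name: string, artifact: Out) => Promise<void>,
  report: (name: string, pct: number) => void
): Promise<Out> {
  const out = await stage.run(input, pct => report(stage.name, pct));
  if (!stage.gate(out)) throw new Error(`Quality gate failed: ${stage.name}`);
  await save(stage.name, out); // 进入下一阶段前先落盘检查点
  return out;
}
```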

二、四阶段 Pipeline 架构

┌─────────────────────────────────────────────────────────────────┐
│                    Video Generation Pipeline                     │
├──────────┬──────────┬──────────────────┬───────────────────────┤
│ Stage 1  │ Stage 2  │    Stage 3       │     Stage 4           │
│ Script   │ Voice    │  Storyboard      │     Render            │
│          │          │                  │                       │
│ LLM      │ TTS API  │ Image Gen x N    │  FFmpeg Compose       │
│ ~10s     │ ~20s     │ ~30-180s/frame   │  ~30-60s              │
│          │          │ (concurrent)     │                       │
├──────────┴──────────┴──────────────────┴───────────────────────┤
│                    SSE Progress Stream                          │
│              event: progress | frame | complete | error         │
└─────────────────────────────────────────────────────────────────┘

2.1 Stage 1:脚本生成

脚本是整条 Pipeline 的源头。LLM 根据用户输入(主题、风格、时长)生成结构化脚本:

// types/video-pipeline.ts
interface VideoScript {
  title: string;
  totalDuration: number; // seconds
  scenes: SceneScript[];
}

interface SceneScript {
  index: number;
  narration: string;       // TTS input
  visualPrompt: string;    // Image generation prompt
  duration: number;        // seconds
  transition: 'fade' | 'cut' | 'dissolve';
}

// Stage 1: Script Generation
async function generateScript(
  topic: string,
  style: string,
  targetDuration: number
): Promise<VideoScript> {
  const prompt = buildScriptPrompt(topic, style, targetDuration);

  const response = await llm.chat({
    model: 'gemini-2.5-flash',
    messages: [{ role: 'user', content: prompt }],
    responseFormat: { type: 'json_object' },
  });

  const script = JSON.parse(response.content) as VideoScript;

  // Quality gate: validate structure
  validateScript(script, targetDuration);

  return script;
}

function validateScript(script: VideoScript, targetDuration: number): void {
  const totalDuration = script.scenes.reduce((s, sc) => s + sc.duration, 0);
  if (Math.abs(totalDuration - targetDuration) > 10) {
    throw new ScriptValidationError(
      `Duration mismatch: got ${totalDuration}s, expected ~${targetDuration}s`
    );
  }
  for (const scene of script.scenes) {
    if (!scene.narration || scene.narration.length < 10) {
      throw new ScriptValidationError(
        `Scene ${scene.index}: narration too short`
      );
    }
  }
}
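
上面引用的 buildScriptPrompt 未展开,这里给出一个示意版本(提示词内容为假设写法):关键是让输出 JSON 与 VideoScript 的结构对齐,并把后续 validateScript 的约束前置写进提示词,减少返工。

```typescript
// 示意:构造脚本生成提示词,约束与 validateScript 对齐
function buildScriptPrompt(
  topic: string,
  style: string,
  targetDuration: number
): string {
  return [
    `Write a video script about "${topic}" in a ${style} style.`,
    `Target total duration: ${targetDuration} seconds.`,
    'Respond with JSON only, matching this shape:',
    '{ "title": string, "totalDuration": number,',
    '  "scenes": [{ "index": number, "narration": string,',
    '               "visualPrompt": string, "duration": number,',
    '               "transition": "fade" | "cut" | "dissolve" }] }',
    'Constraints: at least 3 scenes; each narration >= 10 characters;',
    'scene durations must sum to within 10 seconds of the target.',
  ].join('\n');
}
```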

2.2 Stage 2:语音合成

将脚本的每个场景旁白转为音频,同时获取精确时长用于后续渲染对齐:

// Stage 2: Voice Synthesis
async function generateVoices(
  scenes: SceneScript[],
  voiceConfig: VoiceConfig,
  onProgress: (index: number, total: number) => void
): Promise<VoiceResult[]> {
  const results: VoiceResult[] = [];

  for (const scene of scenes) {
    const audio = await tts.synthesize({
      text: scene.narration,
      voice: voiceConfig.voiceId,     // e.g., 'alloy', 'nova'
      model: 'tts-1-hd',
      speed: voiceConfig.speed ?? 1.0,
    });

    // Get precise duration from audio buffer
    const duration = await getAudioDuration(audio.buffer);

    results.push({
      sceneIndex: scene.index,
      audioBuffer: audio.buffer,
      audioUrl: await uploadToR2(audio.buffer, `voice-${scene.index}.mp3`),
      duration,
    });

    onProgress(scene.index + 1, scenes.length);
  }

  return results;
}
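
Stage 2 依赖的 getAudioDuration 这里给出一个同步示意版本:假设 TTS 提供商可以返回 PCM WAV,直接解析文件头计算时长;若返回 MP3,可改为调用 ffprobe(ffprobe -v error -show_entries format=duration -of csv=p=0 文件名)读取时长。

```typescript
// 示意:从 PCM WAV buffer 计算精确时长(假设标准头部布局,
// fmt 块紧随 RIFF 头;MP3 等压缩格式请改用 ffprobe)
function getAudioDuration(buffer: Buffer): number {
  const channels = buffer.readUInt16LE(22);
  const sampleRate = buffer.readUInt32LE(24);
  const bitsPerSample = buffer.readUInt16LE(34);

  // 逐块扫描定位 "data" 块(它不保证在固定偏移)
  let offset = 12;
  while (offset < buffer.length - 8) {
    const id = buffer.toString('ascii', offset, offset + 4);
    const size = buffer.readUInt32LE(offset + 4);
    if (id === 'data') {
      const bytesPerSecond = sampleRate * channels * (bitsPerSample / 8);
      return size / bytesPerSecond;
    }
    offset += 8 + size;
  }
  throw new Error('No data chunk found in WAV buffer');
}
```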

2.3 Stage 3:故事板(图像生成)

这是耗时最长的阶段。关键决策:并发控制和 Fallback 链。

// Stage 3: Storyboard Generation
async function generateStoryboard(
  scenes: SceneScript[],
  style: string,
  onFrame: (index: number, result: FrameResult) => void
): Promise<FrameResult[]> {
  const concurrency = 2; // Avoid burst quota exhaustion
  const frames: FrameResult[] = new Array(scenes.length);

  // Process in batches of `concurrency`
  for (let i = 0; i < scenes.length; i += concurrency) {
    const batch = scenes.slice(i, i + concurrency);
    const batchResults = await Promise.allSettled(
      batch.map(scene => generateFrameWithFallback(scene, style))
    );

    for (let j = 0; j < batchResults.length; j++) {
      const result = batchResults[j];
      const sceneIndex = i + j;

      if (result.status === 'fulfilled') {
        frames[sceneIndex] = result.value;
        onFrame(sceneIndex, result.value);
      } else {
        // Record failure but continue other frames
        frames[sceneIndex] = {
          sceneIndex,
          status: 'failed',
          error: result.reason.message,
        };
      }
    }
  }

  // Quality gate: require >= 80% frames succeeded
  const successCount = frames.filter(f => f.status === 'success').length;
  if (successCount / scenes.length < 0.8) {
    throw new StoryboardError(
      `Only ${successCount}/${scenes.length} frames generated`
    );
  }

  return frames;
}

// Fallback chain: Primary -> Secondary -> Placeholder
async function generateFrameWithFallback(
  scene: SceneScript,
  style: string
): Promise<FrameResult> {
  const providers = [
    { name: 'google-imagen', fn: () => googleImagen(scene.visualPrompt, style) },
    { name: 'poe-flux',      fn: () => poeFlux(scene.visualPrompt, style) },
  ];

  for (const provider of providers) {
    try {
      const image = await withTimeout(provider.fn(), 90_000);
      return {
        sceneIndex: scene.index,
        status: 'success',
        imageUrl: image.url,
        provider: provider.name,
      };
    } catch (err) {
      console.warn(`Frame ${scene.index}: ${provider.name} failed`, err);
      continue;
    }
  }

  throw new Error(`All providers failed for frame ${scene.index}`);
}
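
上面用到的 withTimeout 是一个假设的小工具,可以这样实现。注意它只是停止等待,并不会真正取消底层请求;若 provider SDK 支持 AbortSignal,应优先传入以便释放连接。

```typescript
// 示意:超时包装器,ms 毫秒内未 settle 则 reject
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${ms}ms`)),
      ms
    );
    promise.then(
      value => { clearTimeout(timer); resolve(value); },
      err => { clearTimeout(timer); reject(err); }
    );
  });
}
```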

2.4 Stage 4:FFmpeg 渲染合成

将音频和图像帧合成为最终视频:

// Stage 4: Video Rendering
import path from 'node:path';
import { promises as fs } from 'node:fs';

async function renderVideo(
  frames: FrameResult[],
  voices: VoiceResult[],
  scenes: SceneScript[],
  outputPath: string
): Promise<RenderResult> {
  const inputs: string[] = [];

  for (let i = 0; i < frames.length; i++) {
    const frame = frames[i];
    const voice = voices[i];

    if (frame.status !== 'success') continue;

    // 将远端资源落地为本地文件:FFmpeg 能否直接读 HTTP 取决于编译选项,
    // 本地文件行为最可预测
    const imageBase64 = await downloadAsBase64(frame.imageUrl);
    const imagePath = path.join(tmpDir, `frame-${i}.png`);
    await fs.writeFile(imagePath, Buffer.from(imageBase64, 'base64'));
    const audioPath = path.join(tmpDir, `voice-${i}.mp3`);
    await fs.writeFile(audioPath, voice.audioBuffer);

    inputs.push(`-loop 1 -t ${voice.duration} -i ${imagePath}`);
    inputs.push(`-i ${audioPath}`);
  }

  const ffmpegCmd = [
    'ffmpeg -y',
    ...inputs,
    `-filter_complex "${buildFilterGraph(frames, voices, scenes)}"`,
    "-map '[vout]' -map '[aout]'", // filter graph 产出的流标签
    '-c:v libx264 -preset medium -crf 23',
    '-c:a aac -b:a 192k',
    '-movflags +faststart',
    outputPath,
  ].join(' ');

  await execAsync(ffmpegCmd, { timeout: 120_000 });

  const stats = await fs.stat(outputPath);
  return {
    path: outputPath,
    url: await uploadToR2(await fs.readFile(outputPath), path.basename(outputPath)),
    size: stats.size,
    duration: voices.reduce((s, v) => s + v.duration, 0),
  };
}
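
buildFilterGraph 的一个最小示意(签名简化为场景数;假设输入按"图像、音频"成对交错排列,只包含成功帧;转场简化为硬切,若要实现脚本里的 fade/xfade 转场,还需要逐段累计 offset):

```typescript
// 示意:为 N 个场景生成 filter_complex 字符串,
// 输入布局:第 2i 个输入是图像,第 2i+1 个是对应音频
function buildFilterGraph(sceneCount: number): string {
  const parts: string[] = [];
  const labels: string[] = [];
  for (let i = 0; i < sceneCount; i++) {
    // 把每张静态图归一化到输出分辨率,并重置 SAR
    parts.push(`[${2 * i}:v]scale=1920:1080,setsar=1[v${i}]`);
    labels.push(`[v${i}][${2 * i + 1}:a]`);
  }
  // 将所有 (视频, 音频) 对拼接为单一视频流和单一音频流
  parts.push(`${labels.join('')}concat=n=${sceneCount}:v=1:a=1[vout][aout]`);
  return parts.join(';');
}
```

产出的 [vout]/[aout] 标签需要在 ffmpeg 命令里用 -map 显式选中,否则 FFmpeg 会按默认规则挑流。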

三、异步编排:SSE 进度推送

前端不宜轮询(延迟高且浪费请求配额),应当用 Server-Sent Events 实时推送每个阶段的进度:

// app/api/video-generate-sse/route.ts
export async function POST(req: Request): Promise<Response> {
  const { topic, style, duration, voiceConfig } = await req.json();
  const outputPath = `/tmp/video-${crypto.randomUUID()}.mp4`;

  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();
      const send = (event: string, data: unknown) => {
        controller.enqueue(
          encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
        );
      };

      // Heartbeat to keep Cloudflare alive
      const heartbeat = setInterval(() => {
        send('ping', { ts: Date.now() });
      }, 25_000);

      // Track the active stage so the error event can report where we failed
      let currentStage = 'script';

      try {
        // Stage 1: Script
        send('progress', { stage: 'script', status: 'running' });
        const script = await generateScript(topic, style, duration);
        send('progress', { stage: 'script', status: 'done', data: script });

        // Stage 2: Voice
        currentStage = 'voice';
        send('progress', { stage: 'voice', status: 'running' });
        const voices = await generateVoices(
          script.scenes,
          voiceConfig,
          (current, total) => {
            send('progress', {
              stage: 'voice',
              status: 'running',
              current,
              total,
            });
          }
        );
        send('progress', { stage: 'voice', status: 'done' });

        // Stage 3: Storyboard
        currentStage = 'storyboard';
        send('progress', { stage: 'storyboard', status: 'running' });
        const frames = await generateStoryboard(
          script.scenes,
          style,
          (index, frame) => {
            send('frame', { index, frame });
          }
        );
        send('progress', { stage: 'storyboard', status: 'done' });

        // Stage 4: Render
        currentStage = 'render';
        send('progress', { stage: 'render', status: 'running' });
        const result = await renderVideo(
          frames, voices, script.scenes, outputPath
        );
        send('complete', { videoUrl: result.url, duration: result.duration });
      } catch (err) {
        const message = err instanceof Error ? err.message : String(err);
        send('error', { message, stage: currentStage });
      } finally {
        clearInterval(heartbeat);
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
}
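
对应的前端消费端:EventSource 只支持 GET,POST 型 SSE 接口需要用 fetch 手动解析流。下面是一个最小解析器(假设服务端按上文格式输出 event:/data: 帧,以空行分隔):

```typescript
// 示意:用 fetch + 流式读取消费 POST 型 SSE
type SSEHandler = (event: string, data: unknown) => void;

async function consumeSSE(
  response: Response,
  onEvent: SSEHandler
): Promise<void> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE 帧以空行分隔
    let sep: number;
    while ((sep = buffer.indexOf('\n\n')) !== -1) {
      const frame = buffer.slice(0, sep);
      buffer = buffer.slice(sep + 2);

      let event = 'message';
      let data = '';
      for (const line of frame.split('\n')) {
        if (line.startsWith('event: ')) event = line.slice(7);
        else if (line.startsWith('data: ')) data += line.slice(6);
      }
      if (data) onEvent(event, JSON.parse(data));
    }
  }
}
```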

四、断点续传与错误恢复

长流水线最怕中途失败全部重来。核心思路:每个阶段完成后将产物持久化。

interface PipelineState {
  jobId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  currentStage: 'script' | 'voice' | 'storyboard' | 'render' | 'completed';
  artifacts: {
    script?: VideoScript;
    voices?: VoiceResult[];
    frames?: FrameResult[];
    video?: RenderResult;
  };
  error?: { stage: string; message: string; retryCount: number };
  createdAt: Date;
  updatedAt: Date;
}

// Resume from last successful stage
const ARTIFACT_KEY = {
  script: 'script',
  voice: 'voices',
  storyboard: 'frames',
  render: 'video',
} as const;

async function resumePipeline(state: PipelineState): Promise<void> {
  const stages = ['script', 'voice', 'storyboard', 'render'] as const;
  const startIndex = Math.max(
    0,
    stages.indexOf(state.currentStage as (typeof stages)[number])
  );

  for (let i = startIndex; i < stages.length; i++) {
    const stage = stages[i];

    // Skip already completed stages (artifact keys differ from stage
    // names: stage 'voice' stores artifact 'voices', etc.)
    if (state.artifacts[ARTIFACT_KEY[stage]] !== undefined) continue;

    try {
      const result = await executeStage(stage, state);
      state.artifacts[ARTIFACT_KEY[stage]] = result;
      state.currentStage = stages[i + 1] ?? 'completed';
      await persistState(state); // Save checkpoint
    } catch (err) {
      state.status = 'failed';
      state.error = {
        stage,
        message: err.message,
        retryCount: (state.error?.retryCount ?? 0) + 1,
      };
      await persistState(state);

      // Auto-retry with exponential backoff (max 3 times)
      if (state.error.retryCount < 3) {
        const delay = Math.pow(2, state.error.retryCount) * 1000;
        await sleep(delay);
        return resumePipeline(state);
      }

      throw err;
    }
  }

  state.status = 'completed';
  await persistState(state);
}
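
persistState 的一个示意实现:单机部署下以 jobId 为键落盘 JSON,采用 write-then-rename 保证检查点原子性,崩溃不会留下写了一半的状态文件(STATE_DIR 为假设路径;多实例部署应换成 Redis/Postgres 等共享存储,让任意 worker 都能恢复任务)。

```typescript
import { promises as fs } from 'node:fs';
import path from 'node:path';
import os from 'node:os';

// 假设的检查点目录
const STATE_DIR = path.join(os.tmpdir(), 'video-jobs');

async function persistState<T extends { jobId: string }>(
  state: T
): Promise<void> {
  await fs.mkdir(STATE_DIR, { recursive: true });
  const file = path.join(STATE_DIR, `${state.jobId}.json`);
  // 先写临时文件再 rename:rename 在同一文件系统内是原子的
  const tmp = `${file}.tmp`;
  await fs.writeFile(tmp, JSON.stringify({ ...state, updatedAt: new Date() }));
  await fs.rename(tmp, file);
}

async function loadState<T>(jobId: string): Promise<T | null> {
  try {
    const raw = await fs.readFile(
      path.join(STATE_DIR, `${jobId}.json`),
      'utf8'
    );
    return JSON.parse(raw) as T;
  } catch {
    return null; // 无检查点视为新任务
  }
}
```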

五、质量门禁设计

每个阶段出口设置校验,防止劣质中间产物传递:

Stage 1 (Script)
  Gate: scenes.length >= 3 && total_duration within ±10s of target
        narration.length >= 10 per scene

Stage 2 (Voice)
  Gate: all audio files playable && duration > 0
        total voice duration matches script expectation

Stage 3 (Storyboard)
  Gate: success_rate >= 80%
        image dimensions match spec (e.g., 1920x1080)
        no NSFW content (optional moderation check)

Stage 4 (Render)
  Gate: output file exists && size > 100KB
        video duration matches sum of voice durations
        codec validation (H.264 + AAC)

// Quality gate example
interface QualityGate<T> {
  name: string;
  check: (artifact: T) => boolean;
  severity: 'block' | 'warn';
}

const storyboardGates: QualityGate<FrameResult[]>[] = [
  {
    name: 'min-success-rate',
    check: frames => {
      const successRate =
        frames.filter(f => f.status === 'success').length / frames.length;
      return successRate >= 0.8;
    },
    severity: 'block',
  },
  {
    name: 'resolution-check',
    // Assumes FrameResult carries width/height metadata from the provider
    check: frames => frames.every(f => f.width >= 1920 && f.height >= 1080),
    severity: 'warn',
  },
];

function runGates<T>(gates: QualityGate<T>[], artifact: T): void {
  for (const gate of gates) {
    const passed = gate.check(artifact);
    if (!passed && gate.severity === 'block') {
      throw new QualityGateError(`Gate "${gate.name}" failed`);
    }
    if (!passed && gate.severity === 'warn') {
      console.warn(`Quality warning: gate "${gate.name}" did not pass`);
    }
  }
}

六、生产经验与陷阱

6.1 Cloudflare 代理超时

Cloudflare 代理默认对 origin 响应有 100 秒超时(超时返回 524,且仅企业版可调)。多阶段 Pipeline 总耗时远超此限制。

解决方案:

  • SSE 每 25 秒发送一次心跳(注释帧 ": heartbeat\n\n" 或专门的 ping 事件均可),保证连接上有持续写入
  • LLM 调用使用 Flash 模型(2-5 秒/调用),避免 Pro 模型(20-30 秒/调用)
  • 在极端情况下,拆分为"提交任务 + 异步轮询"两步

6.2 图像生成并发配额

Google Imagen / DALL-E 等服务有 burst quota 限制。4 帧并发极易触发限流。

解决方案:

  • 默认并发控制为 2
  • 遇到 429 / quota error 时自动降级到串行
  • 设置 provider fallback 链(Google -> Poe -> Placeholder)
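
"遇到 429 自动降级到串行"可以这样勾勒(示意代码:isRateLimit 为假设的判断函数,需按具体 provider 的错误结构适配;同批内排在失败任务之后的任务可能被重复执行,任务副作用需幂等):

```typescript
// 示意:带降级的批量执行器,命中限流后剩余任务改为串行并重试失败项
async function runAdaptive<T>(
  tasks: Array<() => Promise<T>>,
  concurrency: number,
  isRateLimit: (err: unknown) => boolean
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let serial = false;

  for (let i = 0; i < tasks.length; ) {
    const width = serial ? 1 : concurrency;
    const batch = tasks.slice(i, i + width);
    const settled = await Promise.allSettled(batch.map(t => t()));

    let advanced = 0;
    let hitLimit = false;
    for (const r of settled) {
      if (r.status === 'fulfilled') {
        results[i + advanced] = r.value;
        advanced++;
      } else if (isRateLimit(r.reason)) {
        hitLimit = true;
        break; // 从失败的任务起改为串行重试
      } else {
        throw r.reason; // 非限流错误照常上抛
      }
    }
    i += advanced;
    if (hitLimit) {
      serial = true;
      await new Promise(res => setTimeout(res, 1000)); // 短暂冷却
    }
  }
  return results;
}
```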

6.3 FFmpeg 内存峰值

渲染阶段 FFmpeg 处理 base64 帧时内存可达 600-700MB,PM2 的 max_memory_restart 设置过低会导致进程被杀。

解决方案:

  • PM2 内存限制设为 768MB 以上
  • 渲染前释放不再需要的中间数据(script / voice buffers)
  • 考虑将渲染拆到独立 Worker 进程

6.4 SSE 事件类型设计

  • progress:{ stage, status, current?, total? },用于阶段状态更新
  • frame:{ index, frame: { imageUrl, provider } },用于单帧完成通知
  • complete:{ videoUrl, duration, size },用于交付最终成片
  • error:{ stage, message, retryable },用于错误通知
  • ping:{ ts },用于心跳保活

不要在 progress 事件中传递帧数据,不要在 frame 事件中传递阶段状态。混淆事件语义是前端解析 Bug 的主要来源。
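
上面的事件词表可以收敛为一个可判别联合类型,前端按 type 做穷尽 switch,从编译期杜绝"混淆事件语义"(字段形状为示意):

```typescript
// 示意:SSE 事件的可判别联合,type 字段与事件名一一对应
type PipelineEvent =
  | { type: 'progress'; stage: string; status: 'running' | 'done';
      current?: number; total?: number }
  | { type: 'frame'; index: number;
      frame: { imageUrl: string; provider: string } }
  | { type: 'complete'; videoUrl: string; duration: number; size: number }
  | { type: 'error'; stage: string; message: string; retryable: boolean }
  | { type: 'ping'; ts: number };

// 穷尽 switch:新增事件类型而漏写分支会被编译器捕获
function describe(ev: PipelineEvent): string {
  switch (ev.type) {
    case 'progress': return `${ev.stage}: ${ev.status}`;
    case 'frame':    return `frame ${ev.index} ready (${ev.frame.provider})`;
    case 'complete': return `done: ${ev.videoUrl}`;
    case 'error':    return `failed at ${ev.stage}: ${ev.message}`;
    case 'ping':     return 'heartbeat';
  }
}
```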


七、前端进度 UI 设计

┌──────────────────────────────────────────────────────────────┐
│  AI Video Generator                                          │
│                                                              │
│  [1. Script] ──> [2. Voice] ──> [3. Frames] ──> [4. Render]  │
│    [done]          [done]          [3/5]         [waiting]   │
│                                                              │
│  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                     │
│  │ F1  │ │ F2  │ │ F3  │ │ F4  │ │ F5  │                     │
│  │ OK  │ │ OK  │ │ OK  │ │ ... │ │ ... │                     │
│  └─────┘ └─────┘ └─────┘ └─────┘ └─────┘                     │
│                                                              │
│  Elapsed: 1m 23s  |  Estimated: ~2m remaining                │
└──────────────────────────────────────────────────────────────┘

关键交互要求:

  • 每个阶段有明确的状态指示(waiting / running / done / failed)
  • 故事板阶段实时展示已生成的帧预览
  • 失败时显示具体阶段和可操作的错误信息
  • 提供"从失败处重试"按钮,而非只有"重新生成"

八、架构总结

一条可靠的 AI 视频生成 Pipeline 的核心不在于调用哪个模型,而在于:

  1. 阶段隔离:每个阶段独立运行、独立失败、独立重试
  2. 产物持久化:阶段完成后立即保存,支持断点续传
  3. 进度可观测:SSE 实时推送,前端可精确展示每一步状态
  4. 质量门禁:阶段间校验,防止垃圾传播
  5. 容错设计:Fallback 链、并发控制、超时管理、自动重试

这不是过度工程,而是对"一次视频生成可能花费用户 3-5 分钟等待时间"的尊重。


Maurice | maurice_wen@proton.me