AI 视频内容审核技术

NSFW 检测、深度伪造识别、版权匹配与内容分类的审核流水线设计


一、为什么视频审核是刚需

用户生成内容(UGC)和 AI 生成内容(AIGC)的爆发式增长,使得视频平台面临三重审核压力:

  1. 合规压力:各国法规要求平台对有害内容承担审查责任
  2. 质量压力:AI 生成的视频可能包含不当内容、版权素材或误导信息
  3. 规模压力:每天数百万条视频,人工审核不可能覆盖

视频审核不是一个模型能解决的问题,而是一条多级 Pipeline——先用低成本的自动化手段过滤绝大多数内容,再把不确定的交给人工。

审核维度全景

维度 检测内容 技术路线 误判容忍度
NSFW 色情、暴力、血腥 CNN/ViT 分类器 低(宁可误报)
深度伪造 换脸、语音克隆 频域分析 + 对抗检测 中(仅标记,需人工确认)
版权 音乐、影视片段 指纹匹配(哈希) 低(误判影响创作者收益)
内容分类 政治敏感、仇恨言论 多模态 LLM + 关键词 中(高度依赖语境)
质量 低画质、黑屏、卡顿 信号处理 + 启发式 高(通常仅降权,不下架)

二、NSFW 检测

2.1 架构设计

NSFW 检测的核心是对视频帧进行图像分类。关键挑战在于:不能对每一帧都跑推理(太慢),也不能只看缩略图(会漏掉中间的违规内容)。

视频输入
  |
  v
[关键帧提取] -> 每秒 1 帧 + 场景切换帧
  |
  v
[NSFW 分类器] -> { safe, suggestive, explicit } x confidence
  |
  v
[决策引擎] -> 任一帧 explicit > 0.8 => 标记违规
  |
  v
[人工复核队列] (confidence 在 0.5-0.8 之间)

2.2 关键帧提取策略

# keyframe_extractor.py
import subprocess
import json
from pathlib import Path


def extract_keyframes(
    video_path: str,
    output_dir: str,
    strategy: str = 'hybrid',
    fps: float = 1.0,
) -> list[str]:
    """
    Extract keyframes for content moderation.

    Strategies:
    - 'uniform': fixed interval (1 fps)
    - 'scene': scene change detection
    - 'hybrid': uniform + scene changes (recommended)

    Returns a list of frame image paths (as strings).

    Raises:
        ValueError: if `strategy` is not one of the three above.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    if strategy == 'uniform':
        frames = _extract_uniform(video_path, output_dir, fps)
    elif strategy == 'scene':
        frames = _extract_scene_change(video_path, output_dir)
    elif strategy == 'hybrid':
        uniform = _extract_uniform(video_path, output_dir, fps)
        scene = _extract_scene_change(video_path, output_dir)
        # Deduplicate by removing scene frames too close to uniform frames
        frames = _merge_keyframes(uniform, scene, min_gap=0.5)
    else:
        raise ValueError(f"Unknown strategy: {strategy}")

    # Normalize to plain strings so the return value matches the annotation
    # (the ffmpeg helpers may yield pathlib.Path objects from glob()).
    return [str(p) for p in frames]


def _merge_keyframes(uniform: list, scene: list, min_gap: float = 0.5) -> list:
    """Merge uniform and scene-change frames, dropping duplicate paths.

    NOTE(review): the original code called this helper but never defined it,
    so the default 'hybrid' strategy raised NameError. A true time-based
    `min_gap` (seconds) filter needs per-frame timestamps, which the ffmpeg
    output filenames do not carry; until timestamps are threaded through,
    we only de-duplicate by path. `min_gap` is kept in the signature for
    forward compatibility.
    """
    seen: set[str] = set()
    merged = []
    for frame in list(uniform) + list(scene):
        key = str(frame)
        if key not in seen:
            seen.add(key)
            merged.append(frame)
    return merged


def _extract_uniform(
    video_path: str, output_dir: str, fps: float
) -> list[str]:
    """Extract frames at fixed intervals via ffmpeg.

    Writes JPEGs named uniform_0001.jpg, uniform_0002.jpg, ... into
    `output_dir` and returns their paths sorted by frame index.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    pattern = f"{output_dir}/uniform_%04d.jpg"
    cmd = [
        'ffmpeg', '-i', video_path,
        '-vf', f'fps={fps}',
        '-q:v', '2',  # high JPEG quality (2 on ffmpeg's 2-31 scale)
        pattern
    ]
    subprocess.run(cmd, check=True, capture_output=True)
    # Convert Path objects to str so the result matches the annotation.
    return sorted(str(p) for p in Path(output_dir).glob('uniform_*.jpg'))


def _extract_scene_change(
    video_path: str, output_dir: str, threshold: float = 0.3
) -> list[str]:
    """Extract frames at scene boundaries via ffmpeg's scene filter.

    `threshold` is the scene-change score in [0, 1] above which a frame
    is emitted; 0.3 is a common default for hard cuts.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    pattern = f"{output_dir}/scene_%04d.jpg"
    cmd = [
        'ffmpeg', '-i', video_path,
        '-vf', f"select='gt(scene,{threshold})'",
        '-vsync', 'vfr',  # only emit selected frames, keep their timing
        '-q:v', '2',
        pattern
    ]
    subprocess.run(cmd, check=True, capture_output=True)
    # Convert Path objects to str so the result matches the annotation.
    return sorted(str(p) for p in Path(output_dir).glob('scene_*.jpg'))

2.3 NSFW 分类模型集成

# nsfw_classifier.py
from dataclasses import dataclass
from enum import Enum
import numpy as np


class NSFWLevel(Enum):
    """Coarse severity buckets for NSFW classification."""
    SAFE = 'safe'
    SUGGESTIVE = 'suggestive'
    EXPLICIT = 'explicit'


@dataclass
class NSFWResult:
    """Outcome of classifying a single image."""
    level: NSFWLevel          # coarse verdict derived from the raw scores
    confidence: float         # max(explicit, safe) score backing the verdict
    scores: dict[str, float]  # { safe: 0.95, suggestive: 0.03, explicit: 0.02 }


class NSFWClassifier:
    """
    NSFW classification using open-source models.

    Recommended models:
    - Falconsai/nsfw_image_detection (HuggingFace, ViT-based)
    - GantMan/nsfw_model (TensorFlow, MobileNet-based, lightweight)
    """

    def __init__(
        self,
        model_name: str = 'Falconsai/nsfw_image_detection',
        device: str = 'mps',
    ):
        """Build the HF image-classification pipeline.

        Args:
            model_name: HuggingFace model id.
            device: inference device — 'mps' (Apple Metal, the previous
                hard-coded default), 'cuda' for NVIDIA, or 'cpu'.
        """
        # Lazy import: keep transformers off the module import path.
        from transformers import pipeline
        self.pipe = pipeline(
            'image-classification',
            model=model_name,
            device=device,
        )

    def classify(self, image_path: str) -> NSFWResult:
        """Classify a single image and map model labels to NSFWLevel."""
        results = self.pipe(image_path)
        scores = {r['label'].lower(): r['score'] for r in results}

        # Map to our levels. Different checkpoints use different label
        # names ('nsfw' vs 'explicit', 'normal' vs 'safe'), so sum synonyms.
        explicit_score = scores.get('nsfw', 0) + scores.get('explicit', 0)
        safe_score = scores.get('normal', 0) + scores.get('safe', 0)
        # Remaining probability mass; clamp at 0 so float error or
        # overlapping labels can never produce a negative score.
        suggestive_score = max(0.0, 1.0 - explicit_score - safe_score)

        # Thresholds chosen conservatively: err toward flagging.
        if explicit_score > 0.8:
            level = NSFWLevel.EXPLICIT
        elif explicit_score > 0.3 or suggestive_score > 0.5:
            level = NSFWLevel.SUGGESTIVE
        else:
            level = NSFWLevel.SAFE

        return NSFWResult(
            level=level,
            confidence=max(explicit_score, safe_score),
            scores={
                'safe': safe_score,
                'suggestive': suggestive_score,
                'explicit': explicit_score,
            },
        )

    def classify_video(
        self,
        keyframes: list[str],
        policy: str = 'any',  # 'any' or 'majority'
    ) -> dict:
        """Classify all keyframes and aggregate into a video-level verdict.

        Policy 'any': the frame with the highest explicit score determines
        the verdict (a single explicit frame taints the whole video).

        Raises:
            NotImplementedError: for policies other than 'any'.
        """
        results = [self.classify(kf) for kf in keyframes]

        if policy == 'any':
            # Any explicit frame -> video is explicit
            worst = max(results, key=lambda r: r.scores['explicit'])
            return {
                'verdict': worst.level.value,
                'confidence': worst.confidence,
                'flagged_frames': [
                    {'frame': kf, 'result': r}
                    for kf, r in zip(keyframes, results)
                    if r.level != NSFWLevel.SAFE
                ],
                'total_frames': len(keyframes),
            }
        else:
            raise NotImplementedError(f"Policy {policy} not implemented")

三、深度伪造检测(Deepfake Detection)

3.1 检测方法分类

方法 原理 优点 缺点
频域分析 GAN 生成的图像在频域有特征模式 不依赖人脸 对新 GAN 泛化差
人脸一致性 检测面部边界、光照不一致 直觉清晰 仅限换脸
时序分析 检测帧间不自然的闪烁/跳变 利用视频特性 需要多帧
对抗训练 在大量 real/fake 样本上训练分类器 通用性强 需要大规模数据集

3.2 实用检测 Pipeline

# deepfake_detector.py
import cv2
import numpy as np
from dataclasses import dataclass


@dataclass
class DeepfakeResult:
    """Verdict from a single deepfake detector (or the ensemble vote)."""
    # True when this detector's heuristic flags the image as generated/swapped.
    is_fake: bool
    # Heuristic score in [0, 1]; NOT a calibrated probability.
    confidence: float
    # Which detector produced this result, e.g. 'frequency_analysis'.
    method: str
    # Detector-specific diagnostics (scores, reasons, sub-results).
    details: dict


class DeepfakeDetector:
    """
    Multi-method deepfake detection.

    Production recommendation: ensemble multiple detectors
    and use voting to reduce false positives.
    """

    def detect_frequency_artifacts(
        self, image: np.ndarray
    ) -> DeepfakeResult:
        """
        Frequency domain analysis.
        GAN-generated images often show characteristic patterns
        in the high-frequency spectrum.

        Args:
            image: BGR image array (as returned by cv2.imread).
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # 2D FFT, shifted so the DC component sits at the center.
        f_transform = np.fft.fft2(gray)
        f_shift = np.fft.fftshift(f_transform)
        magnitude = np.abs(f_shift)
        log_magnitude = np.log1p(magnitude)  # log compresses dynamic range

        # Analyze radial power spectrum (average magnitude per radius).
        center = np.array(log_magnitude.shape) // 2
        y, x = np.ogrid[:log_magnitude.shape[0], :log_magnitude.shape[1]]
        r = np.sqrt((x - center[1])**2 + (y - center[0])**2).astype(int)

        radial_profile = np.bincount(
            r.ravel(), log_magnitude.ravel()
        ) / np.bincount(r.ravel())

        # GAN artifacts: unusual energy in mid-high frequencies.
        # Coefficient of variation of that band is the anomaly signal.
        mid_high = radial_profile[len(radial_profile)//4:3*len(radial_profile)//4]
        anomaly_score = np.std(mid_high) / (np.mean(mid_high) + 1e-8)

        # Threshold determined empirically.
        # Cast numpy scalars to plain Python types so the dataclass holds
        # bool/float, not np.bool_/np.float64 (cleaner serialization).
        is_fake = bool(anomaly_score > 1.5)

        return DeepfakeResult(
            is_fake=is_fake,
            confidence=float(min(anomaly_score / 3.0, 1.0)),
            method='frequency_analysis',
            details={'anomaly_score': float(anomaly_score)},
        )

    def detect_face_boundary(
        self, image: np.ndarray
    ) -> DeepfakeResult:
        """
        Face boundary analysis.
        Deepfake face swaps often have blending artifacts at face edges.

        NOTE(review): despite the name, the Laplacian is computed over the
        whole face bbox plus a 10% margin, not only the boundary ring —
        consider masking out the face interior for a sharper signal.
        """
        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )

        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.1, 4)

        if len(faces) == 0:
            # No face -> nothing to analyze; report "not fake" at zero
            # confidence so the ensemble treats it as an abstention.
            return DeepfakeResult(
                is_fake=False, confidence=0.0,
                method='face_boundary',
                details={'reason': 'no face detected'},
            )

        # Analyze face boundary region for each detected face
        boundary_scores = []
        for (x, y, w, h) in faces:
            # Extract face region expanded by 10% of the larger bbox side.
            margin = int(0.1 * max(w, h))
            face_region = image[
                max(0, y-margin):y+h+margin,
                max(0, x-margin):x+w+margin
            ]

            # Laplacian (edge response) variance as a blending-artifact proxy.
            laplacian = cv2.Laplacian(face_region, cv2.CV_64F)
            edge_variance = laplacian.var()
            boundary_scores.append(edge_variance)

        avg_score = float(np.mean(boundary_scores))
        # High edge variance at boundaries suggests blending artifacts.
        # Threshold determined empirically.
        is_fake = bool(avg_score > 500)

        return DeepfakeResult(
            is_fake=is_fake,
            confidence=float(min(avg_score / 1000.0, 1.0)),
            method='face_boundary',
            details={'boundary_scores': [float(s) for s in boundary_scores]},
        )

    def ensemble_detect(
        self, image_path: str
    ) -> DeepfakeResult:
        """Run all detectors on one image file and majority-vote.

        Raises:
            ValueError: if the image cannot be read (cv2.imread returns
            None instead of raising; without this check the failure would
            surface as an opaque cv2 error inside cvtColor).
        """
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image: {image_path}")

        results = [
            self.detect_frequency_artifacts(image),
            self.detect_face_boundary(image),
        ]

        # Strict majority: with 2 detectors, both must agree it's fake.
        fake_votes = sum(1 for r in results if r.is_fake)
        avg_confidence = np.mean([r.confidence for r in results])

        return DeepfakeResult(
            is_fake=fake_votes > len(results) / 2,
            confidence=float(avg_confidence),
            method='ensemble',
            details={'sub_results': [r.__dict__ for r in results]},
        )

四、版权匹配(Content Fingerprinting)

4.1 感知哈希原理

版权检测的核心技术是内容指纹——对视频/音频生成紧凑的哈希值,即使经过轻微修改(裁剪、压缩、变速)仍能匹配。

# fingerprint.py
import imagehash
from PIL import Image
import numpy as np


class VideoFingerprinter:
    """
    Video fingerprinting using perceptual hashing.

    Algorithm pipeline:
    1. Extract keyframes at fixed intervals
    2. Compute perceptual hash for each frame
    3. Store as fingerprint sequence
    4. Match against database using Hamming distance
    """

    def __init__(self, hash_size: int = 16, fps: float = 0.5):
        self.hash_size = hash_size
        self.fps = fps  # frames per second to sample

    def fingerprint(self, video_path: str) -> list[str]:
        """Generate fingerprint sequence for a video."""
        keyframes = extract_keyframes(
            video_path, '/tmp/fp_frames',
            strategy='uniform', fps=self.fps
        )

        hashes = []
        for frame_path in keyframes:
            img = Image.open(frame_path)
            # Perceptual hash: robust to compression, scaling
            phash = imagehash.phash(img, hash_size=self.hash_size)
            hashes.append(str(phash))

        return hashes

    def match(
        self,
        query_hashes: list[str],
        db_hashes: list[str],
        threshold: int = 10,      # max Hamming distance
        min_matches: int = 3,     # min consecutive matching frames
    ) -> dict:
        """
        Check if query video matches a database entry.
        Uses sliding window to find longest matching subsequence.
        """
        matches = []

        for i, q_hash in enumerate(query_hashes):
            q = imagehash.hex_to_hash(q_hash)
            for j, db_hash in enumerate(db_hashes):
                db = imagehash.hex_to_hash(db_hash)
                distance = q - db  # Hamming distance
                if distance <= threshold:
                    matches.append((i, j, distance))

        # Find consecutive matches (sliding window)
        if len(matches) < min_matches:
            return {'match': False, 'similarity': 0.0}

        # Check for sequential alignment
        consecutive = self._find_consecutive_matches(matches, min_matches)

        if consecutive:
            avg_distance = np.mean([m[2] for m in consecutive])
            similarity = 1.0 - (avg_distance / (self.hash_size ** 2))
            return {
                'match': True,
                'similarity': float(similarity),
                'matched_segments': len(consecutive),
                'query_range': (consecutive[0][0], consecutive[-1][0]),
                'db_range': (consecutive[0][1], consecutive[-1][1]),
            }

        return {'match': False, 'similarity': 0.0}

    @staticmethod
    def _find_consecutive_matches(
        matches: list[tuple], min_count: int
    ) -> list[tuple] | None:
        """Find longest run of sequential matches."""
        if not matches:
            return None

        matches.sort(key=lambda m: m[0])
        best_run = []
        current_run = [matches[0]]

        for i in range(1, len(matches)):
            prev_q, prev_db = matches[i-1][0], matches[i-1][1]
            curr_q, curr_db = matches[i][0], matches[i][1]

            if curr_q == prev_q + 1 and abs(curr_db - prev_db) <= 2:
                current_run.append(matches[i])
            else:
                if len(current_run) > len(best_run):
                    best_run = current_run
                current_run = [matches[i]]

        if len(current_run) > len(best_run):
            best_run = current_run

        return best_run if len(best_run) >= min_count else None

五、审核 Pipeline 整体设计

                    视频上传
                       |
                       v
              [预处理 & 关键帧提取]
                       |
          +------------+------------+
          |            |            |
          v            v            v
      [NSFW 检测]  [版权匹配]  [深度伪造]
          |            |            |
          v            v            v
              [决策聚合引擎]
                   |
        +----------+----------+
        |          |          |
        v          v          v
     [通过]    [人工复核]   [拒绝]
                   |
                   v
            [人工审核台]
                   |
              +----+----+
              |         |
              v         v
           [通过]    [拒绝]

决策聚合规则

# moderation_pipeline.py
from enum import Enum
from dataclasses import dataclass


class Verdict(Enum):
    """Final moderation outcomes."""
    APPROVED = 'approved'
    REJECTED = 'rejected'
    REVIEW = 'manual_review'


@dataclass
class ModerationResult:
    """Aggregated moderation decision for one video."""
    verdict: Verdict      # final decision
    reasons: list[str]    # human-readable explanations for the verdict
    confidence: float     # lowest confidence among the triggered checks
    details: dict         # raw detector outputs, for the review UI / audit log


def aggregate_decisions(
    nsfw_result: dict,
    copyright_result: dict,
    deepfake_result: 'DeepfakeResult',  # string annotation: class defined earlier in the file
) -> ModerationResult:
    """
    Aggregate multiple detection results into final verdict.

    Policy: any hard rejection -> reject
            any uncertain -> manual review
            all clear -> approve

    Args:
        nsfw_result: output of NSFWClassifier.classify_video
            (keys 'verdict' and 'confidence' are read here).
        copyright_result: output of VideoFingerprinter.match
            (keys 'match' and 'similarity').
        deepfake_result: output of DeepfakeDetector.ensemble_detect.
    """
    reasons = []
    # Track the weakest signal among soft flags; this becomes the
    # REVIEW/APPROVED confidence.
    min_confidence = 1.0

    # NSFW: explicit = reject, suggestive = review
    if nsfw_result['verdict'] == 'explicit':
        return ModerationResult(
            verdict=Verdict.REJECTED,
            reasons=['NSFW content detected (explicit)'],
            confidence=nsfw_result['confidence'],
            details={'nsfw': nsfw_result},
        )
    elif nsfw_result['verdict'] == 'suggestive':
        reasons.append('Potentially suggestive content')
        min_confidence = min(min_confidence, nsfw_result['confidence'])

    # Copyright: match = reject (or review for partial)
    if copyright_result.get('match'):
        similarity = copyright_result['similarity']
        if similarity > 0.9:
            return ModerationResult(
                verdict=Verdict.REJECTED,
                reasons=[f"Copyright match ({similarity:.0%} similarity)"],
                confidence=similarity,
                details={'copyright': copyright_result},
            )
        elif similarity > 0.6:
            reasons.append(f"Possible copyright issue ({similarity:.0%})")
            min_confidence = min(min_confidence, 1.0 - similarity)

    # Deepfake: high confidence fake = review (not auto-reject) — detection
    # is too noisy to block on automatically.
    if deepfake_result.is_fake and deepfake_result.confidence > 0.7:
        reasons.append('Potential deepfake content')
        min_confidence = min(min_confidence, 1.0 - deepfake_result.confidence)

    # Final decision: any soft flag escalates to manual review.
    if reasons:
        return ModerationResult(
            verdict=Verdict.REVIEW,
            reasons=reasons,
            confidence=min_confidence,
            details={
                'nsfw': nsfw_result,
                'copyright': copyright_result,
                'deepfake': deepfake_result.__dict__,
            },
        )

    return ModerationResult(
        verdict=Verdict.APPROVED,
        reasons=['All checks passed'],
        confidence=min_confidence,
        details={},
    )

六、性能优化与部署

关键性能指标

指标 目标 实现手段
审核延迟 < 30 秒/视频(1 分钟视频) GPU 推理 + 并行检测
吞吐量 > 1000 视频/小时 水平扩展 + 消息队列
误报率 < 5% (NSFW) 模型微调 + 多级阈值
漏报率 < 0.1% (NSFW explicit) 保守阈值 + 人工兜底

部署架构建议

  1. 消息队列驱动:视频上传后发送消息到审核队列(Redis / RabbitMQ),Worker 异步消费
  2. GPU Worker 池:NSFW 和 Deepfake 检测需要 GPU,按需扩缩容
  3. 指纹数据库:版权指纹存储在向量数据库(如 Milvus)中,支持亿级检索
  4. 人工审核台:为 REVIEW 状态的视频提供审核 UI,展示检测器的具体发现

合规注意事项

  • 审核日志必须保留(通常 6 个月至 3 年),包含时间戳、检测结果、审核决策
  • 人工审核必须支持"复议"流程
  • NSFW 模型的训练数据本身可能涉及敏感内容,需要妥善管理
  • 深度伪造检测结果不能作为法律证据,只能作为风险标记

Maurice | maurice_wen@proton.me