代码审查 Agent 模板

从 Diff 解析到多维评审:构建结构化代码审查智能体

一、为什么需要代码审查 Agent

人工 Code Review 有三个固有瓶颈:一是时间瓶颈,高级工程师的审查时间是团队最稀缺的资源;二是一致性瓶颈,不同审查者的标准不一致,同一个人在疲劳时标准也会下降;三是覆盖率瓶颈,大型 PR 中细节问题容易被忽略。Code Review Agent 不是替代人工审查,而是做"第一轮筛选",让人工审查聚焦在架构决策和业务逻辑上。

二、架构总览

 代码审查 Agent 架构
 ================================================================

 GitHub/GitLab Webhook
   |
   v
 +---------------------------+
 |    Webhook Receiver       |    ← PR opened / updated 事件
 +---------------------------+
   |
   v
 +---------------------------+
 |    Diff Parser            |    ← 提取变更文件、增删行、上下文
 +---------------------------+
   |
   v
 +---------------------------+
 |    File Router            |    ← 按文件类型分发到专项审查器
 +---------------------------+
   |         |         |         |
   v         v         v         v
 +------+ +------+ +------+ +--------+
 | 安全 | | 性能 | | 风格 | | 正确性 |
 | 审查 | | 审查 | | 审查 | | 审查   |
 +------+ +------+ +------+ +--------+
   |         |         |         |
   +----+----+----+----+---------+
        |
        v
 +---------------------------+
 |    Result Aggregator      |    ← 去重、排序、格式化
 +---------------------------+
   |
   v
 +---------------------------+
 |    PR Comment Publisher   |    ← 发布为 inline comments
 +---------------------------+

三、Diff 解析器

审查的第一步是准确理解"改了什么"。Git diff 的原始格式对 LLM 来说噪音太大,需要结构化处理。

from dataclasses import dataclass, field
import re


@dataclass
class HunkLine:
    line_number: int
    content: str
    change_type: str  # "added" | "removed" | "context"


@dataclass
class DiffHunk:
    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: list[HunkLine] = field(default_factory=list)


@dataclass
class FileDiff:
    path: str
    old_path: str | None  # 重命名场景
    status: str  # "added" | "modified" | "deleted" | "renamed"
    hunks: list[DiffHunk] = field(default_factory=list)
    language: str = "unknown"

    @property
    def additions(self) -> int:
        return sum(1 for h in self.hunks for l in h.lines if l.change_type == "added")

    @property
    def deletions(self) -> int:
        return sum(1 for h in self.hunks for l in h.lines if l.change_type == "removed")


HUNK_HEADER_RE = re.compile(r'^@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@')


def parse_unified_diff(diff_text: str) -> list[FileDiff]:
    """解析 unified diff 格式,返回结构化的文件变更列表。"""
    files: list[FileDiff] = []
    current_file: FileDiff | None = None
    current_hunk: DiffHunk | None = None
    line_counter = 0

    for raw_line in diff_text.splitlines():
        # 文件头
        if raw_line.startswith("diff --git"):
            parts = raw_line.split(" b/")
            path = parts[-1] if len(parts) > 1 else "unknown"
            current_file = FileDiff(path=path, old_path=None, status="modified")
            current_file.language = _detect_language(path)
            files.append(current_file)
            current_hunk = None
            continue

        if raw_line.startswith("new file"):
            if current_file:
                current_file.status = "added"
            continue

        if raw_line.startswith("deleted file"):
            if current_file:
                current_file.status = "deleted"
            continue

        # Hunk 头
        match = HUNK_HEADER_RE.match(raw_line)
        if match and current_file:
            current_hunk = DiffHunk(
                old_start=int(match.group(1)),
                old_count=int(match.group(2) or 1),
                new_start=int(match.group(3)),
                new_count=int(match.group(4) or 1),
            )
            current_file.hunks.append(current_hunk)
            line_counter = current_hunk.new_start
            continue

        # 变更行
        if current_hunk is not None:
            if raw_line.startswith("+") and not raw_line.startswith("+++"):
                current_hunk.lines.append(
                    HunkLine(line_counter, raw_line[1:], "added")
                )
                line_counter += 1
            elif raw_line.startswith("-") and not raw_line.startswith("---"):
                current_hunk.lines.append(
                    HunkLine(line_counter, raw_line[1:], "removed")
                )
            else:
                current_hunk.lines.append(
                    HunkLine(line_counter, raw_line[1:] if raw_line.startswith(" ") else raw_line, "context")
                )
                line_counter += 1

    return files


LANGUAGE_MAP = {
    ".py": "python", ".js": "javascript", ".ts": "typescript",
    ".tsx": "typescript", ".go": "go", ".rs": "rust",
    ".java": "java", ".rb": "ruby", ".sql": "sql",
}


def _detect_language(path: str) -> str:
    for ext, lang in LANGUAGE_MAP.items():
        if path.endswith(ext):
            return lang
    return "unknown"

四、多维审查系统

每个维度是一个独立的审查器,有自己的 System Prompt 和关注点。

 四维审查矩阵
 =============================================

 维度          关注点                     严重级别
 -------       -----------------------    ----------
 安全          SQL注入/XSS/硬编码密钥     CRITICAL
 (Security)    权限绕过/路径遍历           HIGH

 性能          N+1查询/大循环/内存泄漏     HIGH
 (Performance) 缺少索引/无分页/阻塞调用    MEDIUM

 风格          命名不规范/过长函数          LOW
 (Style)       缺少注释/不一致的格式        INFO

 正确性        边界条件/空指针/类型错误     HIGH
 (Correctness) 竞态条件/资源未释放          CRITICAL

五、System Prompt 模板(安全审查维度示例)

# 角色
你是一名高级安全工程师,负责对代码变更进行安全审查。

# 审查范围
仅审查 diff 中新增和修改的代码行。不评论已删除的代码。

# 审查清单
按以下优先级逐项检查:

1. [CRITICAL] 硬编码凭证
   - API keys, tokens, passwords, connection strings
   - 检查: 正则匹配常见密钥模式 (sk-xxx, ghp_xxx, AKIA...)

2. [CRITICAL] 注入漏洞
   - SQL injection: 字符串拼接构造查询
   - XSS: 未转义的用户输入直接渲染
   - Command injection: 用户输入传入 os.system/subprocess

3. [HIGH] 认证与授权
   - 接口缺少认证中间件
   - 权限检查缺失或可绕过
   - JWT/session 处理不当

4. [HIGH] 数据暴露
   - 日志中输出敏感数据
   - 错误信息泄露内部实现细节
   - API 响应包含多余字段

5. [MEDIUM] 依赖安全
   - 新增依赖是否有已知漏洞
   - 依赖版本是否锁定

# 输出格式
对每个发现,输出:
- severity: CRITICAL / HIGH / MEDIUM / LOW
- file: 文件路径
- line: 行号
- category: 问题类别
- description: 问题描述(1-2句)
- suggestion: 修复建议(含代码示例)

六、结构化输出格式

from dataclasses import dataclass


@dataclass
class ReviewFinding:
    severity: str        # CRITICAL / HIGH / MEDIUM / LOW / INFO
    dimension: str       # security / performance / style / correctness
    file: str
    line: int
    category: str
    description: str
    suggestion: str
    confidence: float    # 0.0 - 1.0


@dataclass
class ReviewReport:
    pr_number: int
    total_files: int
    total_additions: int
    total_deletions: int
    findings: list[ReviewFinding]
    summary: str

    @property
    def critical_count(self) -> int:
        return sum(1 for f in self.findings if f.severity == "CRITICAL")

    @property
    def should_block(self) -> bool:
        """是否应该阻塞合并:存在 CRITICAL 或 3 个以上 HIGH。"""
        high_count = sum(1 for f in self.findings if f.severity == "HIGH")
        return self.critical_count > 0 or high_count >= 3

    def to_markdown(self) -> str:
        """转换为 PR comment 格式的 Markdown。"""
        lines = [f"## Code Review Report\n"]
        lines.append(f"Files: {self.total_files} | "
                     f"+{self.total_additions} / -{self.total_deletions}\n")

        if self.should_block:
            lines.append("**STATUS: CHANGES REQUESTED**\n")
        else:
            lines.append("**STATUS: APPROVED (with suggestions)**\n")

        # 按严重级别分组
        for severity in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
            group = [f for f in self.findings if f.severity == severity]
            if not group:
                continue
            lines.append(f"\n### {severity} ({len(group)})\n")
            for finding in group:
                lines.append(
                    f"- **[{finding.dimension}]** `{finding.file}:{finding.line}` "
                    f"- {finding.description}\n"
                    f"  Suggestion: {finding.suggestion}\n"
                )

        lines.append(f"\n---\n{self.summary}")
        return "\n".join(lines)

七、GitHub Webhook 集成

 Webhook 事件处理流程
 =============================================

 GitHub 发送 webhook
   |
   v
 验证签名 (HMAC-SHA256)
   |
   v
 过滤事件类型
   |
   +-- pull_request.opened     --> 全量审查
   +-- pull_request.synchronize --> 增量审查(仅新 commits)
   +-- pull_request.reopened   --> 全量审查
   +-- 其他                    --> 忽略
   |
   v
 调用 GitHub API 获取 diff
   |
   v
 解析 + 审查 + 发布 comments
import hashlib
import hmac


def verify_github_signature(payload: bytes, signature: str, secret: str) -> bool:
    """验证 GitHub webhook 签名。"""
    expected = "sha256=" + hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)

八、与现有工具的对比

特性 自建 Agent CodeRabbit Qodo (CodiumAI)
定制化程度 完全可控 配置文件有限 中等
审查维度 自定义 预设 预设+自定义规则
私有部署 支持 仅企业版 仅企业版
模型选择 任意 固定 固定
成本 按 API 调用计费 按席位 按席位
集成深度 与内部系统深度集成 GitHub/GitLab GitHub/GitLab/Bitbucket
学习曲线 高(需开发)

选型建议: 小团队直接用 CodeRabbit/Qodo,快速获得价值。中大型团队或有特殊合规要求(代码不能出企业网络)的场景,自建 Agent 是唯一选择。

九、Few-shot 示例(提升审查质量)

在 System Prompt 中加入 few-shot 示例,能显著提升审查质量和一致性。

# 示例 1: 安全问题
Diff:
+ query = f"SELECT * FROM users WHERE name = '{user_input}'"
+ cursor.execute(query)

Review:
severity: CRITICAL
category: sql_injection
description: 用户输入直接拼接到 SQL 字符串中,存在 SQL 注入风险
suggestion: 使用参数化查询 cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))

# 示例 2: 性能问题
Diff:
+ for order in orders:
+     items = db.query(OrderItem).filter_by(order_id=order.id).all()

Review:
severity: HIGH
category: n_plus_one_query
description: 循环内执行数据库查询,当 orders 数量大时产生 N+1 问题
suggestion: 使用 joinedload 或提前批量查询 db.query(OrderItem).filter(OrderItem.order_id.in_(order_ids))

十、关键设计决策

  1. 为什么用四个独立审查器而不是一个全能 Prompt? 单一 Prompt 在长 diff 上容易丢失关注点(lost in the middle 效应)。拆分后每个审查器只需处理自己维度的规则,Token 消耗更低,准确率更高。

  2. 为什么要结构化输出而不是自由文本? 结构化输出可以被程序解析,用于自动化决策(是否阻塞合并)、数据聚合(团队审查趋势)、去重(同一问题不重复评论)。

  3. 为什么 confidence 字段很重要? LLM 的审查会有误报。confidence < 0.7 的发现标记为"建议"而非"问题",给开发者判断空间,减少审查疲劳。

  4. 为什么增量审查只看新 commits? 全量审查每次 push 都重跑,在大型 PR 上代价高昂。增量审查只处理新变更,但前提是首次 opened 时已做过全量审查。


Maurice | maurice_wen@proton.me