数据分析 Agent 设计模式

Text2SQL 到自动可视化:构建安全可控的数据探索智能体

一、问题本质

数据分析 Agent 解决的核心问题是:让不会写 SQL 的人也能从数据库中获取洞察。但"生成 SQL"只是冰山一角,真正的工程挑战在于:如何让 Agent 理解 schema、生成安全的查询、处理执行错误、选择合适的可视化方式、并用业务语言解释结果。

二、架构总览

 数据分析 Agent 五阶段工作流
 ================================================================

 用户自然语言提问
   |
   v
 +---------------------------+
 | Stage 1: Schema 理解      |    ← 表/列/关系/业务含义
 +---------------------------+
   |
   v
 +---------------------------+
 | Stage 2: SQL 生成         |    ← Text2SQL (三种策略)
 +---------------------------+
   |
   v
 +---------------------------+
 | Stage 3: 安全执行         |    ← 只读 + 超时 + 行数限制
 +---------------------------+
   |
   v
 +---------------------------+
 | Stage 4: 可视化选择       |    ← 根据数据形态自动选图表类型
 +---------------------------+
   |
   v
 +---------------------------+
 | Stage 5: 业务解读         |    ← 用自然语言解释数据含义
 +---------------------------+

三、Schema 理解层

Agent 不能每次查询都扫描全库 schema。我们构建一个 Schema Registry 作为中间层,预先整理表的业务含义和关系。

from dataclasses import dataclass, field


@dataclass
class ColumnMeta:
    name: str
    dtype: str
    description: str
    is_pii: bool = False
    sample_values: list[str] = field(default_factory=list)


@dataclass
class TableMeta:
    name: str
    description: str
    columns: list[ColumnMeta]
    row_count_approx: int = 0
    relationships: list[str] = field(default_factory=list)


# Schema Registry: 业务语义到数据库结构的映射
SCHEMA_REGISTRY: list[TableMeta] = [
    TableMeta(
        name="orders",
        description="订单主表,记录所有交易订单",
        columns=[
            ColumnMeta("order_id", "VARCHAR(20)", "订单唯一编号"),
            ColumnMeta("user_id", "BIGINT", "下单用户ID", is_pii=True),
            ColumnMeta("total_amount", "DECIMAL(12,2)", "订单总金额(元)"),
            ColumnMeta("status", "VARCHAR(20)", "订单状态",
                       sample_values=["pending", "paid", "shipped", "completed", "refunded"]),
            ColumnMeta("created_at", "TIMESTAMP", "下单时间"),
        ],
        row_count_approx=5_000_000,
        relationships=["orders.user_id -> users.id", "orders.order_id -> order_items.order_id"],
    ),
    TableMeta(
        name="order_items",
        description="订单明细表,每笔订单包含的商品",
        columns=[
            ColumnMeta("item_id", "BIGINT", "明细行ID"),
            ColumnMeta("order_id", "VARCHAR(20)", "所属订单ID"),
            ColumnMeta("product_id", "BIGINT", "商品ID"),
            ColumnMeta("quantity", "INT", "购买数量"),
            ColumnMeta("unit_price", "DECIMAL(10,2)", "单价(元)"),
        ],
        row_count_approx=15_000_000,
        relationships=["order_items.product_id -> products.id"],
    ),
]


def build_schema_prompt(tables: list[TableMeta]) -> str:
    """将 Schema Registry 转换为 LLM 可理解的上下文。"""
    lines = ["# Database Schema\n"]
    for t in tables:
        lines.append(f"## Table: {t.name}")
        lines.append(f"Description: {t.description}")
        lines.append(f"Approx rows: {t.row_count_approx:,}")
        lines.append("Columns:")
        for c in t.columns:
            pii_tag = " [PII]" if c.is_pii else ""
            samples = f" (e.g., {', '.join(c.sample_values)})" if c.sample_values else ""
            lines.append(f"  - {c.name} ({c.dtype}): {c.description}{pii_tag}{samples}")
        if t.relationships:
            lines.append("Relationships: " + "; ".join(t.relationships))
        lines.append("")
    return "\n".join(lines)

四、Text2SQL 三种策略对比

 策略选择决策树
 =============================================

 用户查询复杂度?
   |
   +-- 简单(单表 + 条件过滤)
   |     --> 模板填充法 (Template)
   |         延迟: ~10ms / 准确率: 99% / 灵活性: 低
   |
   +-- 中等(多表 JOIN + 聚合)
   |     --> LLM 直接生成 (Prompt Engineering)
   |         延迟: ~800ms / 准确率: 85-90% / 灵活性: 高
   |
   +-- 复杂(窗口函数 + 子查询 + 业务逻辑)
         --> 微调模型 (Fine-tuned)
             延迟: ~400ms / 准确率: 90-95% / 灵活性: 高

LLM 直接生成是大多数场景的最佳起点。以下是基于 LangChain 的实现。

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI


SQL_GENERATION_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """You are a SQL expert. Generate a PostgreSQL query based on the user's question.

{schema_context}

Rules:
1. ONLY generate SELECT statements. Never INSERT/UPDATE/DELETE/DROP.
2. Always add LIMIT {max_rows} unless the user explicitly asks for all rows.
3. Use table aliases for readability (e.g., o for orders).
4. For time ranges, use created_at with explicit timezone (AT TIME ZONE 'Asia/Shanghai').
5. Never select PII columns (marked [PII]) unless the user explicitly requests them.
6. Wrap the SQL in ```sql ... ``` code block.
7. Before the SQL, write a one-line explanation of the query logic.
"""),
    ("human", "{question}"),
])


def create_sql_chain(schema_context: str, max_rows: int = 1000):
    """构建 Text2SQL chain,输出为纯 SQL 字符串。"""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    chain = (
        SQL_GENERATION_PROMPT.partial(
            schema_context=schema_context,
            max_rows=str(max_rows),
        )
        | llm
        | StrOutputParser()
    )
    return chain


def extract_sql(llm_output: str) -> str:
    """从 LLM 输出中提取 SQL 代码块。"""
    if "```sql" in llm_output:
        start = llm_output.index("```sql") + 6
        end = llm_output.index("```", start)
        return llm_output[start:end].strip()
    return llm_output.strip()

五、安全执行层(Guardrails)

这是整个系统最关键的防线。数据库是不可逆的,一条错误的写操作可能造成灾难。

import re
import asyncio
from dataclasses import dataclass


@dataclass
class ExecutionConfig:
    max_rows: int = 1000
    timeout_seconds: int = 30
    max_query_cost: int = 100_000  # PostgreSQL estimated cost
    blocked_keywords: tuple = (
        "INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE",
        "ALTER", "CREATE", "GRANT", "REVOKE", "COPY",
    )


class QueryGuard:
    """SQL 执行守卫:确保查询安全、可控、可审计。"""

    def __init__(self, config: ExecutionConfig | None = None):
        self.config = config or ExecutionConfig()

    def validate(self, sql: str) -> tuple[bool, str]:
        """验证 SQL 安全性,返回 (is_safe, reason)。"""
        upper_sql = sql.upper().strip()

        # 规则 1:禁止写操作
        for kw in self.config.blocked_keywords:
            pattern = rf'\b{kw}\b'
            if re.search(pattern, upper_sql):
                return False, f"blocked_keyword: {kw}"

        # 规则 2:必须以 SELECT 或 WITH 开头
        if not (upper_sql.startswith("SELECT") or upper_sql.startswith("WITH")):
            return False, "query_must_start_with_SELECT_or_WITH"

        # 规则 3:禁止多语句(分号分隔的多条 SQL)
        statements = [s.strip() for s in sql.split(";") if s.strip()]
        if len(statements) > 1:
            return False, "multi_statement_not_allowed"

        # 规则 4:强制行数限制
        if "LIMIT" not in upper_sql:
            return False, "missing_LIMIT_clause"

        return True, "passed"

    async def execute_safe(self, pool, sql: str) -> dict:
        """安全执行 SQL:验证 + 超时 + 行数限制。"""
        is_safe, reason = self.validate(sql)
        if not is_safe:
            return {"status": "blocked", "reason": reason, "rows": []}

        try:
            async with asyncio.timeout(self.config.timeout_seconds):
                async with pool.acquire() as conn:
                    # 使用只读事务
                    async with conn.transaction(readonly=True):
                        rows = await conn.fetch(sql)
                        if len(rows) > self.config.max_rows:
                            rows = rows[:self.config.max_rows]
                        return {
                            "status": "success",
                            "row_count": len(rows),
                            "rows": [dict(r) for r in rows],
                        }
        except asyncio.TimeoutError:
            return {"status": "timeout", "reason": f"exceeded_{self.config.timeout_seconds}s"}
        except Exception as e:
            return {"status": "error", "reason": str(e), "rows": []}

六、可视化自动选择

 数据形态 -> 图表类型映射
 =============================================

 单值(标量)
   --> 大数字卡片 (Metric Card)

 时间序列(日期 + 数值)
   --> 折线图 (Line Chart)

 分类对比(类别 + 数值,类别 <= 10)
   --> 柱状图 (Bar Chart)

 占比分布(类别 + 数值,类别 <= 7)
   --> 饼图 / 环形图 (Pie / Donut)

 二维分布(两个数值列)
   --> 散点图 (Scatter Plot)

 多行多列(明细数据)
   --> 表格 (Table)

 地理数据(含经纬度或地区名)
   --> 地图 (Map)
from enum import Enum


class ChartType(Enum):
    METRIC_CARD = "metric_card"
    LINE_CHART = "line_chart"
    BAR_CHART = "bar_chart"
    PIE_CHART = "pie_chart"
    SCATTER_PLOT = "scatter_plot"
    TABLE = "table"


def infer_chart_type(columns: list[dict], row_count: int) -> ChartType:
    """根据数据形态推断最佳图表类型。"""
    col_types = {c["name"]: c["dtype"] for c in columns}
    num_cols = [n for n, t in col_types.items() if t in ("int", "float", "decimal")]
    date_cols = [n for n, t in col_types.items() if t in ("date", "timestamp")]
    cat_cols = [n for n, t in col_types.items() if t in ("varchar", "text")]

    # 单值
    if row_count == 1 and len(num_cols) == 1:
        return ChartType.METRIC_CARD

    # 时间序列
    if date_cols and num_cols:
        return ChartType.LINE_CHART

    # 分类对比
    if cat_cols and num_cols and row_count <= 10:
        return ChartType.BAR_CHART

    # 占比
    if cat_cols and num_cols and row_count <= 7:
        return ChartType.PIE_CHART

    # 二维分布
    if len(num_cols) >= 2 and not cat_cols:
        return ChartType.SCATTER_PLOT

    return ChartType.TABLE

七、System Prompt 模板

# 角色
你是一名数据分析助手,帮助用户从数据库中获取业务洞察。

# 工作流程
1. 理解用户问题,必要时追问以明确查询范围(时间/维度/指标)
2. 基于 Schema 生成 SQL,遵循安全规则
3. 解读查询结果,用业务语言回答(不是技术语言)
4. 推荐合适的可视化方式
5. 主动提出进一步分析建议

# SQL 生成规则
- 仅生成 SELECT 语句
- 默认 LIMIT 1000
- 时间默认最近 30 天(除非用户指定)
- 金额字段保留 2 位小数
- 避免 SELECT *,只选需要的列
- PII 字段(用户姓名/手机/地址)默认脱敏

# 解读规则
- 先给结论("上月销售额环比增长 12%"),再给数据支撑
- 标出异常值(超过均值 2 个标准差)
- 对比维度:环比/同比/与目标对比

八、错误恢复策略

 SQL 执行错误处理流程
 =============================================

 执行失败
   |
   +-- 语法错误 (SyntaxError)
   |     --> 提取错误位置 + 修正 SQL + 重试(最多 2 次)
   |
   +-- 表/列不存在 (UndefinedTable/Column)
   |     --> 重新检索 Schema Registry + 纠正表名/列名 + 重试
   |
   +-- 超时 (Timeout)
   |     --> 分析查询计划 + 添加索引建议 + 缩小范围重试
   |
   +-- 权限不足 (PermissionDenied)
   |     --> 告知用户该表无权限 + 建议联系 DBA
   |
   +-- 其他错误
         --> 记录完整错误栈 + 告知用户 + 建议重新描述需求

九、关键设计决策

  1. 为什么用 Schema Registry 而不是实时 INFORMATION_SCHEMA? 实时查询元数据在大型数据库上耗时数秒,且缺少业务语义。预构建的 Registry 同时解决了性能和语义两个问题。

  2. 为什么 Guardrails 用规则而不是 LLM 判断? SQL 安全性是二值判断(安全/不安全),规则引擎的确定性远高于 LLM。我们不能接受 95% 的安全率,必须是 100%。

  3. 为什么默认 LIMIT 1000? 超过 1000 行的结果在前端几乎无法有效展示。如果用户真的需要大数据集,应该导出为文件而不是在界面上渲染。

  4. 为什么不支持写操作? 数据分析 Agent 的定位是"只读观察者"。写操作需要完全不同的审批和回滚机制,应该由专门的数据管理 Agent 处理。


Maurice | maurice_wen@proton.me