数据分析 Agent 设计模式
AI 导读
数据分析 Agent 设计模式 Text2SQL 到自动可视化:构建安全可控的数据探索智能体 一、问题本质 数据分析 Agent 解决的核心问题是:让不会写 SQL 的人也能从数据库中获取洞察。但"生成 SQL"只是冰山一角,真正的工程挑战在于:如何让 Agent 理解 schema、生成安全的查询、处理执行错误、选择合适的可视化方式、并用业务语言解释结果。 二、架构总览 数据分析 Agent...
数据分析 Agent 设计模式
Text2SQL 到自动可视化:构建安全可控的数据探索智能体
一、问题本质
数据分析 Agent 解决的核心问题是:让不会写 SQL 的人也能从数据库中获取洞察。但"生成 SQL"只是冰山一角,真正的工程挑战在于:如何让 Agent 理解 schema、生成安全的查询、处理执行错误、选择合适的可视化方式、并用业务语言解释结果。
二、架构总览
数据分析 Agent 五阶段工作流
================================================================
用户自然语言提问
|
v
+---------------------------+
| Stage 1: Schema 理解 | ← 表/列/关系/业务含义
+---------------------------+
|
v
+---------------------------+
| Stage 2: SQL 生成 | ← Text2SQL (三种策略)
+---------------------------+
|
v
+---------------------------+
| Stage 3: 安全执行 | ← 只读 + 超时 + 行数限制
+---------------------------+
|
v
+---------------------------+
| Stage 4: 可视化选择 | ← 根据数据形态自动选图表类型
+---------------------------+
|
v
+---------------------------+
| Stage 5: 业务解读 | ← 用自然语言解释数据含义
+---------------------------+
三、Schema 理解层
Agent 不能每次查询都扫描全库 schema。我们构建一个 Schema Registry 作为中间层,预先整理表的业务含义和关系。
from dataclasses import dataclass, field
@dataclass
class ColumnMeta:
name: str
dtype: str
description: str
is_pii: bool = False
sample_values: list[str] = field(default_factory=list)
@dataclass
class TableMeta:
name: str
description: str
columns: list[ColumnMeta]
row_count_approx: int = 0
relationships: list[str] = field(default_factory=list)
# Schema Registry: 业务语义到数据库结构的映射
SCHEMA_REGISTRY: list[TableMeta] = [
TableMeta(
name="orders",
description="订单主表,记录所有交易订单",
columns=[
ColumnMeta("order_id", "VARCHAR(20)", "订单唯一编号"),
ColumnMeta("user_id", "BIGINT", "下单用户ID", is_pii=True),
ColumnMeta("total_amount", "DECIMAL(12,2)", "订单总金额(元)"),
ColumnMeta("status", "VARCHAR(20)", "订单状态",
sample_values=["pending", "paid", "shipped", "completed", "refunded"]),
ColumnMeta("created_at", "TIMESTAMP", "下单时间"),
],
row_count_approx=5_000_000,
relationships=["orders.user_id -> users.id", "orders.order_id -> order_items.order_id"],
),
TableMeta(
name="order_items",
description="订单明细表,每笔订单包含的商品",
columns=[
ColumnMeta("item_id", "BIGINT", "明细行ID"),
ColumnMeta("order_id", "VARCHAR(20)", "所属订单ID"),
ColumnMeta("product_id", "BIGINT", "商品ID"),
ColumnMeta("quantity", "INT", "购买数量"),
ColumnMeta("unit_price", "DECIMAL(10,2)", "单价(元)"),
],
row_count_approx=15_000_000,
relationships=["order_items.product_id -> products.id"],
),
]
def build_schema_prompt(tables: list[TableMeta]) -> str:
"""将 Schema Registry 转换为 LLM 可理解的上下文。"""
lines = ["# Database Schema\n"]
for t in tables:
lines.append(f"## Table: {t.name}")
lines.append(f"Description: {t.description}")
lines.append(f"Approx rows: {t.row_count_approx:,}")
lines.append("Columns:")
for c in t.columns:
pii_tag = " [PII]" if c.is_pii else ""
samples = f" (e.g., {', '.join(c.sample_values)})" if c.sample_values else ""
lines.append(f" - {c.name} ({c.dtype}): {c.description}{pii_tag}{samples}")
if t.relationships:
lines.append("Relationships: " + "; ".join(t.relationships))
lines.append("")
return "\n".join(lines)
四、Text2SQL 三种策略对比
策略选择决策树
=============================================
用户查询复杂度?
|
+-- 简单(单表 + 条件过滤)
| --> 模板填充法 (Template)
| 延迟: ~10ms / 准确率: 99% / 灵活性: 低
|
+-- 中等(多表 JOIN + 聚合)
| --> LLM 直接生成 (Prompt Engineering)
| 延迟: ~800ms / 准确率: 85-90% / 灵活性: 高
|
+-- 复杂(窗口函数 + 子查询 + 业务逻辑)
--> 微调模型 (Fine-tuned)
延迟: ~400ms / 准确率: 90-95% / 灵活性: 高
LLM 直接生成是大多数场景的最佳起点。以下是基于 LangChain 的实现。
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
SQL_GENERATION_PROMPT = ChatPromptTemplate.from_messages([
("system", """You are a SQL expert. Generate a PostgreSQL query based on the user's question.
{schema_context}
Rules:
1. ONLY generate SELECT statements. Never INSERT/UPDATE/DELETE/DROP.
2. Always add LIMIT {max_rows} unless the user explicitly asks for all rows.
3. Use table aliases for readability (e.g., o for orders).
4. For time ranges, use created_at with explicit timezone (AT TIME ZONE 'Asia/Shanghai').
5. Never select PII columns (marked [PII]) unless the user explicitly requests them.
6. Wrap the SQL in ```sql ... ``` code block.
7. Before the SQL, write a one-line explanation of the query logic.
"""),
("human", "{question}"),
])
def create_sql_chain(schema_context: str, max_rows: int = 1000):
"""构建 Text2SQL chain,输出为纯 SQL 字符串。"""
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
chain = (
SQL_GENERATION_PROMPT.partial(
schema_context=schema_context,
max_rows=str(max_rows),
)
| llm
| StrOutputParser()
)
return chain
def extract_sql(llm_output: str) -> str:
"""从 LLM 输出中提取 SQL 代码块。"""
if "```sql" in llm_output:
start = llm_output.index("```sql") + 6
end = llm_output.index("```", start)
return llm_output[start:end].strip()
return llm_output.strip()
五、安全执行层(Guardrails)
这是整个系统最关键的防线。数据库是不可逆的,一条错误的写操作可能造成灾难。
import re
import asyncio
from dataclasses import dataclass
@dataclass
class ExecutionConfig:
max_rows: int = 1000
timeout_seconds: int = 30
max_query_cost: int = 100_000 # PostgreSQL estimated cost
blocked_keywords: tuple = (
"INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE",
"ALTER", "CREATE", "GRANT", "REVOKE", "COPY",
)
class QueryGuard:
"""SQL 执行守卫:确保查询安全、可控、可审计。"""
def __init__(self, config: ExecutionConfig | None = None):
self.config = config or ExecutionConfig()
def validate(self, sql: str) -> tuple[bool, str]:
"""验证 SQL 安全性,返回 (is_safe, reason)。"""
upper_sql = sql.upper().strip()
# 规则 1:禁止写操作
for kw in self.config.blocked_keywords:
pattern = rf'\b{kw}\b'
if re.search(pattern, upper_sql):
return False, f"blocked_keyword: {kw}"
# 规则 2:必须以 SELECT 或 WITH 开头
if not (upper_sql.startswith("SELECT") or upper_sql.startswith("WITH")):
return False, "query_must_start_with_SELECT_or_WITH"
# 规则 3:禁止多语句(分号分隔的多条 SQL)
statements = [s.strip() for s in sql.split(";") if s.strip()]
if len(statements) > 1:
return False, "multi_statement_not_allowed"
# 规则 4:强制行数限制
if "LIMIT" not in upper_sql:
return False, "missing_LIMIT_clause"
return True, "passed"
async def execute_safe(self, pool, sql: str) -> dict:
"""安全执行 SQL:验证 + 超时 + 行数限制。"""
is_safe, reason = self.validate(sql)
if not is_safe:
return {"status": "blocked", "reason": reason, "rows": []}
try:
async with asyncio.timeout(self.config.timeout_seconds):
async with pool.acquire() as conn:
# 使用只读事务
async with conn.transaction(readonly=True):
rows = await conn.fetch(sql)
if len(rows) > self.config.max_rows:
rows = rows[:self.config.max_rows]
return {
"status": "success",
"row_count": len(rows),
"rows": [dict(r) for r in rows],
}
except asyncio.TimeoutError:
return {"status": "timeout", "reason": f"exceeded_{self.config.timeout_seconds}s"}
except Exception as e:
return {"status": "error", "reason": str(e), "rows": []}
六、可视化自动选择
数据形态 -> 图表类型映射
=============================================
单值(标量)
--> 大数字卡片 (Metric Card)
时间序列(日期 + 数值)
--> 折线图 (Line Chart)
分类对比(类别 + 数值,类别 <= 10)
--> 柱状图 (Bar Chart)
占比分布(类别 + 数值,类别 <= 7)
--> 饼图 / 环形图 (Pie / Donut)
二维分布(两个数值列)
--> 散点图 (Scatter Plot)
多行多列(明细数据)
--> 表格 (Table)
地理数据(含经纬度或地区名)
--> 地图 (Map)
from enum import Enum
class ChartType(Enum):
METRIC_CARD = "metric_card"
LINE_CHART = "line_chart"
BAR_CHART = "bar_chart"
PIE_CHART = "pie_chart"
SCATTER_PLOT = "scatter_plot"
TABLE = "table"
def infer_chart_type(columns: list[dict], row_count: int) -> ChartType:
"""根据数据形态推断最佳图表类型。"""
col_types = {c["name"]: c["dtype"] for c in columns}
num_cols = [n for n, t in col_types.items() if t in ("int", "float", "decimal")]
date_cols = [n for n, t in col_types.items() if t in ("date", "timestamp")]
cat_cols = [n for n, t in col_types.items() if t in ("varchar", "text")]
# 单值
if row_count == 1 and len(num_cols) == 1:
return ChartType.METRIC_CARD
# 时间序列
if date_cols and num_cols:
return ChartType.LINE_CHART
# 分类对比
if cat_cols and num_cols and row_count <= 10:
return ChartType.BAR_CHART
# 占比
if cat_cols and num_cols and row_count <= 7:
return ChartType.PIE_CHART
# 二维分布
if len(num_cols) >= 2 and not cat_cols:
return ChartType.SCATTER_PLOT
return ChartType.TABLE
七、System Prompt 模板
# 角色
你是一名数据分析助手,帮助用户从数据库中获取业务洞察。
# 工作流程
1. 理解用户问题,必要时追问以明确查询范围(时间/维度/指标)
2. 基于 Schema 生成 SQL,遵循安全规则
3. 解读查询结果,用业务语言回答(不是技术语言)
4. 推荐合适的可视化方式
5. 主动提出进一步分析建议
# SQL 生成规则
- 仅生成 SELECT 语句
- 默认 LIMIT 1000
- 时间默认最近 30 天(除非用户指定)
- 金额字段保留 2 位小数
- 避免 SELECT *,只选需要的列
- PII 字段(用户姓名/手机/地址)默认脱敏
# 解读规则
- 先给结论("上月销售额环比增长 12%"),再给数据支撑
- 标出异常值(超过均值 2 个标准差)
- 对比维度:环比/同比/与目标对比
八、错误恢复策略
SQL 执行错误处理流程
=============================================
执行失败
|
+-- 语法错误 (SyntaxError)
| --> 提取错误位置 + 修正 SQL + 重试(最多 2 次)
|
+-- 表/列不存在 (UndefinedTable/Column)
| --> 重新检索 Schema Registry + 纠正表名/列名 + 重试
|
+-- 超时 (Timeout)
| --> 分析查询计划 + 添加索引建议 + 缩小范围重试
|
+-- 权限不足 (PermissionDenied)
| --> 告知用户该表无权限 + 建议联系 DBA
|
+-- 其他错误
--> 记录完整错误栈 + 告知用户 + 建议重新描述需求
九、关键设计决策
为什么用 Schema Registry 而不是实时 INFORMATION_SCHEMA? 实时查询元数据在大型数据库上耗时数秒,且缺少业务语义。预构建的 Registry 同时解决了性能和语义两个问题。
为什么 Guardrails 用规则而不是 LLM 判断? SQL 安全性是二值判断(安全/不安全),规则引擎的确定性远高于 LLM。我们不能接受 95% 的安全率,必须是 100%。
为什么默认 LIMIT 1000? 超过 1000 行的结果在前端几乎无法有效展示。如果用户真的需要大数据集,应该导出为文件而不是在界面上渲染。
为什么不支持写操作? 数据分析 Agent 的定位是"只读观察者"。写操作需要完全不同的审批和回滚机制,应该由专门的数据管理 Agent 处理。
Maurice | maurice_wen@proton.me