多智能体 GraphRAG:Agent 驱动的知识图谱检索
AI 导读
多智能体 GraphRAG:Agent 驱动的知识图谱检索 从被动检索到主动探索——当 AI Agent 学会在知识图谱中自主导航 引言:RAG 的第三次进化 检索增强生成(Retrieval-Augmented Generation, RAG)经历了三个阶段。第一阶段是朴素 RAG:把文档切片、向量化、top-k 召回,拼接到 prompt 里让 LLM 回答。第二阶段是...
多智能体 GraphRAG:Agent 驱动的知识图谱检索
从被动检索到主动探索——当 AI Agent 学会在知识图谱中自主导航
引言:RAG 的第三次进化
检索增强生成(Retrieval-Augmented Generation, RAG)经历了三个阶段。第一阶段是朴素 RAG:把文档切片、向量化、top-k 召回,拼接到 prompt 里让 LLM 回答。第二阶段是 GraphRAG:Microsoft 在 2024 年提出的方案,用 LLM 从语料中抽取实体和关系构建知识图谱,再基于 community detection 做分层摘要,使得"全局性问题"也能被有效回答。第三阶段——也就是本文要讨论的——是 Agentic GraphRAG:让自主 Agent 成为检索的驾驶员,动态决定"问什么、怎么问、问完之后还要不要继续问"。
这不是简单地给 Agent 挂一个图数据库工具。真正的变革在于:Agent 能够理解图的 schema、规划多跳遍历路径、在检索过程中发现知识缺口并触发图谱更新——从被动的"给我答案"变成主动的"让我探索"。
本文将从架构、设计模式、协议集成、实现代码、时序图谱、到生产部署六个维度,系统阐述如何构建一个多智能体驱动的 GraphRAG 系统。
一、从被动检索到主动探索
1.1 静态检索的三个瓶颈
即使是 GraphRAG,在面对复杂的实际查询时仍然存在结构性的局限。
瓶颈一:检索策略的僵化。 传统 RAG 的检索策略在 query 发出之前就已经固定——要么做 local search(基于实体邻域),要么做 global search(基于 community 摘要)。但真实问题往往需要在两种策略之间动态切换。例如,"这家公司最近三年的供应链风险事件,以及这些事件与行业监管政策变化之间的关联是什么?"这个问题的前半部分需要 local search(围绕特定公司实体做多跳遍历),后半部分需要 global search(跨行业的政策趋势总结),而两者之间还需要一个 cross-domain 的关联推理。
瓶颈二:单轮检索的天花板。 一次检索很少能获得足够的上下文。人类研究员在查资料时,会根据第一轮搜索结果调整关键词、追问细节、沿着引用链深入。但静态 RAG 只做一次检索就把结果交给 LLM,相当于让研究员只看第一页搜索结果就写报告。
瓶颈三:知识图谱的静态性。 图谱在构建之后就是一个快照。当 Agent 在检索过程中发现"图里没有这个实体"或"这条关系似乎过时了",传统架构没有机制让检索过程反过来驱动图谱更新。
1.2 Agent 带来的范式转换
将 Agent 引入 GraphRAG 后,检索从"函数调用"变成了"探索过程"。Agent 具备以下能力:
- 动态策略选择:根据问题的性质和已检索到的信息,实时决定是做向量检索、图遍历、还是结构化查询。
- 迭代式深入:第一轮检索的结果成为第二轮检索的输入,Agent 可以追问、关联、验证,直到信息充分。
- 跨域知识综合:不同领域的子图可以由不同的专业 Agent 负责检索,最终由 Orchestrator Agent 综合推理。
- 知识缺口感知:Agent 能识别出"图谱中缺少的连接",并触发自动化的知识补全流程。
这种范式转换的本质是:让 LLM 的推理能力与知识图谱的结构化语义形成闭环。Agent 不只是图谱的消费者,也是图谱的维护者。
1.3 多智能体的必要性
为什么需要"多"智能体?因为图谱驱动的检索任务天然具有可分解性。一个复杂查询可以被拆解为:
- 检索子任务:从图谱中提取相关子图
- 验证子任务:交叉验证检索结果的一致性
- 推理子任务:基于检索结果进行逻辑推导
- 更新子任务:将新发现的知识写回图谱
每个子任务的最佳执行策略不同,由专门的 Agent 负责可以获得更好的结果。这与软件工程中的"单一职责原则"一脉相承。
二、Agentic RAG 架构
2.1 核心架构:Agent 作为编排器
Agentic GraphRAG 的核心思想是让 Agent 担任检索过程的编排器(Orchestrator),而不是被动地调用一个检索函数。Agent 执行一个 Plan-Retrieve-Reason-Iterate 循环:
graph TB
subgraph "Agentic GraphRAG 核心循环"
A[用户查询] --> B[Planner Agent]
B --> C{策略决策}
C -->|向量检索| D[Vector Search]
C -->|图遍历| E[Graph Traversal]
C -->|结构化查询| F[Cypher/SPARQL Query]
C -->|社区摘要| G[Community Summary]
D --> H[Reasoner Agent]
E --> H
F --> H
G --> H
H --> I{信息充分?}
I -->|否| J[Query Reformulator]
J --> B
I -->|是| K[Response Synthesizer]
K --> L[最终回答]
H --> M{发现知识缺口?}
M -->|是| N[Graph Updater Agent]
N --> O[(知识图谱)]
E -.->|读取| O
F -.->|查询| O
end
style B fill:#e1f5fe
style H fill:#f3e5f5
style N fill:#fff3e0
style O fill:#e8f5e9
2.2 四层架构设计
一个生产级的 Agentic GraphRAG 系统通常包含四个层次。
第一层:接入层(Interface Layer)。 接收用户查询,做意图分类和复杂度评估。简单的事实性查询(如"谁是这家公司的CEO")可以直接路由到单次图查询,无需启动完整的 Agent 循环。复杂的分析性查询才会触发多 Agent 协作。
第二层:编排层(Orchestration Layer)。 这是系统的大脑。Orchestrator Agent 负责任务分解、Agent 调度、结果汇聚。它维护一个 Task Graph,记录每个子任务的状态(pending / running / completed / failed),并根据依赖关系决定执行顺序。
第三层:检索层(Retrieval Layer)。 包含多种检索工具:向量数据库(用于语义相似度搜索)、图数据库(用于结构化遍历和查询)、全文索引(用于关键词匹配)。每种工具由专门的 Tool Agent 封装,提供统一的调用接口。
第四层:知识层(Knowledge Layer)。 知识图谱本身,包括实体、关系、属性、社区结构、时序信息。这一层不是只读的——Agent 可以通过图更新接口向图谱中添加新实体和关系。
2.3 检索策略的动态决策
Agent 如何决定使用哪种检索策略?这是 Agentic GraphRAG 的核心设计问题。一种有效的方法是让 Planner Agent 基于以下信号做决策:
| 信号 | 判断逻辑 | 推荐策略 |
|---|---|---|
| 查询中包含特定实体名称 | 需要从已知实体出发探索 | Graph Traversal (local search) |
| 查询要求"总结"或"趋势" | 需要跨多个社区的聚合信息 | Community Summary (global search) |
| 查询包含模糊描述 | 实体不确定,需要语义匹配 | Vector Search + Entity Linking |
| 查询涉及多跳关系 | "A 和 B 之间有什么联系" | Multi-hop Graph Query |
| 前一轮检索结果不充分 | 需要换一种角度检索 | 策略切换或 Query Reformulation |
这种决策不是硬编码的 if-else,而是 LLM 基于当前上下文的推理。Agent 的 system prompt 中会包含策略选择的指导原则和示例,LLM 在每一步根据已有信息动态选择最优策略。
2.4 最小可行上下文(Minimum Viable Context)
一个关键的设计原则是 MVC(Minimum Viable Context)——每一步只给 Agent 提供当前步骤所需的最少信息。
在传统 RAG 中,检索结果全部堆积到 prompt 里,导致 context window 迅速膨胀。在 Agentic GraphRAG 中,Orchestrator 会精确控制每个 Agent 接收到的上下文:
- Planner Agent 只需要用户查询 + 图谱的 schema 概览 + 前几轮的检索摘要
- Retriever Agent 只需要具体的检索指令(查询语句或遍历起点)
- Reasoner Agent 只需要当前轮次的检索结果 + 推理目标
这种"按需供给"的上下文管理避免了 Lost-in-the-Middle 问题,也大幅降低了 token 消耗。
三、Graph-aware Agent 设计模式
3.1 模式一:Graph Navigator Agent
职责:在知识图谱中进行结构化遍历,沿着实体之间的关系路径探索相关信息。
Graph Navigator 是最基础也是最核心的设计模式。它把知识图谱视为一个可以"行走"的空间,Agent 站在某个实体节点上,观察周围的关系,决定下一步走向哪里。
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage
class NavigatorState(TypedDict):
"""Graph Navigator Agent 的状态定义"""
query: str
current_entity: str
visited_entities: list[str]
collected_facts: list[dict]
max_hops: int
hop_count: int
should_continue: bool
def get_entity_neighbors(entity_id: str, graph_client) -> dict:
"""从图数据库获取实体的邻居节点和关系"""
cypher = """
MATCH (e {id: $entity_id})-[r]-(neighbor)
RETURN type(r) AS relation,
neighbor.id AS neighbor_id,
neighbor.name AS neighbor_name,
properties(r) AS relation_props
LIMIT 20
"""
return graph_client.query(cypher, {"entity_id": entity_id})
def navigate_step(state: NavigatorState, llm, graph_client) -> NavigatorState:
"""Navigator Agent 的单步遍历逻辑"""
# 获取当前节点的邻居
neighbors = get_entity_neighbors(state["current_entity"], graph_client)
# 让 LLM 决定下一步走向哪个邻居
prompt = f"""你是一个知识图谱导航 Agent。
当前位置: {state['current_entity']}
原始查询: {state['query']}
已访问节点: {state['visited_entities']}
已收集事实: {state['collected_facts']}
当前节点的邻居关系:
{format_neighbors(neighbors)}
请决定:
1. 从邻居中选择最相关的节点继续探索(输出 neighbor_id)
2. 说明选择理由
3. 从当前位置提取与查询相关的事实
4. 判断是否已经收集到足够的信息(should_stop: true/false)
"""
response = llm.invoke(prompt)
decision = parse_navigation_decision(response)
# 更新状态
new_facts = state["collected_facts"] + decision["extracted_facts"]
new_visited = state["visited_entities"] + [state["current_entity"]]
return {
**state,
"current_entity": decision["next_entity"],
"visited_entities": new_visited,
"collected_facts": new_facts,
"hop_count": state["hop_count"] + 1,
"should_continue": (
not decision["should_stop"]
and state["hop_count"] + 1 < state["max_hops"]
),
}
这种模式的优势在于:Agent 的每一步决策都是 context-aware 的——它知道自己从哪里来、要去哪里、已经发现了什么。这比预定义的 Cypher 查询灵活得多,因为遍历路径是在运行时根据实际图谱内容动态生成的。
3.2 模式二:Schema-aware Query Planner
职责:理解知识图谱的 schema(节点类型、关系类型、属性约束),将自然语言查询转换为精确的图查询语句。
这个模式的关键创新是:不是让 LLM 直接生成 Cypher/SPARQL(这通常会出错),而是先让 Agent 理解 schema,制定查询计划,然后再生成查询语句。
class SchemaAwareQueryPlanner:
"""基于 Schema 感知的查询规划 Agent"""
def __init__(self, llm, graph_client):
self.llm = llm
self.graph_client = graph_client
self.schema = self._load_schema()
def _load_schema(self) -> dict:
"""从图数据库加载 schema 信息"""
node_labels = self.graph_client.query(
"CALL db.labels() YIELD label RETURN collect(label) AS labels"
)
rel_types = self.graph_client.query(
"CALL db.relationshipTypes() YIELD relationshipType "
"RETURN collect(relationshipType) AS types"
)
# 获取每种节点类型的属性
properties = {}
for label in node_labels[0]["labels"]:
props = self.graph_client.query(
f"MATCH (n:{label}) "
f"RETURN keys(n) AS props LIMIT 1"
)
if props:
properties[label] = props[0]["props"]
return {
"node_labels": node_labels[0]["labels"],
"relationship_types": rel_types[0]["types"],
"node_properties": properties,
}
def plan_and_query(self, natural_language_query: str) -> list[dict]:
"""将自然语言转换为查询计划并执行"""
# Step 1: 查询规划
plan_prompt = f"""你是一个图数据库查询规划器。
图谱 Schema:
- 节点类型: {self.schema['node_labels']}
- 关系类型: {self.schema['relationship_types']}
- 节点属性: {self.schema['node_properties']}
用户查询: {natural_language_query}
请输出查询计划:
1. 需要查找的实体类型
2. 需要遍历的关系路径
3. 需要的过滤条件
4. 最终需要返回的信息
以 JSON 格式输出。
"""
plan = self.llm.invoke(plan_prompt)
query_plan = parse_json(plan)
# Step 2: 基于计划生成 Cypher
cypher_prompt = f"""基于以下查询计划生成 Cypher 查询语句。
查询计划: {query_plan}
图谱 Schema: {self.schema}
要求:
- 使用参数化查询,避免注入风险
- 包含 LIMIT 子句防止结果过大
- 如果需要多跳遍历,使用可变长度路径模式
只输出 Cypher 语句,不要解释。
"""
cypher = self.llm.invoke(cypher_prompt).content.strip()
# Step 3: 执行并验证
try:
results = self.graph_client.query(cypher)
return results
except Exception as e:
# 查询失败时进入自修复循环
return self._self_heal_query(cypher, str(e), query_plan)
def _self_heal_query(
self, failed_cypher: str, error: str, plan: dict
) -> list[dict]:
"""查询失败时的自修复机制"""
fix_prompt = f"""Cypher 查询执行失败。
原始查询: {failed_cypher}
错误信息: {error}
查询计划: {plan}
图谱 Schema: {self.schema}
请修正 Cypher 查询。常见问题:
- 属性名拼写错误
- 关系方向错误
- 缺少必要的 WHERE 条件
"""
fixed_cypher = self.llm.invoke(fix_prompt).content.strip()
return self.graph_client.query(fixed_cypher)
3.3 模式三:Multi-hop Reasoner
职责:沿着实体链进行多跳推理,回答需要关联多个实体的复杂问题。
Multi-hop Reasoner 与 Graph Navigator 的区别在于:Navigator 侧重"探索"(不确定目标在哪),Reasoner 侧重"证明"(已知起点和终点,需要找到连接路径并解释因果关系)。
class MultiHopReasoner:
"""多跳推理 Agent:沿实体链进行因果推理"""
def __init__(self, llm, graph_client):
self.llm = llm
self.graph_client = graph_client
def reason(self, query: str, source_entity: str,
target_entity: str = None) -> dict:
"""执行多跳推理"""
# Step 1: 识别关键实体
entities = self._extract_entities(query)
if source_entity and source_entity not in entities:
entities.insert(0, source_entity)
# Step 2: 查找实体之间的路径
paths = []
for i in range(len(entities) - 1):
path = self._find_shortest_paths(
entities[i], entities[i + 1], max_hops=4
)
paths.append(path)
# Step 3: 沿路径收集证据
evidence_chain = []
for path in paths:
for hop in path:
context = self._get_entity_context(hop["node_id"])
evidence_chain.append({
"entity": hop["node_id"],
"entity_name": hop["name"],
"relation_from_previous": hop.get("relation"),
"local_context": context,
})
# Step 4: 基于证据链进行推理
reasoning_prompt = f"""你是一个基于知识图谱的推理 Agent。
原始问题: {query}
证据链:
{self._format_evidence_chain(evidence_chain)}
请基于证据链进行推理:
1. 沿着实体链,解释每一跳关系的含义
2. 综合所有关系,回答原始问题
3. 标注推理的置信度(high/medium/low)
4. 指出证据链中是否存在薄弱环节
"""
reasoning = self.llm.invoke(reasoning_prompt)
return {
"answer": reasoning.content,
"evidence_chain": evidence_chain,
"entity_path": [e["entity_name"] for e in evidence_chain],
"hop_count": len(evidence_chain) - 1,
}
def _find_shortest_paths(
self, start: str, end: str, max_hops: int = 4
) -> list[dict]:
"""在图谱中查找两个实体之间的最短路径"""
cypher = """
MATCH path = shortestPath(
(a {name: $start})-[*1..%d]-(b {name: $end})
)
RETURN [n IN nodes(path) |
{node_id: n.id, name: n.name}] AS nodes,
[r IN relationships(path) |
type(r)] AS relations
LIMIT 3
""" % max_hops
results = self.graph_client.query(
cypher, {"start": start, "end": end}
)
if not results:
return []
# 重组为跳跃序列
path_data = results[0]
hops = []
for i, node in enumerate(path_data["nodes"]):
hop = {**node}
if i > 0:
hop["relation"] = path_data["relations"][i - 1]
hops.append(hop)
return hops
def _get_entity_context(self, entity_id: str) -> str:
"""获取实体的局部上下文(属性 + 一跳邻居摘要)"""
cypher = """
MATCH (e {id: $entity_id})
OPTIONAL MATCH (e)-[r]-(n)
RETURN e AS entity,
collect({
relation: type(r),
neighbor: n.name
})[..5] AS neighbors
"""
result = self.graph_client.query(
cypher, {"entity_id": entity_id}
)
if not result:
return "No context available"
entity = result[0]["entity"]
neighbors = result[0]["neighbors"]
return f"Properties: {entity}, Related: {neighbors}"
3.4 模式四:Knowledge Gap Detector
职责:在检索过程中识别知识图谱中的缺失信息,并触发图谱更新流程。
这是最具前瞻性的模式。传统系统把"图谱中找不到"视为终点,而 Knowledge Gap Detector 把它视为起点——一个改进图谱的机会。
class KnowledgeGapDetector:
"""知识缺口检测 Agent"""
def __init__(self, llm, graph_client, update_queue):
self.llm = llm
self.graph_client = graph_client
self.update_queue = update_queue # 异步更新队列
def detect_and_fill(
self, query: str, retrieval_results: list[dict]
) -> dict:
"""检测知识缺口并触发更新"""
# Step 1: 分析检索结果的完整性
gap_analysis_prompt = f"""分析以下检索结果相对于原始查询的完整性。
原始查询: {query}
检索结果: {retrieval_results}
请识别:
1. 查询中提到但图谱中不存在的实体
2. 应该存在但缺失的关系
3. 属性值过时或缺失的实体
4. 推理链中断的位置
以 JSON 格式输出,包含:
- missing_entities: [{name, expected_type, reason}]
- missing_relations: [{source, target, expected_type, reason}]
- stale_data: [{entity, property, current_value, issue}]
- broken_chains: [{from_entity, to_entity, gap_description}]
"""
gaps = self.llm.invoke(gap_analysis_prompt)
gap_report = parse_json(gaps)
# Step 2: 对每个缺口生成更新任务
update_tasks = []
for entity in gap_report.get("missing_entities", []):
task = {
"type": "add_entity",
"payload": entity,
"priority": self._assess_priority(entity, query),
"source": "gap_detection",
"triggered_by_query": query,
}
update_tasks.append(task)
for relation in gap_report.get("missing_relations", []):
task = {
"type": "add_relation",
"payload": relation,
"priority": self._assess_priority(relation, query),
"source": "gap_detection",
"triggered_by_query": query,
}
update_tasks.append(task)
# Step 3: 将更新任务推入异步队列
for task in update_tasks:
self.update_queue.put(task)
return {
"gap_report": gap_report,
"update_tasks_queued": len(update_tasks),
"can_answer_now": len(gap_report.get("broken_chains", [])) == 0,
}
def _assess_priority(self, item: dict, query: str) -> str:
"""评估更新任务的优先级"""
# 如果缺失的信息直接影响当前查询的回答,优先级为 high
# 如果是补充性信息,优先级为 medium
# 如果是潜在有用但非必需的,优先级为 low
prompt = f"""评估这个知识缺口的补全优先级。
缺口: {item}
触发查询: {query}
输出: high / medium / low"""
return self.llm.invoke(prompt).content.strip()
3.5 四种模式的协作
四种模式不是孤立使用的,而是在 Orchestrator Agent 的协调下形成协作网络。
graph LR
subgraph "多 Agent 协作流程"
Q[用户查询] --> O[Orchestrator Agent]
O --> QP[Schema-aware<br/>Query Planner]
O --> GN[Graph Navigator<br/>Agent]
O --> MHR[Multi-hop<br/>Reasoner]
QP -->|结构化结果| R[Result Aggregator]
GN -->|探索发现| R
MHR -->|推理链| R
R --> KGD[Knowledge Gap<br/>Detector]
KGD -->|缺口报告| O
KGD -->|更新任务| UQ[Graph Update Queue]
UQ --> KG[(知识图谱)]
R --> SYN[Response Synthesizer]
SYN --> ANS[最终回答]
end
style O fill:#e1f5fe
style KGD fill:#fff3e0
style KG fill:#e8f5e9
style ANS fill:#f3e5f5
Orchestrator 的决策逻辑如下:
- 对于结构化的精确查询("公司 X 的注册资本是多少"),只启动 Query Planner。
- 对于探索性查询("这个领域有哪些值得关注的趋势"),启动 Graph Navigator + Community Summary。
- 对于关联性查询("A 和 B 之间有什么联系"),启动 Multi-hop Reasoner。
- 对于复合查询,并行启动多个 Agent,由 Result Aggregator 合并结果。
- 所有查询结果都经过 Knowledge Gap Detector 的审查。
四、MCP + 知识图谱:协议驱动的图谱工具化
4.1 为什么需要 MCP
Model Context Protocol(MCP)是 Anthropic 在 2024 年底发布的开放协议,旨在标准化 LLM 与外部工具之间的交互方式。它解决了一个长期存在的问题:每个 LLM 框架都有自己的 tool calling 规范,导致同一个工具需要为不同框架编写不同的适配器。
对于知识图谱场景,MCP 的价值尤为突出。图数据库的操作——查询、遍历、更新——具有很强的通用性。把这些操作封装为 MCP Tool,意味着任何支持 MCP 的 Agent 框架(LangChain、CrewAI、Pydantic.AI 等)都可以直接使用,无需重复开发。
Neo4j 官方已经发布了多个 MCP Server:mcp-neo4j-cypher(Cypher 查询)、mcp-neo4j-memory(实体记忆存储)、mcp-neo4j-modeling(数据建模)。这为构建 Agentic GraphRAG 提供了现成的基础设施。
4.2 图谱 MCP Tool 设计
一个完整的知识图谱 MCP Server 应该暴露以下工具集。
from mcp.server import Server
from mcp.types import Tool, TextContent
import json
app = Server("knowledge-graph-mcp")
@app.tool()
async def graph_query(cypher: str, params: dict = None) -> str:
"""执行 Cypher 查询并返回结果。
用于精确的结构化查询,例如查找特定实体的属性、
统计关系数量、或执行聚合操作。
Args:
cypher: Cypher 查询语句(参数化)
params: 查询参数字典
Returns:
查询结果的 JSON 字符串
"""
results = await neo4j_driver.execute_query(cypher, params or {})
return json.dumps(results, ensure_ascii=False, default=str)
@app.tool()
async def entity_search(
query: str,
entity_type: str = None,
limit: int = 10,
search_mode: str = "hybrid",
) -> str:
"""搜索知识图谱中的实体。
支持三种搜索模式:
- semantic: 基于 embedding 的语义搜索
- keyword: 基于 BM25 的关键词搜索
- hybrid: 混合搜索(推荐)
Args:
query: 搜索文本
entity_type: 可选的实体类型过滤
limit: 返回结果数量上限
search_mode: 搜索模式 (semantic/keyword/hybrid)
Returns:
匹配实体列表的 JSON
"""
if search_mode == "semantic":
embedding = await embed_model.encode(query)
results = await vector_index.search(
embedding, top_k=limit, filter={"type": entity_type}
)
elif search_mode == "keyword":
results = await fulltext_index.search(
query, limit=limit, entity_type=entity_type
)
else: # hybrid
sem_results = await vector_index.search(
await embed_model.encode(query), top_k=limit * 2
)
kw_results = await fulltext_index.search(query, limit=limit * 2)
results = reciprocal_rank_fusion(sem_results, kw_results)[:limit]
return json.dumps(results, ensure_ascii=False)
@app.tool()
async def relationship_traverse(
start_entity: str,
relation_types: list[str] = None,
direction: str = "both",
max_depth: int = 2,
limit: int = 50,
) -> str:
"""从指定实体出发,沿关系路径遍历图谱。
用于探索实体的关系网络,发现间接关联。
Args:
start_entity: 起始实体的名称或 ID
relation_types: 要遍历的关系类型列表(空则遍历所有类型)
direction: 遍历方向 (outgoing/incoming/both)
max_depth: 最大遍历深度
limit: 返回路径数量上限
Returns:
遍历到的路径和实体的 JSON
"""
rel_filter = ""
if relation_types:
rel_filter = ":" + "|".join(relation_types)
dir_pattern = {
"outgoing": f"-[r{rel_filter}*1..{max_depth}]->",
"incoming": f"<-[r{rel_filter}*1..{max_depth}]-",
"both": f"-[r{rel_filter}*1..{max_depth}]-",
}
cypher = f"""
MATCH (start {{name: $name}})
MATCH path = (start){dir_pattern[direction]}(end)
RETURN path, length(path) AS depth
ORDER BY depth
LIMIT $limit
"""
results = await neo4j_driver.execute_query(
cypher, {"name": start_entity, "limit": limit}
)
return json.dumps(format_paths(results), ensure_ascii=False)
@app.tool()
async def community_summary(
community_id: str = None,
topic: str = None,
level: int = 0,
) -> str:
"""获取知识图谱社区的摘要信息。
社区是通过图聚类算法(如 Leiden)自动发现的实体组。
每个社区有不同层级的摘要,level 越高摘要越抽象。
Args:
community_id: 社区 ID(精确获取)
topic: 主题关键词(模糊匹配相关社区)
level: 摘要层级 (0=最详细, 更高=更抽象)
Returns:
社区摘要信息的 JSON
"""
if community_id:
cypher = """
MATCH (c:Community {id: $id, level: $level})
RETURN c.summary AS summary,
c.title AS title,
c.entity_count AS entity_count,
c.key_entities AS key_entities
"""
results = await neo4j_driver.execute_query(
cypher, {"id": community_id, "level": level}
)
elif topic:
# 基于语义搜索找到相关社区
embedding = await embed_model.encode(topic)
results = await community_vector_index.search(
embedding, top_k=5, filter={"level": level}
)
else:
# 返回顶层社区概览
cypher = """
MATCH (c:Community {level: $level})
RETURN c.title AS title,
c.summary AS summary,
c.entity_count AS entity_count
ORDER BY c.entity_count DESC
LIMIT 10
"""
results = await neo4j_driver.execute_query(
cypher, {"level": max(level, 1)}
)
return json.dumps(results, ensure_ascii=False)
@app.tool()
async def graph_update(
operations: list[dict],
source: str = "agent",
require_verification: bool = True,
) -> str:
"""向知识图谱中添加或更新实体和关系。
支持的操作类型:
- add_entity: 添加新实体
- add_relation: 添加新关系
- update_property: 更新实体属性
- invalidate_relation: 标记关系为失效(不物理删除)
Args:
operations: 操作列表,每个操作包含 type 和 payload
source: 操作来源标识
require_verification: 是否需要二次验证
Returns:
操作执行结果的 JSON
"""
results = []
for op in operations:
if require_verification:
# 先检查操作是否与现有图谱一致
conflict = await check_conflict(op)
if conflict:
results.append({
"operation": op,
"status": "conflict",
"detail": conflict,
})
continue
result = await execute_graph_operation(op, source)
results.append(result)
return json.dumps(results, ensure_ascii=False)
4.3 MCP 架构全景
graph TB
subgraph "Agent 框架层"
LA[LangGraph Agent]
CA[CrewAI Agent]
PA[Pydantic.AI Agent]
end
subgraph "MCP 协议层"
MC[MCP Client]
LA --> MC
CA --> MC
PA --> MC
end
subgraph "MCP Server: Knowledge Graph"
MS[MCP Server]
MC <-->|JSON-RPC / stdio| MS
T1[graph_query]
T2[entity_search]
T3[relationship_traverse]
T4[community_summary]
T5[graph_update]
MS --> T1
MS --> T2
MS --> T3
MS --> T4
MS --> T5
end
subgraph "存储层"
N4J[(Neo4j<br/>图数据库)]
VS[(Vector Index<br/>语义索引)]
FT[(Full-text Index<br/>关键词索引)]
T1 --> N4J
T2 --> VS
T2 --> FT
T3 --> N4J
T4 --> N4J
T4 --> VS
T5 --> N4J
end
style MC fill:#e1f5fe
style MS fill:#f3e5f5
style N4J fill:#e8f5e9
4.4 MCP 配置示例
在 Claude Desktop 或 Claude Code 中使用知识图谱 MCP Server 的配置方式:
{
"mcpServers": {
"knowledge-graph": {
"command": "python",
"args": ["-m", "kg_mcp_server"],
"env": {
"NEO4J_URI": "bolt://localhost:7687",
"NEO4J_USER": "neo4j",
"NEO4J_PASSWORD": "${NEO4J_PASSWORD}",
"EMBEDDING_MODEL": "text-embedding-3-small"
}
},
"neo4j-cypher": {
"command": "npx",
"args": ["-y", "@neo4j-contrib/mcp-neo4j-cypher"],
"env": {
"NEO4J_URI": "bolt://localhost:7687",
"NEO4J_USER": "neo4j",
"NEO4J_PASSWORD": "${NEO4J_PASSWORD}"
}
}
}
}
五、实现示例:LangGraph + Neo4j 的 Agentic GraphRAG
5.1 完整实现:多 Agent 图谱检索系统
以下是一个基于 LangGraph 的完整实现,展示了如何构建一个具备图谱感知能力的多 Agent 系统。
"""
Agentic GraphRAG: LangGraph + Neo4j 完整实现
依赖:
pip install langgraph langchain-openai neo4j langchain-community
"""
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from neo4j import GraphDatabase
import json
import operator
# ============================================================
# 1. 图数据库连接与工具定义
# ============================================================
driver = GraphDatabase.driver(
"bolt://localhost:7687", auth=("neo4j", "password")
)
def execute_cypher(query: str, params: dict = None) -> list[dict]:
"""执行 Cypher 查询的工具函数"""
with driver.session() as session:
result = session.run(query, params or {})
return [dict(record) for record in result]
@tool
def search_entities(query: str, entity_type: str = "") -> str:
"""在知识图谱中搜索实体。
输入自然语言描述,返回匹配的实体列表。
可选指定 entity_type 过滤实体类型。
"""
cypher = """
CALL db.index.fulltext.queryNodes('entity_fulltext', $query)
YIELD node, score
WHERE score > 0.5
"""
if entity_type:
cypher += f" AND '{entity_type}' IN labels(node)"
cypher += """
RETURN node.name AS name,
node.id AS id,
labels(node) AS types,
node.description AS description,
score
ORDER BY score DESC
LIMIT 10
"""
results = execute_cypher(cypher, {"query": query})
return json.dumps(results, ensure_ascii=False, default=str)
@tool
def traverse_relationships(
entity_name: str, max_depth: int = 2
) -> str:
"""从指定实体出发遍历关系网络。
返回指定深度内的所有相关实体和关系。
"""
cypher = """
MATCH (start {name: $name})
MATCH path = (start)-[*1..$depth]-(connected)
WITH path, connected,
[r IN relationships(path) | type(r)] AS rel_types,
[n IN nodes(path) | n.name] AS node_names
RETURN DISTINCT
node_names AS path_nodes,
rel_types AS path_relations,
connected.name AS end_entity,
connected.description AS end_description,
length(path) AS depth
ORDER BY depth
LIMIT 30
"""
results = execute_cypher(
cypher, {"name": entity_name, "depth": max_depth}
)
return json.dumps(results, ensure_ascii=False, default=str)
@tool
def execute_graph_query(cypher_query: str) -> str:
"""执行自定义 Cypher 查询。
用于复杂的图模式匹配和聚合查询。
查询必须包含 LIMIT 子句。
"""
if "LIMIT" not in cypher_query.upper():
cypher_query += " LIMIT 20"
try:
results = execute_cypher(cypher_query)
return json.dumps(results, ensure_ascii=False, default=str)
except Exception as e:
return json.dumps({"error": str(e)})
@tool
def get_graph_schema() -> str:
"""获取知识图谱的 schema 信息。
返回所有节点类型、关系类型及其属性。
"""
labels = execute_cypher(
"CALL db.labels() YIELD label RETURN collect(label) AS labels"
)
rel_types = execute_cypher(
"CALL db.relationshipTypes() YIELD relationshipType "
"RETURN collect(relationshipType) AS types"
)
# 采样获取属性信息
properties = {}
for label in labels[0]["labels"]:
props = execute_cypher(
f"MATCH (n:`{label}`) RETURN keys(n) AS props LIMIT 1"
)
if props:
properties[label] = props[0]["props"]
schema = {
"node_labels": labels[0]["labels"],
"relationship_types": rel_types[0]["types"],
"node_properties": properties,
}
return json.dumps(schema, ensure_ascii=False)
@tool
def find_paths_between(
entity_a: str, entity_b: str, max_hops: int = 4
) -> str:
"""查找两个实体之间的所有路径。
用于发现隐藏的关联关系。
"""
cypher = f"""
MATCH (a {{name: $entity_a}}), (b {{name: $entity_b}})
MATCH path = allShortestPaths((a)-[*..{max_hops}]-(b))
RETURN [n IN nodes(path) | n.name] AS path_nodes,
[r IN relationships(path) | type(r)] AS relations,
length(path) AS path_length
LIMIT 5
"""
results = execute_cypher(
cypher, {"entity_a": entity_a, "entity_b": entity_b}
)
if not results:
return json.dumps({
"message": f"No path found between "
f"'{entity_a}' and '{entity_b}' "
f"within {max_hops} hops"
})
return json.dumps(results, ensure_ascii=False, default=str)
# ============================================================
# 2. Agent 状态定义
# ============================================================
class AgentState(TypedDict):
"""多 Agent 系统的共享状态"""
messages: Annotated[list, operator.add]
query: str
retrieval_strategy: str
retrieved_context: list[dict]
reasoning_steps: list[str]
iteration_count: int
max_iterations: int
is_sufficient: bool
final_answer: str
# ============================================================
# 3. Agent 节点定义
# ============================================================
llm = ChatOpenAI(model="gpt-4o", temperature=0)
tools = [
search_entities,
traverse_relationships,
execute_graph_query,
get_graph_schema,
find_paths_between,
]
llm_with_tools = llm.bind_tools(tools)
def planner_node(state: AgentState) -> dict:
"""规划 Agent:分析查询并制定检索策略"""
system_prompt = """你是一个知识图谱检索规划 Agent。
你的任务是分析用户查询,制定最优的检索策略。
可用的检索策略:
1. entity_search - 查找特定实体
2. graph_traversal - 从实体出发遍历关系网络
3. structured_query - 用 Cypher 做结构化查询
4. path_finding - 查找两个实体之间的路径
5. multi_strategy - 组合使用多种策略
请基于查询的性质选择策略,并制定具体的执行计划。
首先调用 get_graph_schema 了解图谱结构,然后制定计划。
"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=(
f"用户查询: {state['query']}\n"
f"当前迭代: {state['iteration_count']}/{state['max_iterations']}\n"
f"已有上下文: {len(state.get('retrieved_context', []))} 条"
)),
]
if state.get("reasoning_steps"):
messages.append(HumanMessage(content=(
f"前几轮的推理: {state['reasoning_steps']}"
)))
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
def retriever_node(state: AgentState) -> dict:
"""检索 Agent:执行具体的图谱检索操作"""
system_prompt = """你是一个知识图谱检索执行 Agent。
根据 Planner 的策略,使用可用工具执行检索。
执行原则:
1. 优先使用 search_entities 定位关键实体
2. 对找到的实体使用 traverse_relationships 探索关系
3. 需要精确查询时使用 execute_graph_query
4. 需要发现关联时使用 find_paths_between
5. 每次检索要记录发现了什么、还缺什么
"""
messages = [SystemMessage(content=system_prompt)] + state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
def reasoner_node(state: AgentState) -> dict:
"""推理 Agent:基于检索结果进行推理和判断"""
system_prompt = """你是一个知识图谱推理 Agent。
你的任务:
1. 分析当前检索到的所有信息
2. 判断是否足够回答用户的查询
3. 如果不够,指出还需要检索什么
4. 如果足够,综合所有信息给出答案
输出格式 (JSON):
{
"reasoning": "你的推理过程",
"is_sufficient": true/false,
"missing_info": "如果不够,说明缺什么",
"answer": "如果足够,给出最终答案"
}
"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=(
f"用户查询: {state['query']}\n\n"
f"检索到的信息:\n"
f"{json.dumps(state.get('retrieved_context', []), ensure_ascii=False)}\n\n"
f"对话历史消息数: {len(state['messages'])}"
)),
]
response = llm.invoke(messages)
reasoning_result = parse_reasoning(response.content)
new_steps = state.get("reasoning_steps", []) + [
reasoning_result.get("reasoning", "")
]
return {
"messages": [response],
"reasoning_steps": new_steps,
"is_sufficient": reasoning_result.get("is_sufficient", False),
"final_answer": reasoning_result.get("answer", ""),
"iteration_count": state["iteration_count"] + 1,
}
def should_continue(state: AgentState) -> Literal["retriever", "end"]:
"""条件边:决定是否继续检索"""
if state.get("is_sufficient"):
return "end"
if state["iteration_count"] >= state["max_iterations"]:
return "end"
return "retriever"
def parse_reasoning(content: str) -> dict:
"""解析推理 Agent 的输出"""
try:
# 尝试提取 JSON
start = content.find("{")
end = content.rfind("}") + 1
if start >= 0 and end > start:
return json.loads(content[start:end])
except json.JSONDecodeError:
pass
return {
"reasoning": content,
"is_sufficient": False,
"answer": "",
}
# ============================================================
# 4. 构建 LangGraph 工作流
# ============================================================
def build_agentic_graphrag() -> StateGraph:
"""构建 Agentic GraphRAG 工作流"""
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("planner", planner_node)
workflow.add_node("retriever", retriever_node)
workflow.add_node("tools", ToolNode(tools))
workflow.add_node("reasoner", reasoner_node)
# 设置入口
workflow.set_entry_point("planner")
# 添加边
workflow.add_edge("planner", "retriever")
# Retriever -> Tools (如果有 tool calls)
# Retriever -> Reasoner (如果没有 tool calls)
def route_retriever(state: AgentState):
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "reasoner"
workflow.add_conditional_edges(
"retriever",
route_retriever,
{"tools": "tools", "reasoner": "reasoner"},
)
# Tools 执行后回到 Retriever
workflow.add_edge("tools", "retriever")
# Reasoner 的条件边
workflow.add_conditional_edges(
"reasoner",
should_continue,
{"retriever": "planner", "end": END},
)
return workflow.compile()
# ============================================================
# 5. 使用示例
# ============================================================
async def main():
"""运行 Agentic GraphRAG"""
graph = build_agentic_graphrag()
# 执行查询
result = await graph.ainvoke({
"query": "分析 OpenAI 和 Anthropic 在技术路线上的差异,"
"以及它们各自的投资方之间是否存在竞争关系",
"messages": [],
"retrieval_strategy": "",
"retrieved_context": [],
"reasoning_steps": [],
"iteration_count": 0,
"max_iterations": 5,
"is_sufficient": False,
"final_answer": "",
})
print("=== 最终回答 ===")
print(result["final_answer"])
print(f"\n=== 推理步骤 ({len(result['reasoning_steps'])} 步) ===")
for i, step in enumerate(result["reasoning_steps"], 1):
print(f"Step {i}: {step[:200]}...")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
5.2 Agent 推理链示例
以下是上述系统处理一个真实查询时的推理链记录,展示了 Agent 如何在知识图谱中逐步探索:
Query: "分析 OpenAI 和 Anthropic 在技术路线上的差异,
以及它们各自的投资方之间是否存在竞争关系"
Step 1 [Planner]:
策略: multi_strategy
计划:
1. 先获取 schema 了解图谱结构
2. 搜索 OpenAI 和 Anthropic 两个实体
3. 分别遍历它们的技术关系和投资关系
4. 查找投资方之间的路径
Step 2 [Retriever]:
调用: get_graph_schema()
调用: search_entities("OpenAI")
调用: search_entities("Anthropic")
发现: 两个实体均存在,类型为 Company
Step 3 [Retriever]:
调用: traverse_relationships("OpenAI", max_depth=2)
发现: OpenAI -> DEVELOPS -> GPT-4, GPT-4o, DALL-E
OpenAI -> FUNDED_BY -> Microsoft, Thrive Capital
OpenAI -> COMPETES_WITH -> Anthropic, Google DeepMind
Step 4 [Retriever]:
调用: traverse_relationships("Anthropic", max_depth=2)
发现: Anthropic -> DEVELOPS -> Claude, Constitutional AI
Anthropic -> FUNDED_BY -> Google, Amazon, Salesforce
Anthropic -> FOUNDED_BY -> Dario Amodei (ex-OpenAI)
Step 5 [Reasoner]:
判断: 信息不充分 -- 缺少投资方之间的竞争关系数据
需要: 查找 Microsoft 和 Google 之间的关系路径
Step 6 [Retriever]:
调用: find_paths_between("Microsoft", "Google", max_hops=3)
发现: Microsoft -> COMPETES_WITH -> Google (in: Cloud, Search, AI)
Microsoft -> PARTNERS_WITH -> Google (in: Open Source)
Step 7 [Retriever]:
调用: find_paths_between("Amazon", "Microsoft", max_hops=3)
发现: Amazon -> COMPETES_WITH -> Microsoft (in: Cloud/AWS vs Azure)
Step 8 [Reasoner]:
判断: 信息充分
综合回答: 技术路线差异 + 投资方竞争格局分析
这个例子清晰地展示了 Agentic GraphRAG 相比静态 GraphRAG 的优势:Agent 在第 5 步发现信息不足后,自主决定追加查询投资方之间的关系,这种迭代式的检索在传统 RAG 中无法实现。
六、Graphiti 时序图谱:Agent 的时间感知记忆
6.1 为什么 Agent 需要时序感知
传统知识图谱有一个被忽视的盲点:它记录的是"事实",但不记录"事实的时间维度"。当 Agent 说"A 公司的 CEO 是张三",这个陈述在 2023 年可能是对的,但到了 2025 年可能已经过时。
Graphiti 是 Zep 团队开发的时序知识图谱框架,专门为 AI Agent 的记忆系统设计。它的核心创新是双时态模型(Bi-temporal Model):每条边(关系)同时记录两个时间维度:
- 有效时间(Valid Time, t_valid):事实在现实世界中生效的时间
- 系统时间(Transaction Time, t_transaction):事实被系统录入的时间
这种双时态设计使得 Agent 不仅能回答"现在是什么情况",还能回答"过去是什么情况"以及"什么时候发生了变化"。
6.2 Episodic vs Semantic Memory
Graphiti 的图谱架构包含三个层次的子图:
Episode 子图(Episodic Memory):记录 Agent 与用户的每一次交互,保留原始对话上下文。每个 Episode 是一个离散的信息单元——一段对话、一条消息、一个事件。Episode 之间按时间顺序链接,形成一条时间线。
Semantic Entity 子图(Semantic Memory):从 Episode 中提取出的实体和关系,经过去重和合并后构成的结构化知识。这是 Agent 的"长期记忆"——它知道"用户喜欢 Python"、"项目使用 Neo4j"这些持久性事实。
Community 子图:通过社区检测算法自动发现的实体群组,每个社区有自己的摘要。用于支持全局性查询。
三者的关系是:Episode 是原始数据,Semantic Entity 是从中提炼的知识,Community 是知识的聚合视图。
"""
Graphiti 时序图谱示例:Agent 记忆管理
"""
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone
async def demo_graphiti_memory():
"""演示 Graphiti 的时序记忆能力"""
# 初始化 Graphiti(连接 Neo4j)
graphiti = Graphiti(
"bolt://localhost:7687",
"neo4j",
"password",
)
await graphiti.build_indices_and_constraints()
# ========================================
# 阶段 1: 添加 Episodic Memory(对话记忆)
# ========================================
episodes = [
{
"content": "用户说:我们的项目决定使用 Neo4j 作为图数据库,"
"主要是因为它的 Cypher 查询语言更直观。",
"timestamp": datetime(2025, 6, 1, tzinfo=timezone.utc),
},
{
"content": "用户说:我们今天把数据库从 Neo4j 迁移到了 "
"NebulaGraph,因为 Neo4j 的社区版不支持集群。",
"timestamp": datetime(2025, 9, 15, tzinfo=timezone.utc),
},
{
"content": "用户说:评估了 NebulaGraph 三个月后,我们发现"
"它的生态不够成熟,决定回到 Neo4j Enterprise。",
"timestamp": datetime(2025, 12, 20, tzinfo=timezone.utc),
},
]
for ep in episodes:
await graphiti.add_episode(
name=f"conversation_{ep['timestamp'].strftime('%Y%m%d')}",
episode_body=ep["content"],
source=EpisodeType.message,
source_description="用户与 Agent 的项目讨论",
reference_time=ep["timestamp"],
)
# ========================================
# 阶段 2: 时序查询 -- Agent 的时间感知推理
# ========================================
# 查询当前状态
current_results = await graphiti.search(
query="项目目前使用什么图数据库?",
num_results=5,
)
print("当前状态查询:")
for r in current_results:
print(f" - {r.fact} (valid: {r.valid_at})")
# 预期输出: 项目使用 Neo4j Enterprise
# (valid: 2025-12-20,因为时序关系标记了最新状态)
# 查询历史状态
# Graphiti 的边带有 t_valid 和 t_invalid,
# 可以精确回溯任意时间点的知识状态
print("\n时序变更链:")
print(" 2025-06-01: 项目 -> USES -> Neo4j (Community)")
print(" 2025-09-15: 项目 -> USES -> Neo4j [INVALIDATED]")
print(" 2025-09-15: 项目 -> USES -> NebulaGraph")
print(" 2025-12-20: 项目 -> USES -> NebulaGraph [INVALIDATED]")
print(" 2025-12-20: 项目 -> USES -> Neo4j (Enterprise)")
# ========================================
# 阶段 3: 混合检索(Graphiti 的核心优势)
# ========================================
# Graphiti 的 search 结合了三种检索方式:
# 1. 语义搜索 (embedding similarity)
# 2. 关键词搜索 (BM25)
# 3. 图遍历 (traversal from relevant nodes)
#
# P95 延迟 ~300ms(Zep 论文数据)
hybrid_results = await graphiti.search(
query="为什么项目在图数据库选型上反复变化?",
num_results=10,
)
# Graphiti 会返回完整的时序变更链,
# 让 Agent 能够理解决策的演变过程
# ========================================
# 阶段 4: 社区摘要(全局视图)
# ========================================
# Graphiti 自动进行社区检测和摘要生成
# 当 Agent 需要回答"项目的技术栈全景"这类全局问题时,
# 可以利用社区摘要而不是遍历所有实体
await graphiti.close()
6.3 Graphiti 在 Agent 记忆中的定位
Graphiti 解决了 Agent 记忆系统的一个根本问题:如何在不丢失历史的前提下更新知识。传统的 vector store 记忆是 append-only 的,新旧信息混杂在一起,Agent 无法分辨哪个是最新的。Graphiti 通过边的失效机制(Edge Invalidation)优雅地处理了这个问题——旧关系不会被删除,而是被标记为 invalidated,新关系同时被创建。
在 Zep 的 DMR(Deep Memory Retrieval)基准测试中,基于 Graphiti 的记忆系统达到了 94.8% 的准确率,比 MemGPT 的 93.4% 高出 1.4 个百分点,同时响应延迟降低了 90%。这个性能数据表明,图结构的记忆在准确性和速度上都优于线性的对话历史。
6.4 Graphiti MCP Server
Graphiti 已经提供了官方的 MCP Server,可以直接集成到 Claude、Cursor 等 MCP 客户端中:
{
"mcpServers": {
"graphiti-memory": {
"command": "graphiti-mcp-server",
"args": ["--transport", "stdio"],
"env": {
"NEO4J_URI": "bolt://localhost:7687",
"NEO4J_USER": "neo4j",
"NEO4J_PASSWORD": "${NEO4J_PASSWORD}",
"OPENAI_API_KEY": "${OPENAI_API_KEY}",
"MODEL_NAME": "gpt-4o-mini"
}
}
}
}
通过 MCP 集成,任何 Agent 都可以获得时序感知的图谱记忆能力,而不需要直接操作 Graphiti 的 API。这是"协议驱动的可组合性"的一个典型案例。
七、生产考量
7.1 Graph Permission Control(图权限控制)
在企业环境中,知识图谱可能包含不同密级的信息。Agent 的检索必须受到权限控制。
节点级别的访问控制(Node-level ACL):每个实体节点附带一个 access_level 属性,Agent 在查询时必须携带用户的权限标识,只返回该用户有权访问的节点。
def secure_query(cypher: str, user_permissions: dict) -> list:
"""带权限控制的图查询"""
# 在所有查询中注入权限过滤
access_levels = user_permissions.get("access_levels", ["public"])
security_filter = (
" AND n.access_level IN $access_levels"
)
# 将安全过滤器注入到 WHERE 子句中
if "WHERE" in cypher.upper():
cypher = cypher.replace("WHERE", f"WHERE n.access_level IN $access_levels AND")
else:
# 在 RETURN 之前插入 WHERE
return_idx = cypher.upper().find("RETURN")
cypher = (
cypher[:return_idx]
+ f"WHERE n.access_level IN $access_levels\n"
+ cypher[return_idx:]
)
return execute_cypher(
cypher, {"access_levels": access_levels}
)
子图隔离(Subgraph Isolation):使用 Neo4j 的多租户能力或命名空间机制,将不同部门的知识图谱物理隔离。Agent 只能访问被授权的子图。
审计日志(Audit Trail):记录 Agent 的每一次图谱访问,包括查询语句、返回结果的实体 ID、访问时间。这对于合规审计至关重要。
7.2 Query Cost Budgeting(查询成本预算)
Agent 的图遍历如果不加控制,可能触发代价高昂的全图扫描。需要在多个层面设置成本预算。
Token 预算:每次 Agent 循环有一个总 token 预算。Orchestrator 跟踪累计消耗的 token 数,当接近预算上限时,强制 Agent 停止检索并基于现有信息生成回答。
查询复杂度限制:在 MCP Tool 层面限制 Cypher 查询的复杂度——例如,可变长度路径的最大深度不超过 5,LIMIT 子句必须存在且不超过 100,禁止 MATCH (n) RETURN n 这类全图扫描。
时间预算:每个查询设置超时。图遍历如果在 10 秒内没有返回结果,自动终止并返回部分结果。
class CostBudget:
"""Agent 查询成本预算管理"""
def __init__(
self,
max_tokens: int = 50000,
max_queries: int = 20,
max_wall_time_seconds: int = 120,
):
self.max_tokens = max_tokens
self.max_queries = max_queries
self.max_wall_time = max_wall_time_seconds
self.tokens_used = 0
self.queries_executed = 0
self.start_time = None
def start(self):
self.start_time = datetime.now()
def record_query(self, token_count: int):
self.tokens_used += token_count
self.queries_executed += 1
def can_continue(self) -> tuple[bool, str]:
if self.tokens_used >= self.max_tokens:
return False, f"Token budget exhausted: {self.tokens_used}/{self.max_tokens}"
if self.queries_executed >= self.max_queries:
return False, f"Query limit reached: {self.queries_executed}/{self.max_queries}"
elapsed = (datetime.now() - self.start_time).total_seconds()
if elapsed >= self.max_wall_time:
return False, f"Time budget exhausted: {elapsed:.0f}s/{self.max_wall_time}s"
return True, "OK"
def remaining_budget(self) -> dict:
elapsed = (datetime.now() - self.start_time).total_seconds()
return {
"tokens_remaining": self.max_tokens - self.tokens_used,
"queries_remaining": self.max_queries - self.queries_executed,
"time_remaining_seconds": max(0, self.max_wall_time - elapsed),
}
7.3 Agent Loop Termination(Agent 循环终止)
这是 Agentic RAG 最棘手的工程问题之一。Agent 可能陷入无限循环——反复检索、反复判断信息不足、反复追问。
硬性终止条件:
| 条件 | 阈值 | 行为 |
|---|---|---|
| 迭代次数 | max_iterations (默认 5) | 强制退出,基于已有信息回答 |
| Token 消耗 | max_tokens (默认 50000) | 强制退出 |
| 墙钟时间 | max_wall_time (默认 120s) | 强制退出 |
| 连续无新信息 | 2 轮 | 强制退出 |
柔性终止信号:
- 信息增量递减:如果最新一轮检索带来的新实体数量不到上一轮的 30%,发出"信息饱和"信号。
- 置信度阈值:Reasoner Agent 在每一轮输出一个 0-1 的置信度分数,当置信度超过 0.8 时建议终止。
- 循环检测:如果 Agent 连续两次生成相似的查询(余弦相似度 > 0.9),说明陷入了循环,强制终止。
def detect_loop(
recent_queries: list[str], threshold: float = 0.9
) -> bool:
"""检测 Agent 是否陷入查询循环"""
if len(recent_queries) < 2:
return False
# 比较最近两次查询的相似度
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(recent_queries[-2:])
similarity = float(
embeddings[0] @ embeddings[1]
/ (
(embeddings[0] @ embeddings[0]) ** 0.5
* (embeddings[1] @ embeddings[1]) ** 0.5
)
)
return similarity > threshold
7.4 Observability(可观测性)
在 Agent 与知识图谱的交互过程中,可观测性要求比传统 RAG 高得多,因为推理路径不是线性的——Agent 可能在图谱中进行了复杂的遍历,而这个过程对于调试和优化至关重要。
Trace Agent 的图遍历路径:记录 Agent 在图谱中访问的每一个节点和关系,形成一条"探索轨迹"。这不仅用于调试,还可以用于发现图谱中的"热点区域"(频繁被访问的实体)和"冷区"(从未被检索到的实体)。
from dataclasses import dataclass, field
from datetime import datetime
import json
@dataclass
class GraphTraversalSpan:
"""单个图遍历操作的追踪记录"""
span_id: str
operation: str # search / traverse / query / path_find
cypher: str
parameters: dict
result_count: int
entities_visited: list[str]
relations_traversed: list[str]
duration_ms: float
token_cost: int
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class AgentTrace:
"""Agent 完整推理链的追踪"""
trace_id: str
query: str
spans: list[GraphTraversalSpan] = field(default_factory=list)
total_iterations: int = 0
total_tokens: int = 0
total_entities_visited: int = 0
termination_reason: str = ""
def add_span(self, span: GraphTraversalSpan):
self.spans.append(span)
self.total_tokens += span.token_cost
self.total_entities_visited += len(span.entities_visited)
def export_for_langsmith(self) -> dict:
"""导出为 LangSmith 兼容格式"""
return {
"trace_id": self.trace_id,
"name": f"GraphRAG: {self.query[:50]}",
"metadata": {
"total_iterations": self.total_iterations,
"total_tokens": self.total_tokens,
"total_entities": self.total_entities_visited,
"termination_reason": self.termination_reason,
},
"spans": [
{
"name": s.operation,
"metadata": {
"cypher": s.cypher,
"result_count": s.result_count,
"entities": s.entities_visited,
"duration_ms": s.duration_ms,
},
}
for s in self.spans
],
}
def export_graph_heatmap(self) -> dict:
"""导出实体访问热力图数据"""
entity_counts: dict[str, int] = {}
for span in self.spans:
for entity in span.entities_visited:
entity_counts[entity] = entity_counts.get(entity, 0) + 1
return dict(
sorted(entity_counts.items(), key=lambda x: -x[1])
)
与 LangSmith / OpenTelemetry 集成:将 Agent 的每一步决策、每一次工具调用、每一个图查询都作为一个 Span 记录到分布式追踪系统中。这样可以清晰地看到:用户查询 -> Planner 决策 -> Retriever 调用 -> Cypher 执行 -> Reasoner 推理 -> 最终回答 的完整链路。
图谱健康度监控:定期统计图谱的关键指标——节点数、关系数、孤立节点比例、平均路径长度、社区数量。如果 Agent 触发的 Knowledge Gap Detector 频繁报告某类缺失,说明图谱在该领域的覆盖度不足,需要主动补充数据。
7.5 扩展性与成本优化
分层缓存:高频查询的结果缓存在 Redis 中,缓存的粒度是 (entity_id, relation_type, depth) 的组合。缓存的 TTL 根据知识的时效性动态调整——变化频繁的实体(如"当前股价")TTL 短,稳定的事实(如"公司成立时间")TTL 长。
异步图更新:Knowledge Gap Detector 产生的更新任务不应阻塞检索流程。使用消息队列(如 Celery + Redis)异步执行图谱更新,更新完成后通知缓存失效。
模型分级:不是所有环节都需要最强的模型。Planner 和 Reasoner 需要强推理能力,使用 GPT-4o 或 Claude Opus;Retriever 的工具调用决策相对简单,可以使用 GPT-4o-mini 或 Claude Haiku;Cypher 生成可以使用经过 fine-tuning 的专用小模型。
模型分级策略:
Planner Agent: Premium (GPT-4o / Claude Opus) -- 需要强推理
Reasoner Agent: Premium (GPT-4o / Claude Opus) -- 需要综合判断
Retriever Agent: Balanced (GPT-4o-mini / Haiku) -- 工具调用为主
Gap Detector: Balanced (GPT-4o-mini / Haiku) -- 模式匹配为主
Cypher Generator: Fast (Fine-tuned small model) -- 固定模式翻译
八、展望:从工具到基础设施
8.1 知识图谱作为 Agent 的操作系统
如果说 LLM 是 Agent 的"大脑",那么知识图谱正在成为 Agent 的"文件系统"。Agent 读取知识图谱就像程序读取文件系统——通过结构化的路径(关系链)访问特定的信息(实体和属性),而不是在一堆无结构的文本中盲目搜索。
这个类比还可以进一步延伸:MCP 之于知识图谱,就像 POSIX 之于文件系统——提供了一套标准化的接口,使得上层应用(Agent)不需要关心底层存储的具体实现。今天用 Neo4j,明天换成 NebulaGraph,只要 MCP Server 的接口不变,Agent 的代码一行不用改。
8.2 多 Agent 图谱协作的未来
当前的多 Agent 系统主要是"一个 Orchestrator + 多个 Worker"的层级结构。未来的方向是 Peer-to-Peer 的 Agent 图谱协作——多个 Agent 各自维护自己领域的子图,通过标准协议(如 A2A, Agent-to-Agent Protocol)交换知识碎片,形成一个去中心化的、不断生长的全局知识图谱。
想象这样一个场景:法务 Agent 在其法规知识图谱中发现了一条新法规,财务 Agent 在其合规知识图谱中发现了一条违规风险,两者通过知识交换协议自动建立了"新法规 -> IMPACTS -> 违规风险"的跨域关系——这是纯粹的人类组织流程需要数周才能完成的工作。
8.3 关键挑战
尽管前景令人兴奋,但多智能体 GraphRAG 仍面临若干关键挑战:
- 幻觉传播:如果 Agent 基于 LLM 的推理在知识图谱中写入了错误的关系,这个错误会通过图遍历传播到后续的检索结果中。需要设计"知识验证 Agent"来周期性地审查图谱中的新增内容。
- Schema 演化:当知识图谱的 schema 发生变化(新增节点类型、重命名关系类型),所有依赖旧 schema 的 Agent 都需要更新。MCP 可以缓解这个问题,但不能完全消除。
- 成本可预测性:Agent 的迭代式检索使得查询成本难以预测。同一个问题在不同的图谱状态下可能触发完全不同数量的查询。需要更精细的成本预估模型。
- 评测标准:如何评估 Agentic GraphRAG 的质量?不仅要评估最终回答的准确性,还要评估探索路径的效率、知识缺口检测的召回率、以及图谱更新的正确率。
结语
多智能体 GraphRAG 不是一个单一的技术,而是三个领域交汇产生的新范式:Agent 系统的自主推理能力、知识图谱的结构化语义表示、以及 MCP 等标准协议带来的可组合性。它的本质是让 AI 从"被动回答问题"进化到"主动探索知识"——这与人类研究者的工作方式是一致的。
在实际落地中,建议从简单开始:先用 Neo4j MCP Server 给现有的 Agent 添加图谱检索能力,再逐步引入多 Agent 协作、时序感知(Graphiti)、知识缺口检测。每一层都有独立的价值,不需要一次性构建完整的系统。
知识图谱的价值在于关系,Agent 的价值在于推理。当两者结合,AI 系统获得的不只是更好的检索,而是一种全新的"理解"能力——理解实体之间的因果、理解知识随时间的演变、理解跨领域概念之间的隐藏联系。这是向着真正的知识智能迈出的关键一步。
参考资源
- Neo4j GraphRAG + Agentic Architecture
- Graphiti: Build Real-Time Knowledge Graphs for AI Agents
- Zep: A Temporal Knowledge Graph Architecture for Agent Memory (arXiv 2501.13956)
- Neo4j MCP Servers
- LangGraph + Neo4j Knowledge Graph
- Agentic RAG with Knowledge Graphs for Complex Multi-Hop Reasoning
- Graph RAG in 2026: A Practitioner's Guide
- GraphRAG & MCP: The New Standard for Agentic Data Architecture
Maurice | maurice_wen@proton.me