多智能体 GraphRAG:Agent 驱动的知识图谱检索

从被动检索到主动探索——当 AI Agent 学会在知识图谱中自主导航


引言:RAG 的第三次进化

检索增强生成(Retrieval-Augmented Generation, RAG)经历了三个阶段。第一阶段是朴素 RAG:把文档切片、向量化、top-k 召回,拼接到 prompt 里让 LLM 回答。第二阶段是 GraphRAG:Microsoft 在 2024 年提出的方案,用 LLM 从语料中抽取实体和关系构建知识图谱,再基于 community detection 做分层摘要,使得"全局性问题"也能被有效回答。第三阶段——也就是本文要讨论的——是 Agentic GraphRAG:让自主 Agent 成为检索的驾驶员,动态决定"问什么、怎么问、问完之后还要不要继续问"。

这不是简单地给 Agent 挂一个图数据库工具。真正的变革在于:Agent 能够理解图的 schema、规划多跳遍历路径、在检索过程中发现知识缺口并触发图谱更新——从被动的"给我答案"变成主动的"让我探索"。

本文将从架构、设计模式、协议集成、实现代码、时序图谱、到生产部署六个维度,系统阐述如何构建一个多智能体驱动的 GraphRAG 系统。


一、从被动检索到主动探索

1.1 静态检索的三个瓶颈

即使是 GraphRAG,在面对复杂的实际查询时仍然存在结构性的局限。

瓶颈一:检索策略的僵化。 传统 RAG 的检索策略在 query 发出之前就已经固定——要么做 local search(基于实体邻域),要么做 global search(基于 community 摘要)。但真实问题往往需要在两种策略之间动态切换。例如,"这家公司最近三年的供应链风险事件,以及这些事件与行业监管政策变化之间的关联是什么?"这个问题的前半部分需要 local search(围绕特定公司实体做多跳遍历),后半部分需要 global search(跨行业的政策趋势总结),而两者之间还需要一个 cross-domain 的关联推理。

瓶颈二:单轮检索的天花板。 一次检索很少能获得足够的上下文。人类研究员在查资料时,会根据第一轮搜索结果调整关键词、追问细节、沿着引用链深入。但静态 RAG 只做一次检索就把结果交给 LLM,相当于让研究员只看第一页搜索结果就写报告。

瓶颈三:知识图谱的静态性。 图谱在构建之后就是一个快照。当 Agent 在检索过程中发现"图里没有这个实体"或"这条关系似乎过时了",传统架构没有机制让检索过程反过来驱动图谱更新。

1.2 Agent 带来的范式转换

将 Agent 引入 GraphRAG 后,检索从"函数调用"变成了"探索过程"。Agent 具备以下能力:

  • 动态策略选择:根据问题的性质和已检索到的信息,实时决定是做向量检索、图遍历、还是结构化查询。
  • 迭代式深入:第一轮检索的结果成为第二轮检索的输入,Agent 可以追问、关联、验证,直到信息充分。
  • 跨域知识综合:不同领域的子图可以由不同的专业 Agent 负责检索,最终由 Orchestrator Agent 综合推理。
  • 知识缺口感知:Agent 能识别出"图谱中缺少的连接",并触发自动化的知识补全流程。

这种范式转换的本质是:让 LLM 的推理能力与知识图谱的结构化语义形成闭环。Agent 不只是图谱的消费者,也是图谱的维护者。

1.3 多智能体的必要性

为什么需要"多"智能体?因为图谱驱动的检索任务天然具有可分解性。一个复杂查询可以被拆解为:

  • 检索子任务:从图谱中提取相关子图
  • 验证子任务:交叉验证检索结果的一致性
  • 推理子任务:基于检索结果进行逻辑推导
  • 更新子任务:将新发现的知识写回图谱

每个子任务的最佳执行策略不同,由专门的 Agent 负责可以获得更好的结果。这与软件工程中的"单一职责原则"一脉相承。


二、Agentic RAG 架构

2.1 核心架构:Agent 作为编排器

Agentic GraphRAG 的核心思想是让 Agent 担任检索过程的编排器(Orchestrator),而不是被动地调用一个检索函数。Agent 执行一个 Plan-Retrieve-Reason-Iterate 循环:

graph TB
    subgraph "Agentic GraphRAG 核心循环"
        A[用户查询] --> B[Planner Agent]
        B --> C{策略决策}
        C -->|向量检索| D[Vector Search]
        C -->|图遍历| E[Graph Traversal]
        C -->|结构化查询| F[Cypher/SPARQL Query]
        C -->|社区摘要| G[Community Summary]
        D --> H[Reasoner Agent]
        E --> H
        F --> H
        G --> H
        H --> I{信息充分?}
        I -->|否| J[Query Reformulator]
        J --> B
        I -->|是| K[Response Synthesizer]
        K --> L[最终回答]
        H --> M{发现知识缺口?}
        M -->|是| N[Graph Updater Agent]
        N --> O[(知识图谱)]
        E -.->|读取| O
        F -.->|查询| O
    end

    style B fill:#e1f5fe
    style H fill:#f3e5f5
    style N fill:#fff3e0
    style O fill:#e8f5e9

2.2 四层架构设计

一个生产级的 Agentic GraphRAG 系统通常包含四个层次。

第一层:接入层(Interface Layer)。 接收用户查询,做意图分类和复杂度评估。简单的事实性查询(如"谁是这家公司的CEO")可以直接路由到单次图查询,无需启动完整的 Agent 循环。复杂的分析性查询才会触发多 Agent 协作。

第二层:编排层(Orchestration Layer)。 这是系统的大脑。Orchestrator Agent 负责任务分解、Agent 调度、结果汇聚。它维护一个 Task Graph,记录每个子任务的状态(pending / running / completed / failed),并根据依赖关系决定执行顺序。

第三层:检索层(Retrieval Layer)。 包含多种检索工具:向量数据库(用于语义相似度搜索)、图数据库(用于结构化遍历和查询)、全文索引(用于关键词匹配)。每种工具由专门的 Tool Agent 封装,提供统一的调用接口。

第四层:知识层(Knowledge Layer)。 知识图谱本身,包括实体、关系、属性、社区结构、时序信息。这一层不是只读的——Agent 可以通过图更新接口向图谱中添加新实体和关系。

2.3 检索策略的动态决策

Agent 如何决定使用哪种检索策略?这是 Agentic GraphRAG 的核心设计问题。一种有效的方法是让 Planner Agent 基于以下信号做决策:

信号 判断逻辑 推荐策略
查询中包含特定实体名称 需要从已知实体出发探索 Graph Traversal (local search)
查询要求"总结"或"趋势" 需要跨多个社区的聚合信息 Community Summary (global search)
查询包含模糊描述 实体不确定,需要语义匹配 Vector Search + Entity Linking
查询涉及多跳关系 "A 和 B 之间有什么联系" Multi-hop Graph Query
前一轮检索结果不充分 需要换一种角度检索 策略切换或 Query Reformulation

这种决策不是硬编码的 if-else,而是 LLM 基于当前上下文的推理。Agent 的 system prompt 中会包含策略选择的指导原则和示例,LLM 在每一步根据已有信息动态选择最优策略。

2.4 最小可行上下文(Minimum Viable Context)

一个关键的设计原则是 MVC(Minimum Viable Context)——每一步只给 Agent 提供当前步骤所需的最少信息。

在传统 RAG 中,检索结果全部堆积到 prompt 里,导致 context window 迅速膨胀。在 Agentic GraphRAG 中,Orchestrator 会精确控制每个 Agent 接收到的上下文:

  • Planner Agent 只需要用户查询 + 图谱的 schema 概览 + 前几轮的检索摘要
  • Retriever Agent 只需要具体的检索指令(查询语句或遍历起点)
  • Reasoner Agent 只需要当前轮次的检索结果 + 推理目标

这种"按需供给"的上下文管理避免了 Lost-in-the-Middle 问题,也大幅降低了 token 消耗。


三、Graph-aware Agent 设计模式

3.1 模式一:Graph Navigator Agent

职责:在知识图谱中进行结构化遍历,沿着实体之间的关系路径探索相关信息。

Graph Navigator 是最基础也是最核心的设计模式。它把知识图谱视为一个可以"行走"的空间,Agent 站在某个实体节点上,观察周围的关系,决定下一步走向哪里。

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage


class NavigatorState(TypedDict):
    """Graph Navigator Agent 的状态定义"""
    query: str
    current_entity: str
    visited_entities: list[str]
    collected_facts: list[dict]
    max_hops: int
    hop_count: int
    should_continue: bool


def get_entity_neighbors(entity_id: str, graph_client) -> dict:
    """从图数据库获取实体的邻居节点和关系"""
    cypher = """
    MATCH (e {id: $entity_id})-[r]-(neighbor)
    RETURN type(r) AS relation,
           neighbor.id AS neighbor_id,
           neighbor.name AS neighbor_name,
           properties(r) AS relation_props
    LIMIT 20
    """
    return graph_client.query(cypher, {"entity_id": entity_id})


def navigate_step(state: NavigatorState, llm, graph_client) -> NavigatorState:
    """Navigator Agent 的单步遍历逻辑"""
    # 获取当前节点的邻居
    neighbors = get_entity_neighbors(state["current_entity"], graph_client)

    # 让 LLM 决定下一步走向哪个邻居
    prompt = f"""你是一个知识图谱导航 Agent。
当前位置: {state['current_entity']}
原始查询: {state['query']}
已访问节点: {state['visited_entities']}
已收集事实: {state['collected_facts']}

当前节点的邻居关系:
{format_neighbors(neighbors)}

请决定:
1. 从邻居中选择最相关的节点继续探索(输出 neighbor_id)
2. 说明选择理由
3. 从当前位置提取与查询相关的事实
4. 判断是否已经收集到足够的信息(should_stop: true/false)
"""
    response = llm.invoke(prompt)
    decision = parse_navigation_decision(response)

    # 更新状态
    new_facts = state["collected_facts"] + decision["extracted_facts"]
    new_visited = state["visited_entities"] + [state["current_entity"]]

    return {
        **state,
        "current_entity": decision["next_entity"],
        "visited_entities": new_visited,
        "collected_facts": new_facts,
        "hop_count": state["hop_count"] + 1,
        "should_continue": (
            not decision["should_stop"]
            and state["hop_count"] + 1 < state["max_hops"]
        ),
    }

这种模式的优势在于:Agent 的每一步决策都是 context-aware 的——它知道自己从哪里来、要去哪里、已经发现了什么。这比预定义的 Cypher 查询灵活得多,因为遍历路径是在运行时根据实际图谱内容动态生成的。

3.2 模式二:Schema-aware Query Planner

职责:理解知识图谱的 schema(节点类型、关系类型、属性约束),将自然语言查询转换为精确的图查询语句。

这个模式的关键创新是:不是让 LLM 直接生成 Cypher/SPARQL(这通常会出错),而是先让 Agent 理解 schema,制定查询计划,然后再生成查询语句。

class SchemaAwareQueryPlanner:
    """基于 Schema 感知的查询规划 Agent"""

    def __init__(self, llm, graph_client):
        self.llm = llm
        self.graph_client = graph_client
        self.schema = self._load_schema()

    def _load_schema(self) -> dict:
        """从图数据库加载 schema 信息"""
        node_labels = self.graph_client.query(
            "CALL db.labels() YIELD label RETURN collect(label) AS labels"
        )
        rel_types = self.graph_client.query(
            "CALL db.relationshipTypes() YIELD relationshipType "
            "RETURN collect(relationshipType) AS types"
        )
        # 获取每种节点类型的属性
        properties = {}
        for label in node_labels[0]["labels"]:
            props = self.graph_client.query(
                f"MATCH (n:{label}) "
                f"RETURN keys(n) AS props LIMIT 1"
            )
            if props:
                properties[label] = props[0]["props"]

        return {
            "node_labels": node_labels[0]["labels"],
            "relationship_types": rel_types[0]["types"],
            "node_properties": properties,
        }

    def plan_and_query(self, natural_language_query: str) -> list[dict]:
        """将自然语言转换为查询计划并执行"""
        # Step 1: 查询规划
        plan_prompt = f"""你是一个图数据库查询规划器。

图谱 Schema:
- 节点类型: {self.schema['node_labels']}
- 关系类型: {self.schema['relationship_types']}
- 节点属性: {self.schema['node_properties']}

用户查询: {natural_language_query}

请输出查询计划:
1. 需要查找的实体类型
2. 需要遍历的关系路径
3. 需要的过滤条件
4. 最终需要返回的信息

以 JSON 格式输出。
"""
        plan = self.llm.invoke(plan_prompt)
        query_plan = parse_json(plan)

        # Step 2: 基于计划生成 Cypher
        cypher_prompt = f"""基于以下查询计划生成 Cypher 查询语句。

查询计划: {query_plan}
图谱 Schema: {self.schema}

要求:
- 使用参数化查询,避免注入风险
- 包含 LIMIT 子句防止结果过大
- 如果需要多跳遍历,使用可变长度路径模式

只输出 Cypher 语句,不要解释。
"""
        cypher = self.llm.invoke(cypher_prompt).content.strip()

        # Step 3: 执行并验证
        try:
            results = self.graph_client.query(cypher)
            return results
        except Exception as e:
            # 查询失败时进入自修复循环
            return self._self_heal_query(cypher, str(e), query_plan)

    def _self_heal_query(
        self, failed_cypher: str, error: str, plan: dict
    ) -> list[dict]:
        """查询失败时的自修复机制"""
        fix_prompt = f"""Cypher 查询执行失败。

原始查询: {failed_cypher}
错误信息: {error}
查询计划: {plan}
图谱 Schema: {self.schema}

请修正 Cypher 查询。常见问题:
- 属性名拼写错误
- 关系方向错误
- 缺少必要的 WHERE 条件
"""
        fixed_cypher = self.llm.invoke(fix_prompt).content.strip()
        return self.graph_client.query(fixed_cypher)

3.3 模式三:Multi-hop Reasoner

职责:沿着实体链进行多跳推理,回答需要关联多个实体的复杂问题。

Multi-hop Reasoner 与 Graph Navigator 的区别在于:Navigator 侧重"探索"(不确定目标在哪),Reasoner 侧重"证明"(已知起点和终点,需要找到连接路径并解释因果关系)。

class MultiHopReasoner:
    """多跳推理 Agent:沿实体链进行因果推理"""

    def __init__(self, llm, graph_client):
        self.llm = llm
        self.graph_client = graph_client

    def reason(self, query: str, source_entity: str,
               target_entity: str = None) -> dict:
        """执行多跳推理"""
        # Step 1: 识别关键实体
        entities = self._extract_entities(query)
        if source_entity and source_entity not in entities:
            entities.insert(0, source_entity)

        # Step 2: 查找实体之间的路径
        paths = []
        for i in range(len(entities) - 1):
            path = self._find_shortest_paths(
                entities[i], entities[i + 1], max_hops=4
            )
            paths.append(path)

        # Step 3: 沿路径收集证据
        evidence_chain = []
        for path in paths:
            for hop in path:
                context = self._get_entity_context(hop["node_id"])
                evidence_chain.append({
                    "entity": hop["node_id"],
                    "entity_name": hop["name"],
                    "relation_from_previous": hop.get("relation"),
                    "local_context": context,
                })

        # Step 4: 基于证据链进行推理
        reasoning_prompt = f"""你是一个基于知识图谱的推理 Agent。

原始问题: {query}
证据链:
{self._format_evidence_chain(evidence_chain)}

请基于证据链进行推理:
1. 沿着实体链,解释每一跳关系的含义
2. 综合所有关系,回答原始问题
3. 标注推理的置信度(high/medium/low)
4. 指出证据链中是否存在薄弱环节
"""
        reasoning = self.llm.invoke(reasoning_prompt)
        return {
            "answer": reasoning.content,
            "evidence_chain": evidence_chain,
            "entity_path": [e["entity_name"] for e in evidence_chain],
            "hop_count": len(evidence_chain) - 1,
        }

    def _find_shortest_paths(
        self, start: str, end: str, max_hops: int = 4
    ) -> list[dict]:
        """在图谱中查找两个实体之间的最短路径"""
        cypher = """
        MATCH path = shortestPath(
            (a {name: $start})-[*1..%d]-(b {name: $end})
        )
        RETURN [n IN nodes(path) |
            {node_id: n.id, name: n.name}] AS nodes,
               [r IN relationships(path) |
            type(r)] AS relations
        LIMIT 3
        """ % max_hops
        results = self.graph_client.query(
            cypher, {"start": start, "end": end}
        )
        if not results:
            return []
        # 重组为跳跃序列
        path_data = results[0]
        hops = []
        for i, node in enumerate(path_data["nodes"]):
            hop = {**node}
            if i > 0:
                hop["relation"] = path_data["relations"][i - 1]
            hops.append(hop)
        return hops

    def _get_entity_context(self, entity_id: str) -> str:
        """获取实体的局部上下文(属性 + 一跳邻居摘要)"""
        cypher = """
        MATCH (e {id: $entity_id})
        OPTIONAL MATCH (e)-[r]-(n)
        RETURN e AS entity,
               collect({
                   relation: type(r),
                   neighbor: n.name
               })[..5] AS neighbors
        """
        result = self.graph_client.query(
            cypher, {"entity_id": entity_id}
        )
        if not result:
            return "No context available"
        entity = result[0]["entity"]
        neighbors = result[0]["neighbors"]
        return f"Properties: {entity}, Related: {neighbors}"

3.4 模式四:Knowledge Gap Detector

职责:在检索过程中识别知识图谱中的缺失信息,并触发图谱更新流程。

这是最具前瞻性的模式。传统系统把"图谱中找不到"视为终点,而 Knowledge Gap Detector 把它视为起点——一个改进图谱的机会。

class KnowledgeGapDetector:
    """知识缺口检测 Agent"""

    def __init__(self, llm, graph_client, update_queue):
        self.llm = llm
        self.graph_client = graph_client
        self.update_queue = update_queue  # 异步更新队列

    def detect_and_fill(
        self, query: str, retrieval_results: list[dict]
    ) -> dict:
        """检测知识缺口并触发更新"""
        # Step 1: 分析检索结果的完整性
        gap_analysis_prompt = f"""分析以下检索结果相对于原始查询的完整性。

原始查询: {query}
检索结果: {retrieval_results}

请识别:
1. 查询中提到但图谱中不存在的实体
2. 应该存在但缺失的关系
3. 属性值过时或缺失的实体
4. 推理链中断的位置

以 JSON 格式输出,包含:
- missing_entities: [{name, expected_type, reason}]
- missing_relations: [{source, target, expected_type, reason}]
- stale_data: [{entity, property, current_value, issue}]
- broken_chains: [{from_entity, to_entity, gap_description}]
"""
        gaps = self.llm.invoke(gap_analysis_prompt)
        gap_report = parse_json(gaps)

        # Step 2: 对每个缺口生成更新任务
        update_tasks = []
        for entity in gap_report.get("missing_entities", []):
            task = {
                "type": "add_entity",
                "payload": entity,
                "priority": self._assess_priority(entity, query),
                "source": "gap_detection",
                "triggered_by_query": query,
            }
            update_tasks.append(task)

        for relation in gap_report.get("missing_relations", []):
            task = {
                "type": "add_relation",
                "payload": relation,
                "priority": self._assess_priority(relation, query),
                "source": "gap_detection",
                "triggered_by_query": query,
            }
            update_tasks.append(task)

        # Step 3: 将更新任务推入异步队列
        for task in update_tasks:
            self.update_queue.put(task)

        return {
            "gap_report": gap_report,
            "update_tasks_queued": len(update_tasks),
            "can_answer_now": len(gap_report.get("broken_chains", [])) == 0,
        }

    def _assess_priority(self, item: dict, query: str) -> str:
        """评估更新任务的优先级"""
        # 如果缺失的信息直接影响当前查询的回答,优先级为 high
        # 如果是补充性信息,优先级为 medium
        # 如果是潜在有用但非必需的,优先级为 low
        prompt = f"""评估这个知识缺口的补全优先级。
缺口: {item}
触发查询: {query}
输出: high / medium / low"""
        return self.llm.invoke(prompt).content.strip()

3.5 四种模式的协作

四种模式不是孤立使用的,而是在 Orchestrator Agent 的协调下形成协作网络。

graph LR
    subgraph "多 Agent 协作流程"
        Q[用户查询] --> O[Orchestrator Agent]
        O --> QP[Schema-aware<br/>Query Planner]
        O --> GN[Graph Navigator<br/>Agent]
        O --> MHR[Multi-hop<br/>Reasoner]

        QP -->|结构化结果| R[Result Aggregator]
        GN -->|探索发现| R
        MHR -->|推理链| R

        R --> KGD[Knowledge Gap<br/>Detector]
        KGD -->|缺口报告| O
        KGD -->|更新任务| UQ[Graph Update Queue]
        UQ --> KG[(知识图谱)]

        R --> SYN[Response Synthesizer]
        SYN --> ANS[最终回答]
    end

    style O fill:#e1f5fe
    style KGD fill:#fff3e0
    style KG fill:#e8f5e9
    style ANS fill:#f3e5f5

Orchestrator 的决策逻辑如下:

  1. 对于结构化的精确查询("公司 X 的注册资本是多少"),只启动 Query Planner。
  2. 对于探索性查询("这个领域有哪些值得关注的趋势"),启动 Graph Navigator + Community Summary。
  3. 对于关联性查询("A 和 B 之间有什么联系"),启动 Multi-hop Reasoner。
  4. 对于复合查询,并行启动多个 Agent,由 Result Aggregator 合并结果。
  5. 所有查询结果都经过 Knowledge Gap Detector 的审查。

四、MCP + 知识图谱:协议驱动的图谱工具化

4.1 为什么需要 MCP

Model Context Protocol(MCP)是 Anthropic 在 2024 年底发布的开放协议,旨在标准化 LLM 与外部工具之间的交互方式。它解决了一个长期存在的问题:每个 LLM 框架都有自己的 tool calling 规范,导致同一个工具需要为不同框架编写不同的适配器。

对于知识图谱场景,MCP 的价值尤为突出。图数据库的操作——查询、遍历、更新——具有很强的通用性。把这些操作封装为 MCP Tool,意味着任何支持 MCP 的 Agent 框架(LangChain、CrewAI、Pydantic.AI 等)都可以直接使用,无需重复开发。

Neo4j 官方已经发布了多个 MCP Server:mcp-neo4j-cypher(Cypher 查询)、mcp-neo4j-memory(实体记忆存储)、mcp-neo4j-modeling(数据建模)。这为构建 Agentic GraphRAG 提供了现成的基础设施。

4.2 图谱 MCP Tool 设计

一个完整的知识图谱 MCP Server 应该暴露以下工具集。

from mcp.server import Server
from mcp.types import Tool, TextContent
import json

app = Server("knowledge-graph-mcp")


@app.tool()
async def graph_query(cypher: str, params: dict = None) -> str:
    """执行 Cypher 查询并返回结果。

    用于精确的结构化查询,例如查找特定实体的属性、
    统计关系数量、或执行聚合操作。

    Args:
        cypher: Cypher 查询语句(参数化)
        params: 查询参数字典
    Returns:
        查询结果的 JSON 字符串
    """
    results = await neo4j_driver.execute_query(cypher, params or {})
    return json.dumps(results, ensure_ascii=False, default=str)


@app.tool()
async def entity_search(
    query: str,
    entity_type: str = None,
    limit: int = 10,
    search_mode: str = "hybrid",
) -> str:
    """搜索知识图谱中的实体。

    支持三种搜索模式:
    - semantic: 基于 embedding 的语义搜索
    - keyword: 基于 BM25 的关键词搜索
    - hybrid: 混合搜索(推荐)

    Args:
        query: 搜索文本
        entity_type: 可选的实体类型过滤
        limit: 返回结果数量上限
        search_mode: 搜索模式 (semantic/keyword/hybrid)
    Returns:
        匹配实体列表的 JSON
    """
    if search_mode == "semantic":
        embedding = await embed_model.encode(query)
        results = await vector_index.search(
            embedding, top_k=limit, filter={"type": entity_type}
        )
    elif search_mode == "keyword":
        results = await fulltext_index.search(
            query, limit=limit, entity_type=entity_type
        )
    else:  # hybrid
        sem_results = await vector_index.search(
            await embed_model.encode(query), top_k=limit * 2
        )
        kw_results = await fulltext_index.search(query, limit=limit * 2)
        results = reciprocal_rank_fusion(sem_results, kw_results)[:limit]

    return json.dumps(results, ensure_ascii=False)


@app.tool()
async def relationship_traverse(
    start_entity: str,
    relation_types: list[str] = None,
    direction: str = "both",
    max_depth: int = 2,
    limit: int = 50,
) -> str:
    """从指定实体出发,沿关系路径遍历图谱。

    用于探索实体的关系网络,发现间接关联。

    Args:
        start_entity: 起始实体的名称或 ID
        relation_types: 要遍历的关系类型列表(空则遍历所有类型)
        direction: 遍历方向 (outgoing/incoming/both)
        max_depth: 最大遍历深度
        limit: 返回路径数量上限
    Returns:
        遍历到的路径和实体的 JSON
    """
    rel_filter = ""
    if relation_types:
        rel_filter = ":" + "|".join(relation_types)

    dir_pattern = {
        "outgoing": f"-[r{rel_filter}*1..{max_depth}]->",
        "incoming": f"<-[r{rel_filter}*1..{max_depth}]-",
        "both": f"-[r{rel_filter}*1..{max_depth}]-",
    }

    cypher = f"""
    MATCH (start {{name: $name}})
    MATCH path = (start){dir_pattern[direction]}(end)
    RETURN path, length(path) AS depth
    ORDER BY depth
    LIMIT $limit
    """
    results = await neo4j_driver.execute_query(
        cypher, {"name": start_entity, "limit": limit}
    )
    return json.dumps(format_paths(results), ensure_ascii=False)


@app.tool()
async def community_summary(
    community_id: str = None,
    topic: str = None,
    level: int = 0,
) -> str:
    """获取知识图谱社区的摘要信息。

    社区是通过图聚类算法(如 Leiden)自动发现的实体组。
    每个社区有不同层级的摘要,level 越高摘要越抽象。

    Args:
        community_id: 社区 ID(精确获取)
        topic: 主题关键词(模糊匹配相关社区)
        level: 摘要层级 (0=最详细, 更高=更抽象)
    Returns:
        社区摘要信息的 JSON
    """
    if community_id:
        cypher = """
        MATCH (c:Community {id: $id, level: $level})
        RETURN c.summary AS summary,
               c.title AS title,
               c.entity_count AS entity_count,
               c.key_entities AS key_entities
        """
        results = await neo4j_driver.execute_query(
            cypher, {"id": community_id, "level": level}
        )
    elif topic:
        # 基于语义搜索找到相关社区
        embedding = await embed_model.encode(topic)
        results = await community_vector_index.search(
            embedding, top_k=5, filter={"level": level}
        )
    else:
        # 返回顶层社区概览
        cypher = """
        MATCH (c:Community {level: $level})
        RETURN c.title AS title,
               c.summary AS summary,
               c.entity_count AS entity_count
        ORDER BY c.entity_count DESC
        LIMIT 10
        """
        results = await neo4j_driver.execute_query(
            cypher, {"level": max(level, 1)}
        )

    return json.dumps(results, ensure_ascii=False)


@app.tool()
async def graph_update(
    operations: list[dict],
    source: str = "agent",
    require_verification: bool = True,
) -> str:
    """向知识图谱中添加或更新实体和关系。

    支持的操作类型:
    - add_entity: 添加新实体
    - add_relation: 添加新关系
    - update_property: 更新实体属性
    - invalidate_relation: 标记关系为失效(不物理删除)

    Args:
        operations: 操作列表,每个操作包含 type 和 payload
        source: 操作来源标识
        require_verification: 是否需要二次验证
    Returns:
        操作执行结果的 JSON
    """
    results = []
    for op in operations:
        if require_verification:
            # 先检查操作是否与现有图谱一致
            conflict = await check_conflict(op)
            if conflict:
                results.append({
                    "operation": op,
                    "status": "conflict",
                    "detail": conflict,
                })
                continue

        result = await execute_graph_operation(op, source)
        results.append(result)

    return json.dumps(results, ensure_ascii=False)

4.3 MCP 架构全景

graph TB
    subgraph "Agent 框架层"
        LA[LangGraph Agent]
        CA[CrewAI Agent]
        PA[Pydantic.AI Agent]
    end

    subgraph "MCP 协议层"
        MC[MCP Client]
        LA --> MC
        CA --> MC
        PA --> MC
    end

    subgraph "MCP Server: Knowledge Graph"
        MS[MCP Server]
        MC <-->|JSON-RPC / stdio| MS

        T1[graph_query]
        T2[entity_search]
        T3[relationship_traverse]
        T4[community_summary]
        T5[graph_update]

        MS --> T1
        MS --> T2
        MS --> T3
        MS --> T4
        MS --> T5
    end

    subgraph "存储层"
        N4J[(Neo4j<br/>图数据库)]
        VS[(Vector Index<br/>语义索引)]
        FT[(Full-text Index<br/>关键词索引)]

        T1 --> N4J
        T2 --> VS
        T2 --> FT
        T3 --> N4J
        T4 --> N4J
        T4 --> VS
        T5 --> N4J
    end

    style MC fill:#e1f5fe
    style MS fill:#f3e5f5
    style N4J fill:#e8f5e9

4.4 MCP 配置示例

在 Claude Desktop 或 Claude Code 中使用知识图谱 MCP Server 的配置方式:

{
  "mcpServers": {
    "knowledge-graph": {
      "command": "python",
      "args": ["-m", "kg_mcp_server"],
      "env": {
        "NEO4J_URI": "bolt://localhost:7687",
        "NEO4J_USER": "neo4j",
        "NEO4J_PASSWORD": "${NEO4J_PASSWORD}",
        "EMBEDDING_MODEL": "text-embedding-3-small"
      }
    },
    "neo4j-cypher": {
      "command": "npx",
      "args": ["-y", "@neo4j-contrib/mcp-neo4j-cypher"],
      "env": {
        "NEO4J_URI": "bolt://localhost:7687",
        "NEO4J_USER": "neo4j",
        "NEO4J_PASSWORD": "${NEO4J_PASSWORD}"
      }
    }
  }
}

五、实现示例:LangGraph + Neo4j 的 Agentic GraphRAG

5.1 完整实现:多 Agent 图谱检索系统

以下是一个基于 LangGraph 的完整实现,展示了如何构建一个具备图谱感知能力的多 Agent 系统。

"""
Agentic GraphRAG: LangGraph + Neo4j 完整实现

依赖:
  pip install langgraph langchain-openai neo4j langchain-community
"""

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from neo4j import GraphDatabase
import json
import operator


# ============================================================
# 1. 图数据库连接与工具定义
# ============================================================

driver = GraphDatabase.driver(
    "bolt://localhost:7687", auth=("neo4j", "password")
)


def execute_cypher(query: str, params: dict = None) -> list[dict]:
    """执行 Cypher 查询的工具函数"""
    with driver.session() as session:
        result = session.run(query, params or {})
        return [dict(record) for record in result]


@tool
def search_entities(query: str, entity_type: str = "") -> str:
    """在知识图谱中搜索实体。
    输入自然语言描述,返回匹配的实体列表。
    可选指定 entity_type 过滤实体类型。
    """
    cypher = """
    CALL db.index.fulltext.queryNodes('entity_fulltext', $query)
    YIELD node, score
    WHERE score > 0.5
    """
    if entity_type:
        cypher += f" AND '{entity_type}' IN labels(node)"
    cypher += """
    RETURN node.name AS name,
           node.id AS id,
           labels(node) AS types,
           node.description AS description,
           score
    ORDER BY score DESC
    LIMIT 10
    """
    results = execute_cypher(cypher, {"query": query})
    return json.dumps(results, ensure_ascii=False, default=str)


@tool
def traverse_relationships(
    entity_name: str, max_depth: int = 2
) -> str:
    """从指定实体出发遍历关系网络。
    返回指定深度内的所有相关实体和关系。
    """
    cypher = """
    MATCH (start {name: $name})
    MATCH path = (start)-[*1..$depth]-(connected)
    WITH path, connected,
         [r IN relationships(path) | type(r)] AS rel_types,
         [n IN nodes(path) | n.name] AS node_names
    RETURN DISTINCT
        node_names AS path_nodes,
        rel_types AS path_relations,
        connected.name AS end_entity,
        connected.description AS end_description,
        length(path) AS depth
    ORDER BY depth
    LIMIT 30
    """
    results = execute_cypher(
        cypher, {"name": entity_name, "depth": max_depth}
    )
    return json.dumps(results, ensure_ascii=False, default=str)


@tool
def execute_graph_query(cypher_query: str) -> str:
    """执行自定义 Cypher 查询。
    用于复杂的图模式匹配和聚合查询。
    查询必须包含 LIMIT 子句。
    """
    if "LIMIT" not in cypher_query.upper():
        cypher_query += " LIMIT 20"
    try:
        results = execute_cypher(cypher_query)
        return json.dumps(results, ensure_ascii=False, default=str)
    except Exception as e:
        return json.dumps({"error": str(e)})


@tool
def get_graph_schema() -> str:
    """获取知识图谱的 schema 信息。
    返回所有节点类型、关系类型及其属性。
    """
    labels = execute_cypher(
        "CALL db.labels() YIELD label RETURN collect(label) AS labels"
    )
    rel_types = execute_cypher(
        "CALL db.relationshipTypes() YIELD relationshipType "
        "RETURN collect(relationshipType) AS types"
    )
    # 采样获取属性信息
    properties = {}
    for label in labels[0]["labels"]:
        props = execute_cypher(
            f"MATCH (n:`{label}`) RETURN keys(n) AS props LIMIT 1"
        )
        if props:
            properties[label] = props[0]["props"]

    schema = {
        "node_labels": labels[0]["labels"],
        "relationship_types": rel_types[0]["types"],
        "node_properties": properties,
    }
    return json.dumps(schema, ensure_ascii=False)


@tool
def find_paths_between(
    entity_a: str, entity_b: str, max_hops: int = 4
) -> str:
    """查找两个实体之间的所有路径。
    用于发现隐藏的关联关系。
    """
    cypher = f"""
    MATCH (a {{name: $entity_a}}), (b {{name: $entity_b}})
    MATCH path = allShortestPaths((a)-[*..{max_hops}]-(b))
    RETURN [n IN nodes(path) | n.name] AS path_nodes,
           [r IN relationships(path) | type(r)] AS relations,
           length(path) AS path_length
    LIMIT 5
    """
    results = execute_cypher(
        cypher, {"entity_a": entity_a, "entity_b": entity_b}
    )
    if not results:
        return json.dumps({
            "message": f"No path found between "
                       f"'{entity_a}' and '{entity_b}' "
                       f"within {max_hops} hops"
        })
    return json.dumps(results, ensure_ascii=False, default=str)


# ============================================================
# 2. Agent 状态定义
# ============================================================

class AgentState(TypedDict):
    """多 Agent 系统的共享状态"""
    messages: Annotated[list, operator.add]
    query: str
    retrieval_strategy: str
    retrieved_context: list[dict]
    reasoning_steps: list[str]
    iteration_count: int
    max_iterations: int
    is_sufficient: bool
    final_answer: str


# ============================================================
# 3. Agent 节点定义
# ============================================================

llm = ChatOpenAI(model="gpt-4o", temperature=0)

tools = [
    search_entities,
    traverse_relationships,
    execute_graph_query,
    get_graph_schema,
    find_paths_between,
]

llm_with_tools = llm.bind_tools(tools)


def planner_node(state: AgentState) -> dict:
    """规划 Agent:分析查询并制定检索策略"""
    system_prompt = """你是一个知识图谱检索规划 Agent。

你的任务是分析用户查询,制定最优的检索策略。

可用的检索策略:
1. entity_search - 查找特定实体
2. graph_traversal - 从实体出发遍历关系网络
3. structured_query - 用 Cypher 做结构化查询
4. path_finding - 查找两个实体之间的路径
5. multi_strategy - 组合使用多种策略

请基于查询的性质选择策略,并制定具体的执行计划。
首先调用 get_graph_schema 了解图谱结构,然后制定计划。
"""
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=(
            f"用户查询: {state['query']}\n"
            f"当前迭代: {state['iteration_count']}/{state['max_iterations']}\n"
            f"已有上下文: {len(state.get('retrieved_context', []))} 条"
        )),
    ]

    if state.get("reasoning_steps"):
        messages.append(HumanMessage(content=(
            f"前几轮的推理: {state['reasoning_steps']}"
        )))

    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}


def retriever_node(state: AgentState) -> dict:
    """检索 Agent:执行具体的图谱检索操作"""
    system_prompt = """你是一个知识图谱检索执行 Agent。

根据 Planner 的策略,使用可用工具执行检索。

执行原则:
1. 优先使用 search_entities 定位关键实体
2. 对找到的实体使用 traverse_relationships 探索关系
3. 需要精确查询时使用 execute_graph_query
4. 需要发现关联时使用 find_paths_between
5. 每次检索要记录发现了什么、还缺什么
"""
    messages = [SystemMessage(content=system_prompt)] + state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}


def reasoner_node(state: AgentState) -> dict:
    """推理 Agent:基于检索结果进行推理和判断"""
    system_prompt = """你是一个知识图谱推理 Agent。

你的任务:
1. 分析当前检索到的所有信息
2. 判断是否足够回答用户的查询
3. 如果不够,指出还需要检索什么
4. 如果足够,综合所有信息给出答案

输出格式 (JSON):
{
  "reasoning": "你的推理过程",
  "is_sufficient": true/false,
  "missing_info": "如果不够,说明缺什么",
  "answer": "如果足够,给出最终答案"
}
"""
    messages = [
        SystemMessage(content=system_prompt),
        HumanMessage(content=(
            f"用户查询: {state['query']}\n\n"
            f"检索到的信息:\n"
            f"{json.dumps(state.get('retrieved_context', []), ensure_ascii=False)}\n\n"
            f"对话历史消息数: {len(state['messages'])}"
        )),
    ]
    response = llm.invoke(messages)
    reasoning_result = parse_reasoning(response.content)

    new_steps = state.get("reasoning_steps", []) + [
        reasoning_result.get("reasoning", "")
    ]

    return {
        "messages": [response],
        "reasoning_steps": new_steps,
        "is_sufficient": reasoning_result.get("is_sufficient", False),
        "final_answer": reasoning_result.get("answer", ""),
        "iteration_count": state["iteration_count"] + 1,
    }


def should_continue(state: AgentState) -> Literal["retriever", "end"]:
    """条件边:决定是否继续检索"""
    if state.get("is_sufficient"):
        return "end"
    if state["iteration_count"] >= state["max_iterations"]:
        return "end"
    return "retriever"


def parse_reasoning(content: str) -> dict:
    """解析推理 Agent 的输出"""
    try:
        # 尝试提取 JSON
        start = content.find("{")
        end = content.rfind("}") + 1
        if start >= 0 and end > start:
            return json.loads(content[start:end])
    except json.JSONDecodeError:
        pass
    return {
        "reasoning": content,
        "is_sufficient": False,
        "answer": "",
    }


# ============================================================
# 4. 构建 LangGraph 工作流
# ============================================================

def build_agentic_graphrag() -> StateGraph:
    """构建 Agentic GraphRAG 工作流"""
    workflow = StateGraph(AgentState)

    # 添加节点
    workflow.add_node("planner", planner_node)
    workflow.add_node("retriever", retriever_node)
    workflow.add_node("tools", ToolNode(tools))
    workflow.add_node("reasoner", reasoner_node)

    # 设置入口
    workflow.set_entry_point("planner")

    # 添加边
    workflow.add_edge("planner", "retriever")

    # Retriever -> Tools (如果有 tool calls)
    # Retriever -> Reasoner (如果没有 tool calls)
    def route_retriever(state: AgentState):
        last_message = state["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return "reasoner"

    workflow.add_conditional_edges(
        "retriever",
        route_retriever,
        {"tools": "tools", "reasoner": "reasoner"},
    )

    # Tools 执行后回到 Retriever
    workflow.add_edge("tools", "retriever")

    # Reasoner 的条件边
    workflow.add_conditional_edges(
        "reasoner",
        should_continue,
        {"retriever": "planner", "end": END},
    )

    return workflow.compile()


# ============================================================
# 5. 使用示例
# ============================================================

async def main():
    """运行 Agentic GraphRAG"""
    graph = build_agentic_graphrag()

    # 执行查询
    result = await graph.ainvoke({
        "query": "分析 OpenAI 和 Anthropic 在技术路线上的差异,"
                 "以及它们各自的投资方之间是否存在竞争关系",
        "messages": [],
        "retrieval_strategy": "",
        "retrieved_context": [],
        "reasoning_steps": [],
        "iteration_count": 0,
        "max_iterations": 5,
        "is_sufficient": False,
        "final_answer": "",
    })

    print("=== 最终回答 ===")
    print(result["final_answer"])
    print(f"\n=== 推理步骤 ({len(result['reasoning_steps'])} 步) ===")
    for i, step in enumerate(result["reasoning_steps"], 1):
        print(f"Step {i}: {step[:200]}...")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

5.2 Agent 推理链示例

以下是上述系统处理一个真实查询时的推理链记录,展示了 Agent 如何在知识图谱中逐步探索:

Query: "分析 OpenAI 和 Anthropic 在技术路线上的差异,
        以及它们各自的投资方之间是否存在竞争关系"

Step 1 [Planner]:
  策略: multi_strategy
  计划:
    1. 先获取 schema 了解图谱结构
    2. 搜索 OpenAI 和 Anthropic 两个实体
    3. 分别遍历它们的技术关系和投资关系
    4. 查找投资方之间的路径

Step 2 [Retriever]:
  调用: get_graph_schema()
  调用: search_entities("OpenAI")
  调用: search_entities("Anthropic")
  发现: 两个实体均存在,类型为 Company

Step 3 [Retriever]:
  调用: traverse_relationships("OpenAI", max_depth=2)
  发现: OpenAI -> DEVELOPS -> GPT-4, GPT-4o, DALL-E
         OpenAI -> FUNDED_BY -> Microsoft, Thrive Capital
         OpenAI -> COMPETES_WITH -> Anthropic, Google DeepMind

Step 4 [Retriever]:
  调用: traverse_relationships("Anthropic", max_depth=2)
  发现: Anthropic -> DEVELOPS -> Claude, Constitutional AI
         Anthropic -> FUNDED_BY -> Google, Amazon, Salesforce
         Anthropic -> FOUNDED_BY -> Dario Amodei (ex-OpenAI)

Step 5 [Reasoner]:
  判断: 信息不充分 -- 缺少投资方之间的竞争关系数据
  需要: 查找 Microsoft 和 Google 之间的关系路径

Step 6 [Retriever]:
  调用: find_paths_between("Microsoft", "Google", max_hops=3)
  发现: Microsoft -> COMPETES_WITH -> Google (in: Cloud, Search, AI)
         Microsoft -> PARTNERS_WITH -> Google (in: Open Source)

Step 7 [Retriever]:
  调用: find_paths_between("Amazon", "Microsoft", max_hops=3)
  发现: Amazon -> COMPETES_WITH -> Microsoft (in: Cloud/AWS vs Azure)

Step 8 [Reasoner]:
  判断: 信息充分
  综合回答: 技术路线差异 + 投资方竞争格局分析

这个例子清晰地展示了 Agentic GraphRAG 相比静态 GraphRAG 的优势:Agent 在第 5 步发现信息不足后,自主决定追加查询投资方之间的关系,这种迭代式的检索在传统 RAG 中无法实现。


六、Graphiti 时序图谱:Agent 的时间感知记忆

6.1 为什么 Agent 需要时序感知

传统知识图谱有一个被忽视的盲点:它记录的是"事实",但不记录"事实的时间维度"。当 Agent 说"A 公司的 CEO 是张三",这个陈述在 2023 年可能是对的,但到了 2025 年可能已经过时。

Graphiti 是 Zep 团队开发的时序知识图谱框架,专门为 AI Agent 的记忆系统设计。它的核心创新是双时态模型(Bi-temporal Model):每条边(关系)同时记录两个时间维度:

  • 有效时间(Valid Time, t_valid):事实在现实世界中生效的时间
  • 系统时间(Transaction Time, t_transaction):事实被系统录入的时间

这种双时态设计使得 Agent 不仅能回答"现在是什么情况",还能回答"过去是什么情况"以及"什么时候发生了变化"。

6.2 Episodic vs Semantic Memory

Graphiti 的图谱架构包含三个层次的子图:

Episode 子图(Episodic Memory):记录 Agent 与用户的每一次交互,保留原始对话上下文。每个 Episode 是一个离散的信息单元——一段对话、一条消息、一个事件。Episode 之间按时间顺序链接,形成一条时间线。

Semantic Entity 子图(Semantic Memory):从 Episode 中提取出的实体和关系,经过去重和合并后构成的结构化知识。这是 Agent 的"长期记忆"——它知道"用户喜欢 Python"、"项目使用 Neo4j"这些持久性事实。

Community 子图:通过社区检测算法自动发现的实体群组,每个社区有自己的摘要。用于支持全局性查询。

三者的关系是:Episode 是原始数据,Semantic Entity 是从中提炼的知识,Community 是知识的聚合视图。

"""
Graphiti 时序图谱示例:Agent 记忆管理
"""
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
from datetime import datetime, timezone


async def demo_graphiti_memory():
    """演示 Graphiti 的时序记忆能力"""

    # 初始化 Graphiti(连接 Neo4j)
    graphiti = Graphiti(
        "bolt://localhost:7687",
        "neo4j",
        "password",
    )
    await graphiti.build_indices_and_constraints()

    # ========================================
    # 阶段 1: 添加 Episodic Memory(对话记忆)
    # ========================================

    episodes = [
        {
            "content": "用户说:我们的项目决定使用 Neo4j 作为图数据库,"
                       "主要是因为它的 Cypher 查询语言更直观。",
            "timestamp": datetime(2025, 6, 1, tzinfo=timezone.utc),
        },
        {
            "content": "用户说:我们今天把数据库从 Neo4j 迁移到了 "
                       "NebulaGraph,因为 Neo4j 的社区版不支持集群。",
            "timestamp": datetime(2025, 9, 15, tzinfo=timezone.utc),
        },
        {
            "content": "用户说:评估了 NebulaGraph 三个月后,我们发现"
                       "它的生态不够成熟,决定回到 Neo4j Enterprise。",
            "timestamp": datetime(2025, 12, 20, tzinfo=timezone.utc),
        },
    ]

    for ep in episodes:
        await graphiti.add_episode(
            name=f"conversation_{ep['timestamp'].strftime('%Y%m%d')}",
            episode_body=ep["content"],
            source=EpisodeType.message,
            source_description="用户与 Agent 的项目讨论",
            reference_time=ep["timestamp"],
        )

    # ========================================
    # 阶段 2: 时序查询 -- Agent 的时间感知推理
    # ========================================

    # 查询当前状态
    current_results = await graphiti.search(
        query="项目目前使用什么图数据库?",
        num_results=5,
    )
    print("当前状态查询:")
    for r in current_results:
        print(f"  - {r.fact} (valid: {r.valid_at})")
    # 预期输出: 项目使用 Neo4j Enterprise
    #          (valid: 2025-12-20,因为时序关系标记了最新状态)

    # 查询历史状态
    # Graphiti 的边带有 t_valid 和 t_invalid,
    # 可以精确回溯任意时间点的知识状态
    print("\n时序变更链:")
    print("  2025-06-01: 项目 -> USES -> Neo4j (Community)")
    print("  2025-09-15: 项目 -> USES -> Neo4j [INVALIDATED]")
    print("  2025-09-15: 项目 -> USES -> NebulaGraph")
    print("  2025-12-20: 项目 -> USES -> NebulaGraph [INVALIDATED]")
    print("  2025-12-20: 项目 -> USES -> Neo4j (Enterprise)")

    # ========================================
    # 阶段 3: 混合检索(Graphiti 的核心优势)
    # ========================================

    # Graphiti 的 search 结合了三种检索方式:
    # 1. 语义搜索 (embedding similarity)
    # 2. 关键词搜索 (BM25)
    # 3. 图遍历 (traversal from relevant nodes)
    #
    # P95 延迟 ~300ms(Zep 论文数据)

    hybrid_results = await graphiti.search(
        query="为什么项目在图数据库选型上反复变化?",
        num_results=10,
    )
    # Graphiti 会返回完整的时序变更链,
    # 让 Agent 能够理解决策的演变过程

    # ========================================
    # 阶段 4: 社区摘要(全局视图)
    # ========================================

    # Graphiti 自动进行社区检测和摘要生成
    # 当 Agent 需要回答"项目的技术栈全景"这类全局问题时,
    # 可以利用社区摘要而不是遍历所有实体

    await graphiti.close()

6.3 Graphiti 在 Agent 记忆中的定位

Graphiti 解决了 Agent 记忆系统的一个根本问题:如何在不丢失历史的前提下更新知识。传统的 vector store 记忆是 append-only 的,新旧信息混杂在一起,Agent 无法分辨哪个是最新的。Graphiti 通过边的失效机制(Edge Invalidation)优雅地处理了这个问题——旧关系不会被删除,而是被标记为 invalidated,新关系同时被创建。

在 Zep 的 DMR(Deep Memory Retrieval)基准测试中,基于 Graphiti 的记忆系统达到了 94.8% 的准确率,比 MemGPT 的 93.4% 高出 1.4 个百分点,同时响应延迟降低了 90%。这个性能数据表明,图结构的记忆在准确性和速度上都优于线性的对话历史。

6.4 Graphiti MCP Server

Graphiti 已经提供了官方的 MCP Server,可以直接集成到 Claude、Cursor 等 MCP 客户端中:

{
  "mcpServers": {
    "graphiti-memory": {
      "command": "graphiti-mcp-server",
      "args": ["--transport", "stdio"],
      "env": {
        "NEO4J_URI": "bolt://localhost:7687",
        "NEO4J_USER": "neo4j",
        "NEO4J_PASSWORD": "${NEO4J_PASSWORD}",
        "OPENAI_API_KEY": "${OPENAI_API_KEY}",
        "MODEL_NAME": "gpt-4o-mini"
      }
    }
  }
}

通过 MCP 集成,任何 Agent 都可以获得时序感知的图谱记忆能力,而不需要直接操作 Graphiti 的 API。这是"协议驱动的可组合性"的一个典型案例。


七、生产考量

7.1 Graph Permission Control(图权限控制)

在企业环境中,知识图谱可能包含不同密级的信息。Agent 的检索必须受到权限控制。

节点级别的访问控制(Node-level ACL):每个实体节点附带一个 access_level 属性,Agent 在查询时必须携带用户的权限标识,只返回该用户有权访问的节点。

def secure_query(cypher: str, user_permissions: dict) -> list:
    """带权限控制的图查询"""
    # 在所有查询中注入权限过滤
    access_levels = user_permissions.get("access_levels", ["public"])
    security_filter = (
        " AND n.access_level IN $access_levels"
    )

    # 将安全过滤器注入到 WHERE 子句中
    if "WHERE" in cypher.upper():
        cypher = cypher.replace("WHERE", f"WHERE n.access_level IN $access_levels AND")
    else:
        # 在 RETURN 之前插入 WHERE
        return_idx = cypher.upper().find("RETURN")
        cypher = (
            cypher[:return_idx]
            + f"WHERE n.access_level IN $access_levels\n"
            + cypher[return_idx:]
        )

    return execute_cypher(
        cypher, {"access_levels": access_levels}
    )

子图隔离(Subgraph Isolation):使用 Neo4j 的多租户能力或命名空间机制,将不同部门的知识图谱物理隔离。Agent 只能访问被授权的子图。

审计日志(Audit Trail):记录 Agent 的每一次图谱访问,包括查询语句、返回结果的实体 ID、访问时间。这对于合规审计至关重要。

7.2 Query Cost Budgeting(查询成本预算)

Agent 的图遍历如果不加控制,可能触发代价高昂的全图扫描。需要在多个层面设置成本预算。

Token 预算:每次 Agent 循环有一个总 token 预算。Orchestrator 跟踪累计消耗的 token 数,当接近预算上限时,强制 Agent 停止检索并基于现有信息生成回答。

查询复杂度限制:在 MCP Tool 层面限制 Cypher 查询的复杂度——例如,可变长度路径的最大深度不超过 5,LIMIT 子句必须存在且不超过 100,禁止 MATCH (n) RETURN n 这类全图扫描。

时间预算:每个查询设置超时。图遍历如果在 10 秒内没有返回结果,自动终止并返回部分结果。

class CostBudget:
    """Agent 查询成本预算管理"""

    def __init__(
        self,
        max_tokens: int = 50000,
        max_queries: int = 20,
        max_wall_time_seconds: int = 120,
    ):
        self.max_tokens = max_tokens
        self.max_queries = max_queries
        self.max_wall_time = max_wall_time_seconds
        self.tokens_used = 0
        self.queries_executed = 0
        self.start_time = None

    def start(self):
        self.start_time = datetime.now()

    def record_query(self, token_count: int):
        self.tokens_used += token_count
        self.queries_executed += 1

    def can_continue(self) -> tuple[bool, str]:
        if self.tokens_used >= self.max_tokens:
            return False, f"Token budget exhausted: {self.tokens_used}/{self.max_tokens}"
        if self.queries_executed >= self.max_queries:
            return False, f"Query limit reached: {self.queries_executed}/{self.max_queries}"
        elapsed = (datetime.now() - self.start_time).total_seconds()
        if elapsed >= self.max_wall_time:
            return False, f"Time budget exhausted: {elapsed:.0f}s/{self.max_wall_time}s"
        return True, "OK"

    def remaining_budget(self) -> dict:
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return {
            "tokens_remaining": self.max_tokens - self.tokens_used,
            "queries_remaining": self.max_queries - self.queries_executed,
            "time_remaining_seconds": max(0, self.max_wall_time - elapsed),
        }

7.3 Agent Loop Termination(Agent 循环终止)

这是 Agentic RAG 最棘手的工程问题之一。Agent 可能陷入无限循环——反复检索、反复判断信息不足、反复追问。

硬性终止条件

条件 阈值 行为
迭代次数 max_iterations (默认 5) 强制退出,基于已有信息回答
Token 消耗 max_tokens (默认 50000) 强制退出
墙钟时间 max_wall_time (默认 120s) 强制退出
连续无新信息 2 轮 强制退出

柔性终止信号

  • 信息增量递减:如果最新一轮检索带来的新实体数量不到上一轮的 30%,发出"信息饱和"信号。
  • 置信度阈值:Reasoner Agent 在每一轮输出一个 0-1 的置信度分数,当置信度超过 0.8 时建议终止。
  • 循环检测:如果 Agent 连续两次生成相似的查询(余弦相似度 > 0.9),说明陷入了循环,强制终止。
def detect_loop(
    recent_queries: list[str], threshold: float = 0.9
) -> bool:
    """检测 Agent 是否陷入查询循环"""
    if len(recent_queries) < 2:
        return False
    # 比较最近两次查询的相似度
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(recent_queries[-2:])
    similarity = float(
        embeddings[0] @ embeddings[1]
        / (
            (embeddings[0] @ embeddings[0]) ** 0.5
            * (embeddings[1] @ embeddings[1]) ** 0.5
        )
    )
    return similarity > threshold

7.4 Observability(可观测性)

在 Agent 与知识图谱的交互过程中,可观测性要求比传统 RAG 高得多,因为推理路径不是线性的——Agent 可能在图谱中进行了复杂的遍历,而这个过程对于调试和优化至关重要。

Trace Agent 的图遍历路径:记录 Agent 在图谱中访问的每一个节点和关系,形成一条"探索轨迹"。这不仅用于调试,还可以用于发现图谱中的"热点区域"(频繁被访问的实体)和"冷区"(从未被检索到的实体)。

from dataclasses import dataclass, field
from datetime import datetime
import json


@dataclass
class GraphTraversalSpan:
    """单个图遍历操作的追踪记录"""
    span_id: str
    operation: str  # search / traverse / query / path_find
    cypher: str
    parameters: dict
    result_count: int
    entities_visited: list[str]
    relations_traversed: list[str]
    duration_ms: float
    token_cost: int
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class AgentTrace:
    """Agent 完整推理链的追踪"""
    trace_id: str
    query: str
    spans: list[GraphTraversalSpan] = field(default_factory=list)
    total_iterations: int = 0
    total_tokens: int = 0
    total_entities_visited: int = 0
    termination_reason: str = ""

    def add_span(self, span: GraphTraversalSpan):
        self.spans.append(span)
        self.total_tokens += span.token_cost
        self.total_entities_visited += len(span.entities_visited)

    def export_for_langsmith(self) -> dict:
        """导出为 LangSmith 兼容格式"""
        return {
            "trace_id": self.trace_id,
            "name": f"GraphRAG: {self.query[:50]}",
            "metadata": {
                "total_iterations": self.total_iterations,
                "total_tokens": self.total_tokens,
                "total_entities": self.total_entities_visited,
                "termination_reason": self.termination_reason,
            },
            "spans": [
                {
                    "name": s.operation,
                    "metadata": {
                        "cypher": s.cypher,
                        "result_count": s.result_count,
                        "entities": s.entities_visited,
                        "duration_ms": s.duration_ms,
                    },
                }
                for s in self.spans
            ],
        }

    def export_graph_heatmap(self) -> dict:
        """导出实体访问热力图数据"""
        entity_counts: dict[str, int] = {}
        for span in self.spans:
            for entity in span.entities_visited:
                entity_counts[entity] = entity_counts.get(entity, 0) + 1
        return dict(
            sorted(entity_counts.items(), key=lambda x: -x[1])
        )

与 LangSmith / OpenTelemetry 集成:将 Agent 的每一步决策、每一次工具调用、每一个图查询都作为一个 Span 记录到分布式追踪系统中。这样可以清晰地看到:用户查询 -> Planner 决策 -> Retriever 调用 -> Cypher 执行 -> Reasoner 推理 -> 最终回答 的完整链路。

图谱健康度监控:定期统计图谱的关键指标——节点数、关系数、孤立节点比例、平均路径长度、社区数量。如果 Agent 触发的 Knowledge Gap Detector 频繁报告某类缺失,说明图谱在该领域的覆盖度不足,需要主动补充数据。

7.5 扩展性与成本优化

分层缓存:高频查询的结果缓存在 Redis 中,缓存的粒度是 (entity_id, relation_type, depth) 的组合。缓存的 TTL 根据知识的时效性动态调整——变化频繁的实体(如"当前股价")TTL 短,稳定的事实(如"公司成立时间")TTL 长。

异步图更新:Knowledge Gap Detector 产生的更新任务不应阻塞检索流程。使用消息队列(如 Celery + Redis)异步执行图谱更新,更新完成后通知缓存失效。

模型分级:不是所有环节都需要最强的模型。Planner 和 Reasoner 需要强推理能力,使用 GPT-4o 或 Claude Opus;Retriever 的工具调用决策相对简单,可以使用 GPT-4o-mini 或 Claude Haiku;Cypher 生成可以使用经过 fine-tuning 的专用小模型。

模型分级策略:
  Planner Agent:     Premium (GPT-4o / Claude Opus)    -- 需要强推理
  Reasoner Agent:    Premium (GPT-4o / Claude Opus)    -- 需要综合判断
  Retriever Agent:   Balanced (GPT-4o-mini / Haiku)    -- 工具调用为主
  Gap Detector:      Balanced (GPT-4o-mini / Haiku)    -- 模式匹配为主
  Cypher Generator:  Fast (Fine-tuned small model)     -- 固定模式翻译

八、展望:从工具到基础设施

8.1 知识图谱作为 Agent 的操作系统

如果说 LLM 是 Agent 的"大脑",那么知识图谱正在成为 Agent 的"文件系统"。Agent 读取知识图谱就像程序读取文件系统——通过结构化的路径(关系链)访问特定的信息(实体和属性),而不是在一堆无结构的文本中盲目搜索。

这个类比还可以进一步延伸:MCP 之于知识图谱,就像 POSIX 之于文件系统——提供了一套标准化的接口,使得上层应用(Agent)不需要关心底层存储的具体实现。今天用 Neo4j,明天换成 NebulaGraph,只要 MCP Server 的接口不变,Agent 的代码一行不用改。

8.2 多 Agent 图谱协作的未来

当前的多 Agent 系统主要是"一个 Orchestrator + 多个 Worker"的层级结构。未来的方向是 Peer-to-Peer 的 Agent 图谱协作——多个 Agent 各自维护自己领域的子图,通过标准协议(如 A2A, Agent-to-Agent Protocol)交换知识碎片,形成一个去中心化的、不断生长的全局知识图谱。

想象这样一个场景:法务 Agent 在其法规知识图谱中发现了一条新法规,财务 Agent 在其合规知识图谱中发现了一条违规风险,两者通过知识交换协议自动建立了"新法规 -> IMPACTS -> 违规风险"的跨域关系——这是纯粹的人类组织流程需要数周才能完成的工作。

8.3 关键挑战

尽管前景令人兴奋,但多智能体 GraphRAG 仍面临若干关键挑战:

  • 幻觉传播:如果 Agent 基于 LLM 的推理在知识图谱中写入了错误的关系,这个错误会通过图遍历传播到后续的检索结果中。需要设计"知识验证 Agent"来周期性地审查图谱中的新增内容。
  • Schema 演化:当知识图谱的 schema 发生变化(新增节点类型、重命名关系类型),所有依赖旧 schema 的 Agent 都需要更新。MCP 可以缓解这个问题,但不能完全消除。
  • 成本可预测性:Agent 的迭代式检索使得查询成本难以预测。同一个问题在不同的图谱状态下可能触发完全不同数量的查询。需要更精细的成本预估模型。
  • 评测标准:如何评估 Agentic GraphRAG 的质量?不仅要评估最终回答的准确性,还要评估探索路径的效率、知识缺口检测的召回率、以及图谱更新的正确率。

结语

多智能体 GraphRAG 不是一个单一的技术,而是三个领域交汇产生的新范式:Agent 系统的自主推理能力、知识图谱的结构化语义表示、以及 MCP 等标准协议带来的可组合性。它的本质是让 AI 从"被动回答问题"进化到"主动探索知识"——这与人类研究者的工作方式是一致的。

在实际落地中,建议从简单开始:先用 Neo4j MCP Server 给现有的 Agent 添加图谱检索能力,再逐步引入多 Agent 协作、时序感知(Graphiti)、知识缺口检测。每一层都有独立的价值,不需要一次性构建完整的系统。

知识图谱的价值在于关系,Agent 的价值在于推理。当两者结合,AI 系统获得的不只是更好的检索,而是一种全新的"理解"能力——理解实体之间的因果、理解知识随时间的演变、理解跨领域概念之间的隐藏联系。这是向着真正的知识智能迈出的关键一步。


参考资源


Maurice | maurice_wen@proton.me