Microsoft GraphRAG 工程化实践:从论文到生产部署

本文从工程视角系统拆解 Microsoft Research 提出的 GraphRAG 架构,覆盖论文核心思想、四阶段 Pipeline、与传统 RAG 的量化对比、LangChain/LlamaIndex 实现、Neo4j + 向量数据库双后端生产架构、性能调优策略,以及企业级落地案例。目标读者:已有 RAG 经验、正在评估或实施 GraphRAG 的工程师与技术负责人。


一、GraphRAG 架构全景:为什么纯向量 RAG 不够

1.1 问题根源:向量 RAG 的天花板

传统 RAG(Retrieval-Augmented Generation)的核心流程是:文档切块 -> 向量化 -> 相似度检索 -> LLM 生成。这套流程在「局部事实查询」上表现出色,例如「某公司 2024 年 Q3 营收是多少?」这类问题,向量检索可以精准命中对应段落。

然而,当查询涉及「全局性/主题性问题」时,向量 RAG 的局限性暴露无遗。典型的失败场景包括:

  • 跨文档聚合查询:「本季度所有项目的主要风险是什么?」需要遍历数十份项目报告,提取风险条目后归纳总结。向量检索只能返回与「风险」语义最近的若干 chunk,无法覆盖全局。
  • 隐式关系推理:「A 公司的供应商中,哪些同时也是 B 公司的客户?」这类多跳关系查询,需要在实体之间做图遍历,而非简单的向量相似度匹配。
  • 主题归纳:「这批文档的核心主题是什么?」属于 Query-Focused Summarization(QFS)任务,需要对全局语料做抽象归纳,而非检索具体段落。

Microsoft Research 在论文 "From Local to Global: A Graph RAG Approach to Query-Focused Summarization" 中提出了 GraphRAG,核心洞察是:将非结构化文本转化为知识图谱,通过图结构上的社区检测和层级摘要,构建「全局语义索引」,使 LLM 能够回答纯向量检索无法触达的全局性问题

1.2 GraphRAG 的核心设计哲学

GraphRAG 并非要替代向量 RAG,而是在向量检索的基础上增加了一层「结构化语义层」。其设计哲学可以概括为三点:

第一,知识图谱作为中间表示。将非结构化文本中的实体(Entity)和关系(Relationship)提取出来,构建一张有向图。图中的节点是实体(人、组织、事件、概念等),边是实体间的语义关系。这张图不是手工构建的本体,而是由 LLM 自动抽取生成的。

第二,社区结构作为全局索引。在知识图谱上运行 Leiden 社区检测算法,将密切关联的实体聚类成「社区」(Community)。每个社区生成一份摘要报告,描述该社区涉及的核心实体、关系和主题。这些社区摘要就是全局语义索引的基础。

第三,双搜索策略。Local Search 用于回答「特定实体相关」的局部查询,通过图遍历找到目标实体及其邻域信息;Global Search 用于回答「全局性/主题性」的宏观查询,通过检索社区摘要来获取对整个语料库的全局理解。

graph TB
    subgraph "GraphRAG 架构全景"
        A[非结构化文档] --> B[文本切块 Text Units]
        B --> C[实体/关系抽取 Entity & Relationship Extraction]
        C --> D[知识图谱 Knowledge Graph]
        D --> E[社区检测 Leiden Algorithm]
        E --> F[层级社区 Hierarchical Communities]
        F --> G[社区摘要 Community Summaries]

        H[用户查询 User Query] --> I{查询路由 Query Router}
        I -->|局部查询| J[Local Search]
        I -->|全局查询| K[Global Search]

        J --> L[图遍历 + 实体上下文]
        K --> M[社区摘要检索 + Map-Reduce]

        L --> N[LLM 生成]
        M --> N
        N --> O[最终回答]
    end

    style D fill:#e1f5fe
    style F fill:#fff3e0
    style N fill:#e8f5e9

二、核心流程:四阶段 Pipeline 详解

GraphRAG 的索引(Indexing)过程是一条四阶段 Pipeline,每一阶段都有明确的输入输出和可调参数。

2.1 第一阶段:实体与关系抽取(Entity & Relationship Extraction)

输入:文本切块(Text Units),每个 chunk 通常 300-1200 tokens。

处理:使用 LLM 从每个 chunk 中提取命名实体和它们之间的关系。Microsoft 的实现采用了一种「多轮抽取」(Gleaning)策略:第一轮抽取后,LLM 会被提示「是否还有遗漏的实体?」,然后进行补充抽取。这个过程可以重复多轮,由 max_gleanings 参数控制。

输出:三元组列表 (subject, predicate, object) 以及每个实体和关系的描述文本。

关键 Prompt 结构

ENTITY_EXTRACTION_PROMPT = """
-Goal-
Given a text document that is potentially relevant to this activity and a list
of entity types, identify all entities of those types from the text and all
relationships among the identified entities.

-Steps-
1. Identify all entities. For each identified entity, extract the following:
- entity_name: Name of the entity, capitalized
- entity_type: One of the following types: [{entity_types}]
- entity_description: Comprehensive description of the entity's attributes
  and activities

2. From the entities identified in step 1, identify all pairs of (source_entity,
   target_entity) that are *clearly related* to each other.
For each pair of related entities, extract the following:
- source_entity: name of the source entity
- target_entity: name of the target entity
- relationship_description: explanation of why these entities are related
- relationship_strength: a numeric score (1-10) indicating strength

3. Return output in JSON format.

######################
-Examples-
######################
{examples}

######################
-Real Data-
######################
Entity_types: {entity_types}
Text: {input_text}
######################
Output:
"""

工程要点

  • entity_types 必须根据领域定制。默认的 [organization, person, geo, event] 适用于通用场景;合规文档应增加 [regulation, clause, penalty, deadline];技术文档应增加 [technology, api, protocol, component]
  • max_gleanings 设为 1 是性价比最高的选择。设为 0 会漏掉约 15-20% 的实体;设为 2 以上的边际收益递减,但 token 消耗线性增长。
  • 对大规模语料,建议使用性价比更高的模型(如 gpt-4o-mini)做抽取,用高端模型(如 gpt-4o)做后续摘要。

2.2 第二阶段:社区检测(Community Detection via Leiden Algorithm)

输入:由第一阶段构建的知识图谱(实体为节点、关系为边)。

处理:运行 Hierarchical Leiden Algorithm,递归地将图划分为不同粒度的社区。

Leiden 算法是 Louvain 算法的改进版本,核心优势在于它保证每个社区内部是连通的(Louvain 可能产生不连通的社区)。算法通过最大化「模块度」(Modularity)来确定最优的社区划分。

层级结构:Leiden 算法可以产生多层级的社区划分。Level 0 是最细粒度(小社区),Level 1 是中粒度,Level 2 是粗粒度(大社区)。不同层级的社区摘要服务于不同粒度的查询。

输出:每个节点的社区归属信息,以及社区的层级树结构。

import graspologic
import networkx as nx

def detect_communities(graph: nx.Graph, max_community_size: int = 10) -> dict:
    """
    使用 Hierarchical Leiden 算法进行社区检测。

    Args:
        graph: NetworkX 图对象
        max_community_size: 最大社区规模阈值,超过此值则递归细分

    Returns:
        node_to_community: 节点到社区 ID 的映射
    """
    # 使用 graspologic 库的 Leiden 实现
    community_mapping = graspologic.partition.hierarchical_leiden(
        graph,
        max_cluster_size=max_community_size,
        random_seed=42
    )

    # 构建层级社区结构
    levels = {}
    for node, community_path in community_mapping.items():
        for level, community_id in enumerate(community_path):
            if level not in levels:
                levels[level] = {}
            levels[level][node] = community_id

    return levels

关键参数

  • max_cluster_size:控制社区的最大规模。设置过大,社区摘要会过于泛化;设置过小,社区数量爆炸,LLM 摘要成本飙升。经验值在 10-30 之间。
  • resolution:Leiden 算法的分辨率参数。值越大,产生的社区越多、粒度越细;值越小,社区越少、粒度越粗。默认值 1.0 适用于大多数场景。

2.3 第三阶段:社区摘要生成(Community Summarization)

输入:每个社区内的实体列表、关系列表及其描述文本。

处理:将社区内的所有实体和关系描述拼接后,提示 LLM 生成一份结构化的社区报告。报告通常包含:

  1. Executive Summary:社区的核心主题概述(2-3 句话)
  2. Key Entities:社区中的核心实体及其角色
  3. Key Relationships:实体之间的重要关系
  4. Implications / Risks:该社区揭示的潜在含义或风险点

输出:社区摘要报告(Community Report),存储为结构化文本。

COMMUNITY_REPORT_PROMPT = """
You are an AI assistant that helps a human analyst to perform information
discovery. Given a community of entities and relationships, write a
comprehensive report of this community.

The report should include:
- A title that represents the community's key entities
- A summary (2-3 sentences) of the community's overall structure and significance
- Detailed findings about the key entities and their relationships
- Potential implications or risks identified in the community

# Community Data
Entities: {entities}
Relationships: {relationships}

# Report
"""

工程要点

  • 社区报告是 GraphRAG 最核心的产物。其质量直接决定 Global Search 的效果。
  • 对于大型社区(实体数 > 30),实体和关系描述的拼接可能超过 LLM 的上下文窗口。此时需要采用分批摘要 + 合并的策略。
  • 社区报告生成是索引阶段最大的 token 消耗项,通常占总索引成本的 40-60%。

2.4 第四阶段:双搜索策略(Dual Search: Local + Global)

flowchart LR
    subgraph "查询阶段 Query Time"
        Q[用户查询] --> R{查询分类}

        R -->|局部查询| LS[Local Search]
        R -->|全局查询| GS[Global Search]

        subgraph "Local Search 流程"
            LS --> LS1[向量检索相关实体]
            LS1 --> LS2[图遍历 K-hop 邻域]
            LS2 --> LS3[收集实体描述 + 关系 + 源文本]
            LS3 --> LS4[拼接上下文送入 LLM]
        end

        subgraph "Global Search 流程"
            GS --> GS1[检索相关社区摘要]
            GS1 --> GS2["Map: 每个摘要独立生成 partial answer"]
            GS2 --> GS3["Reduce: 合并所有 partial answers"]
            GS3 --> GS4[生成最终全局回答]
        end

        LS4 --> A[最终回答]
        GS4 --> A
    end

Local Search(局部搜索)

适用场景:回答与特定实体相关的查询,例如「公司 X 的主要竞争对手有哪些?」

流程:

  1. 将查询向量化,在实体 Embedding 中做相似度检索,找到最相关的 Top-K 实体
  2. 从这些实体出发,在知识图谱上做 K-hop 遍历,收集邻域内的实体和关系
  3. 拉取这些实体/关系对应的源文本(Text Units)
  4. 将实体描述、关系描述、源文本拼接为上下文,送入 LLM 生成回答
def local_search(query: str, graph, vector_index, llm, top_k: int = 10,
                 hops: int = 2) -> str:
    """
    Local Search:基于实体邻域的局部检索。
    """
    # Step 1: 向量检索相关实体
    query_embedding = embed(query)
    candidate_entities = vector_index.search(query_embedding, top_k=top_k)

    # Step 2: 图遍历收集邻域
    context_entities = set()
    context_relationships = []
    for entity in candidate_entities:
        neighbors = graph.k_hop_neighbors(entity.id, k=hops)
        context_entities.update(neighbors)
        for neighbor in neighbors:
            edges = graph.edges_between(entity.id, neighbor.id)
            context_relationships.extend(edges)

    # Step 3: 拼接上下文
    context = build_context(
        entities=context_entities,
        relationships=context_relationships,
        include_source_text=True,
        max_tokens=12000
    )

    # Step 4: LLM 生成
    response = llm.generate(
        system="You are a helpful assistant answering questions based on "
               "the provided knowledge graph context.",
        user=f"Context:\n{context}\n\nQuestion: {query}"
    )
    return response

Global Search(全局搜索)

适用场景:回答需要对整个语料库进行宏观理解的查询,例如「这些文档中反复出现的合规风险主题有哪些?」

流程(Map-Reduce 模式):

  1. 检索与查询相关的社区摘要(通常按 Level 选择粒度)
  2. Map 阶段:将每个社区摘要独立送入 LLM,生成 partial answer 和 relevance score
  3. Reduce 阶段:将所有 partial answers 按 relevance score 排序,拼接后送入 LLM 做最终归纳
def global_search(query: str, community_reports: list, llm,
                  community_level: int = 1) -> str:
    """
    Global Search:基于社区摘要的 Map-Reduce 全局检索。
    """
    # 过滤指定层级的社区摘要
    reports = [r for r in community_reports if r.level == community_level]

    # Map 阶段:每个社区独立生成 partial answer
    partial_answers = []
    for report in reports:
        result = llm.generate(
            system="Analyze the community report and generate key points "
                   "relevant to the query. Rate relevance 0-100.",
            user=f"Community Report:\n{report.content}\n\nQuery: {query}"
        )
        partial_answers.append({
            "answer": result.text,
            "score": result.relevance_score,
            "community_id": report.community_id
        })

    # Reduce 阶段:按相关性排序并合并
    partial_answers.sort(key=lambda x: x["score"], reverse=True)
    top_answers = partial_answers[:20]  # 取 Top-20

    combined_context = "\n\n".join([
        f"[Community {a['community_id']}] (Score: {a['score']})\n{a['answer']}"
        for a in top_answers
    ])

    # 最终归纳
    final_response = llm.generate(
        system="Synthesize the following partial answers into a comprehensive "
               "response. Cite community sources where appropriate.",
        user=f"Partial Answers:\n{combined_context}\n\nOriginal Query: {query}"
    )
    return final_response

关键设计决策

  • community_level 的选择至关重要。Level 0(细粒度社区)适合回答相对具体的全局问题;Level 2(粗粒度社区)适合回答高度抽象的主题性问题。生产环境中通常需要多层级联合检索。
  • Map 阶段的并行度直接影响延迟。社区数量从几十到上千不等,Map 阶段应使用异步并发调用 LLM。
  • Reduce 阶段的 Top-K 选择需要平衡上下文窗口限制和覆盖率。取太少会丢信息,取太多会超出 LLM 上下文限制。

三、与传统 RAG 的量化对比

3.1 三种 RAG 范式对比

维度 Naive RAG Advanced RAG GraphRAG
检索策略 纯向量相似度(单路召回) 向量 + 关键词混合检索 + Reranker 向量 + 图遍历 + 社区摘要
支持的查询类型 局部事实查询(单跳) 局部事实 + 简单多跳 局部 + 全局 + 主题归纳 + 多跳推理
上下文质量 依赖 chunk 切分质量,易断裂 通过 Reranker 提升精度 结构化上下文(实体-关系-社区),语义连贯性高
幻觉率 较高(缺乏关系约束) 中等(Reranker 可过滤噪声) 较低(图结构提供事实约束)
索引成本 低(仅 Embedding) 中(Embedding + Reranker 训练) 高(LLM 抽取 + 社区检测 + 摘要生成)
查询延迟 低(单次向量检索) 中(检索 + Reranking) 中高(Local Search 中等,Global Search 较高)
增量更新 简单(追加 chunk) 简单(追加 + 重算索引) 复杂(需要重算图结构和社区)
可解释性 低(仅返回相似 chunk) 高(可追溯实体、关系、社区来源)
适用场景 文档 QA、客服问答 精度要求较高的文档检索 跨文档分析、主题归纳、合规审计、情报分析

3.2 Benchmark 数据参考

基于 2025 年多项评测研究(包括 FalkorDB 的 KG-LM Benchmark 和 arXiv:2502.11371),以下是关键性能数据:

查询准确率对比(以 FalkorDB KG-LM Benchmark 为参考):

查询复杂度 Vector RAG 准确率 GraphRAG 准确率 GraphRAG 提升倍数
单实体查询(1 entity) 72% 85% 1.2x
多实体查询(3-5 entities) 38% 79% 2.1x
复杂关系查询(5+ entities) 0-12% 71% 6x+
Schema-bound 查询(KPI/预测) 0% 65% N/A
全局主题归纳 15-25% 72% 3-5x

关键发现

  • GraphRAG 在多实体查询和关系推理上有绝对优势。当查询涉及 5 个以上实体时,纯向量 RAG 的准确率下降到接近随机水平,而 GraphRAG 仍能维持 70% 以上。
  • 在简单的单跳事实查询上,GraphRAG 与 Advanced RAG 的差距并不大,甚至在某些基准测试上 Advanced RAG 更优(因为图索引可能引入抽取噪声)。
  • GraphRAG 的索引成本显著高于向量 RAG。对 100 万字文档,GraphRAG 的索引成本约为向量 RAG 的 10-50 倍(主要是 LLM 抽取和摘要的 API 调用费用)。

3.3 什么时候不应该用 GraphRAG

GraphRAG 并非万能方案。以下场景应谨慎评估:

  • 文档量极小(< 10 篇):图结构过于稀疏,社区检测无法产生有意义的聚类。此时简单的向量 RAG 足够。
  • 查询模式单一(纯事实查找):如果 95% 的查询都是「某条款的具体规定是什么?」这种单跳查询,GraphRAG 的额外复杂度不划算。
  • 实时性要求极高:Global Search 的 Map-Reduce 模式需要多次 LLM 调用,延迟通常在 5-30 秒之间,不适合需要毫秒级响应的场景。
  • 预算有限:GraphRAG 的索引和查询成本都显著高于向量 RAG。对于 token 预算敏感的场景,应做 ROI 分析后再决定。

四、工程化实现

4.1 使用 Microsoft GraphRAG 官方框架

Microsoft 的开源框架 graphrag 是最直接的实现路径。以下是基于官方框架的完整工程流程。

安装与初始化

# 安装
pip install graphrag

# 初始化项目
graphrag init --root ./my-graphrag-project

# 目录结构
# my-graphrag-project/
# ├── settings.yaml          # 核心配置
# ├── .env                   # API Key(gitignored)
# ├── input/                 # 源文档目录
# │   ├── doc1.txt
# │   └── doc2.txt
# └── output/                # 索引输出目录

核心配置 settings.yaml

# settings.yaml - GraphRAG 核心配置

# LLM 配置
models:
  default_chat_model:
    type: openai_chat
    model: gpt-4o-mini          # 抽取用小模型降低成本
    api_key: ${GRAPHRAG_API_KEY}
    max_tokens: 4096
    temperature: 0

  default_embedding_model:
    type: openai_embedding
    model: text-embedding-3-small
    api_key: ${GRAPHRAG_API_KEY}

# 文本切块配置
chunks:
  size: 1200                    # chunk 大小(tokens)
  overlap: 100                  # chunk 重叠

# 实体抽取配置
entity_extraction:
  entity_types:
    - organization
    - person
    - technology
    - regulation              # 合规场景定制
    - event
    - location
  max_gleanings: 1            # 补充抽取轮次

# 社区检测配置
community_detection:
  algorithm: leiden
  max_cluster_size: 10        # 最大社区规模
  seed: 42

# 社区摘要配置
summarize_descriptions:
  max_length: 500

# 搜索配置
local_search:
  max_tokens: 12000
  top_k_entities: 10
  top_k_relationships: 10

global_search:
  max_tokens: 12000
  map_max_tokens: 1000
  reduce_max_tokens: 2000
  community_level: 1          # 使用的社区层级

运行索引 Pipeline

# 自动调优 Prompt(根据语料领域生成定制 Prompt)
graphrag prompt-tune \
  --root ./my-graphrag-project \
  --language Chinese \
  --output ./my-graphrag-project/prompts

# 运行索引
graphrag index --root ./my-graphrag-project

# 查询(Local Search)
graphrag query \
  --root ./my-graphrag-project \
  --method local \
  --query "公司A的主要供应商有哪些?"

# 查询(Global Search)
graphrag query \
  --root ./my-graphrag-project \
  --method global \
  --query "这些文档中反复出现的合规风险主题是什么?"

4.2 使用 LlamaIndex 构建 GraphRAG Pipeline

LlamaIndex 提供了 PropertyGraph 抽象,可以更灵活地构建 GraphRAG Pipeline,特别是在需要与自有系统集成时。

"""
GraphRAG Pipeline 实现(基于 LlamaIndex PropertyGraph)
"""
from llama_index.core import (
    SimpleDirectoryReader,
    PropertyGraphIndex,
    Settings,
)
from llama_index.core.indices.property_graph import (
    SimpleLLMPathExtractor,
    ImplicitPathExtractor,
)
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore

# ---- 配置 ----
Settings.llm = OpenAI(model="gpt-4o-mini", temperature=0)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.chunk_size = 1024
Settings.chunk_overlap = 20

# ---- 加载文档 ----
documents = SimpleDirectoryReader("./data/compliance_docs").load_data()

# ---- 配置 Graph Store(Neo4j 后端)----
graph_store = Neo4jPropertyGraphStore(
    username="neo4j",
    password="your-password",
    url="bolt://localhost:7687",
    database="graphrag",
)

# ---- 构建 PropertyGraph 索引 ----
# SimpleLLMPathExtractor: 使用 LLM 抽取实体和关系
# ImplicitPathExtractor: 抽取隐式关系(共现、引用等)
index = PropertyGraphIndex.from_documents(
    documents,
    property_graph_store=graph_store,
    kg_extractors=[
        SimpleLLMPathExtractor(
            llm=Settings.llm,
            max_paths_per_chunk=20,
            num_workers=4,
        ),
        ImplicitPathExtractor(),
    ],
    embed_kg_nodes=True,           # 为节点生成 Embedding
    show_progress=True,
)

# ---- 查询 ----
query_engine = index.as_query_engine(
    include_text=True,              # 包含源文本
    similarity_top_k=10,
    response_mode="tree_summarize", # 层级摘要模式
)

response = query_engine.query(
    "电子商务平台的用户数据合规要求有哪些关键条款?"
)
print(response)

自定义 GraphRAG 抽取器(带社区检测)

"""
自定义 GraphRAGExtractor 和 GraphRAGStore,
实现完整的 Community Detection + Summarization。
"""
import networkx as nx
from graspologic.partition import hierarchical_leiden
from llama_index.core.graph_stores import SimplePropertyGraphStore

class GraphRAGStore(SimplePropertyGraphStore):
    """
    扩展 SimplePropertyGraphStore,增加社区检测和摘要功能。
    """
    community_summary = {}
    max_cluster_size = 10

    def build_communities(self):
        """运行 Leiden 社区检测并生成摘要。"""
        # Step 1: 构建 NetworkX 图
        nx_graph = self._create_nx_graph()

        # Step 2: Leiden 社区检测
        community_mapping = hierarchical_leiden(
            nx_graph,
            max_cluster_size=self.max_cluster_size,
            random_seed=42
        )

        # Step 3: 收集每个社区的实体和关系
        communities = self._collect_community_info(community_mapping)

        # Step 4: 为每个社区生成摘要
        for community_id, info in communities.items():
            summary = self._generate_community_summary(
                entities=info["entities"],
                relationships=info["relationships"],
            )
            self.community_summary[community_id] = summary

    def _create_nx_graph(self) -> nx.Graph:
        """将 PropertyGraph 转换为 NetworkX 图。"""
        nx_graph = nx.Graph()
        triples = self.get_all_triples()
        for subj, rel, obj in triples:
            nx_graph.add_node(subj.name, description=subj.description)
            nx_graph.add_node(obj.name, description=obj.description)
            nx_graph.add_edge(
                subj.name, obj.name,
                relationship=rel.label,
                description=rel.description,
            )
        return nx_graph

    def _generate_community_summary(self, entities: list,
                                     relationships: list) -> str:
        """使用 LLM 生成社区摘要报告。"""
        entity_text = "\n".join([
            f"- {e.name} ({e.type}): {e.description}" for e in entities
        ])
        rel_text = "\n".join([
            f"- {r.source} --[{r.label}]--> {r.target}: {r.description}"
            for r in relationships
        ])

        prompt = f"""Analyze the following community of entities and relationships.
Generate a comprehensive summary report including:
1. A title for this community
2. An executive summary (2-3 sentences)
3. Key findings about entities and their relationships
4. Potential implications

Entities:
{entity_text}

Relationships:
{rel_text}

Report:"""

        response = Settings.llm.complete(prompt)
        return response.text

4.3 使用 LangChain 构建 GraphRAG

LangChain 提供了 LLMGraphTransformer 用于图构建,结合 Neo4j 的 Neo4jGraph 实现端到端 Pipeline。

"""
GraphRAG 实现(基于 LangChain + Neo4j)
"""
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.graphs import Neo4jGraph
from langchain_experimental.graph_transformers import LLMGraphTransformer
from langchain_community.vectorstores import Neo4jVector
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

# ---- 初始化组件 ----
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
graph = Neo4jGraph(
    url="bolt://localhost:7687",
    username="neo4j",
    password="your-password"
)

# ---- 文档加载与切块 ----
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1200,
    chunk_overlap=100,
)
raw_docs = [...]  # 加载原始文档
chunks = text_splitter.split_documents(raw_docs)

# ---- 使用 LLM 构建知识图谱 ----
graph_transformer = LLMGraphTransformer(
    llm=llm,
    allowed_nodes=[
        "Organization", "Person", "Technology",
        "Regulation", "Event", "Location"
    ],
    allowed_relationships=[
        "OWNS", "WORKS_FOR", "REGULATES", "USES",
        "COMPETES_WITH", "PARTNERS_WITH", "VIOLATES"
    ],
    node_properties=["description"],
    relationship_properties=["description", "strength"],
)

# 批量抽取并写入 Neo4j
for i in range(0, len(chunks), 10):
    batch = chunks[i:i+10]
    graph_documents = graph_transformer.convert_to_graph_documents(batch)
    graph.add_graph_documents(
        graph_documents,
        baseEntityLabel=True,     # 添加 __Entity__ 基础标签
        include_source=True,      # 关联源文档
    )

# ---- 创建向量索引(混合检索)----
vector_index = Neo4jVector.from_existing_graph(
    embeddings,
    url="bolt://localhost:7687",
    username="neo4j",
    password="your-password",
    index_name="entity_embeddings",
    node_label="__Entity__",
    text_node_properties=["id", "description"],
    embedding_node_property="embedding",
)

# ---- 混合检索(向量 + 图遍历)----
def hybrid_retrieval(query: str, top_k: int = 5) -> str:
    """结合向量检索和图遍历的混合检索策略。"""
    # Step 1: 向量检索找到入口实体
    vector_results = vector_index.similarity_search(query, k=top_k)

    # Step 2: 从入口实体出发做图遍历
    entity_names = [doc.metadata.get("id", "") for doc in vector_results]

    # Cypher 查询:2-hop 邻域
    cypher_query = """
    MATCH (e:__Entity__)-[r]-(neighbor:__Entity__)
    WHERE e.id IN $entity_names
    OPTIONAL MATCH (neighbor)-[r2]-(second_hop:__Entity__)
    RETURN e.id AS source,
           type(r) AS relationship,
           neighbor.id AS target,
           neighbor.description AS target_desc,
           type(r2) AS rel2,
           second_hop.id AS second_target
    LIMIT 100
    """

    graph_context = graph.query(cypher_query, {"entity_names": entity_names})

    # Step 3: 拼接上下文
    context_parts = []
    for record in graph_context:
        context_parts.append(
            f"{record['source']} --[{record['relationship']}]--> "
            f"{record['target']}: {record['target_desc']}"
        )

    return "\n".join(context_parts)

五、生产部署架构

5.1 架构总览:Neo4j + 向量数据库双后端

生产环境中的 GraphRAG 系统需要两个核心存储后端:

  • 图数据库(Neo4j):存储知识图谱(实体、关系、社区结构、社区摘要)
  • 向量数据库(Weaviate / Qdrant / Milvus):存储实体 Embedding 和原始 chunk Embedding,支持高性能向量检索
graph TB
    subgraph "数据摄入层 Ingestion Layer"
        S1[文档源 Document Sources] --> S2[文档加载器 Loader]
        S2 --> S3[文本切块 Chunking]
        S3 --> S4[LLM 实体/关系抽取]
        S4 --> S5[实体消解 Entity Resolution]
        S5 --> S6[Leiden 社区检测]
        S6 --> S7[社区摘要生成]
    end

    subgraph "存储层 Storage Layer"
        S5 --> DB1[(Neo4j Graph DB)]
        S7 --> DB1
        S3 --> DB2[(Vector DB: Qdrant)]
        S5 -->|Entity Embeddings| DB2
    end

    subgraph "查询层 Query Layer"
        Q[API Gateway] --> QR[Query Router]
        QR -->|Local| LS[Local Search Engine]
        QR -->|Global| GS[Global Search Engine]
        QR -->|Hybrid| HS[Hybrid Search Engine]

        LS --> DB1
        LS --> DB2
        GS --> DB1
        HS --> DB1
        HS --> DB2
    end

    subgraph "生成层 Generation Layer"
        LS --> LLM[LLM Service]
        GS --> LLM
        HS --> LLM
        LLM --> R[Response + Sources]
    end

    subgraph "运维层 Operations"
        MON[监控 Prometheus + Grafana]
        CACHE[缓存 Redis]
        QUEUE[任务队列 Celery / RQ]

        S4 -.-> QUEUE
        LS -.-> CACHE
        GS -.-> CACHE
    end

    style DB1 fill:#d4edda
    style DB2 fill:#cce5ff
    style LLM fill:#fff3cd

5.2 组件选型与部署

Neo4j 部署

# docker-compose.yml - Neo4j 集群部署
version: '3.8'
services:
  neo4j:
    image: neo4j:5.26-enterprise
    ports:
      - "7474:7474"   # HTTP
      - "7687:7687"   # Bolt
    environment:
      NEO4J_AUTH: neo4j/your-secure-password
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_PLUGINS: '["apoc", "graph-data-science"]'
      NEO4J_server_memory_heap_initial__size: 2g
      NEO4J_server_memory_heap_max__size: 4g
      NEO4J_server_memory_pagecache_size: 2g
      # 全文索引支持
      NEO4J_dbms_security_procedures_unrestricted: "apoc.*,gds.*"
    volumes:
      - neo4j_data:/data
      - neo4j_logs:/logs

  qdrant:
    image: qdrant/qdrant:v1.12.0
    ports:
      - "6333:6333"   # REST API
      - "6334:6334"   # gRPC
    volumes:
      - qdrant_data:/qdrant/storage
    environment:
      QDRANT__SERVICE__GRPC_PORT: 6334

volumes:
  neo4j_data:
  neo4j_logs:
  qdrant_data:

Neo4j Schema 设计

// ---- 创建约束和索引 ----

// 实体唯一性约束
CREATE CONSTRAINT entity_id IF NOT EXISTS
FOR (e:__Entity__) REQUIRE e.id IS UNIQUE;

// 社区唯一性约束
CREATE CONSTRAINT community_id IF NOT EXISTS
FOR (c:__Community__) REQUIRE c.id IS UNIQUE;

// 全文索引(用于模糊检索)
CREATE FULLTEXT INDEX entity_fulltext IF NOT EXISTS
FOR (e:__Entity__) ON EACH [e.id, e.description];

// 向量索引(Neo4j 5.x 原生向量索引)
CREATE VECTOR INDEX entity_vector IF NOT EXISTS
FOR (e:__Entity__) ON (e.embedding)
OPTIONS {
  indexConfig: {
    `vector.dimensions`: 1536,
    `vector.similarity_function`: 'cosine'
  }
};

// ---- 数据模型示例 ----

// 实体节点
CREATE (e:__Entity__:Organization {
  id: "microsoft_corp",
  name: "Microsoft Corporation",
  description: "全球领先的科技公司,总部位于华盛顿州雷德蒙德",
  entity_type: "Organization",
  source_ids: ["chunk_001", "chunk_042"],
  embedding: $embedding_vector,
  created_at: datetime(),
  updated_at: datetime()
})

// 关系
MATCH (a:__Entity__ {id: "microsoft_corp"})
MATCH (b:__Entity__ {id: "graphrag_project"})
CREATE (a)-[:DEVELOPS {
  description: "Microsoft Research 开发了 GraphRAG 项目",
  strength: 9,
  source_ids: ["chunk_001"]
}]->(b)

// 社区节点
CREATE (c:__Community__ {
  id: "community_001",
  level: 1,
  title: "Microsoft AI Research Ecosystem",
  summary: "该社区围绕 Microsoft 的 AI 研究生态展开...",
  member_count: 15,
  created_at: datetime()
})

// 实体-社区归属关系
MATCH (e:__Entity__ {id: "microsoft_corp"})
MATCH (c:__Community__ {id: "community_001"})
CREATE (e)-[:BELONGS_TO]->(c)

5.3 索引 Pipeline 生产化

生产环境的索引 Pipeline 需要解决以下问题:容错与重试、进度追踪、成本控制、增量更新。

"""
生产级索引 Pipeline(带容错、进度追踪和增量更新)
"""
import hashlib
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

from celery import Celery
from redis import Redis

logger = logging.getLogger(__name__)

app = Celery('graphrag_indexer', broker='redis://localhost:6379/0')
redis_client = Redis(host='localhost', port=6379, db=1)

@dataclass
class IndexingJob:
    job_id: str
    document_path: str
    status: str           # pending | extracting | detecting | summarizing | done | failed
    total_chunks: int
    processed_chunks: int
    error: Optional[str] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


def compute_document_hash(filepath: str) -> str:
    """计算文档内容 hash,用于增量更新判断。"""
    with open(filepath, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest()


class IncrementalIndexer:
    """
    增量索引器:只处理新增或变更的文档。
    """

    def __init__(self, graph_store, vector_store, llm):
        self.graph_store = graph_store
        self.vector_store = vector_store
        self.llm = llm
        self.hash_registry = {}  # doc_path -> content_hash

    def should_reindex(self, doc_path: str) -> bool:
        """判断文档是否需要重新索引。"""
        current_hash = compute_document_hash(doc_path)
        stored_hash = self.hash_registry.get(doc_path)

        if stored_hash is None:
            # 新文档
            logger.info(f"New document detected: {doc_path}")
            self.hash_registry[doc_path] = current_hash
            return True
        elif stored_hash != current_hash:
            # 文档内容变更
            logger.info(f"Document changed: {doc_path}")
            self.hash_registry[doc_path] = current_hash
            return True
        else:
            logger.info(f"Document unchanged, skipping: {doc_path}")
            return False

    def reindex_document(self, doc_path: str):
        """重新索引单个文档(删旧 + 建新)。"""
        doc_id = Path(doc_path).stem

        # Step 1: 删除旧数据
        self.graph_store.delete_entities_by_source(doc_id)
        self.vector_store.delete_by_metadata({"source_doc": doc_id})

        # Step 2: 重新抽取和索引
        chunks = self._chunk_document(doc_path)
        entities, relationships = self._extract_entities(chunks)

        # Step 3: 写入图数据库
        self.graph_store.upsert_entities(entities)
        self.graph_store.upsert_relationships(relationships)

        # Step 4: 写入向量数据库
        embeddings = self._generate_embeddings(entities)
        self.vector_store.upsert(embeddings)

        return len(entities), len(relationships)

    def rebuild_communities(self):
        """
        重建社区结构。
        NOTE: 增量更新文档后,社区结构需要全局重算。
        这是 GraphRAG 增量更新最大的瓶颈。
        """
        # 从图数据库导出全量图
        full_graph = self.graph_store.export_networkx()

        # 重新运行 Leiden
        communities = hierarchical_leiden(
            full_graph,
            max_cluster_size=10,
            random_seed=42
        )

        # 更新社区归属
        self.graph_store.update_communities(communities)

        # 重新生成社区摘要
        for community_id, members in communities.items():
            summary = self._generate_community_summary(community_id, members)
            self.graph_store.update_community_summary(community_id, summary)

    def _chunk_document(self, doc_path: str) -> list:
        """文档切块。"""
        # 实现省略
        pass

    def _extract_entities(self, chunks: list) -> tuple:
        """LLM 实体抽取(带重试)。"""
        # 实现省略
        pass

    def _generate_embeddings(self, entities: list) -> list:
        """生成实体 Embedding。"""
        # 实现省略
        pass

    def _generate_community_summary(self, community_id, members) -> str:
        """生成社区摘要。"""
        # 实现省略
        pass


# ---- Celery 任务定义 ----

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def index_document_task(self, doc_path: str, job_id: str):
    """异步索引单个文档(带重试)。"""
    try:
        indexer = IncrementalIndexer(
            graph_store=get_graph_store(),
            vector_store=get_vector_store(),
            llm=get_llm(),
        )

        # 更新状态
        redis_client.hset(f"job:{job_id}", "status", "extracting")

        if indexer.should_reindex(doc_path):
            n_entities, n_rels = indexer.reindex_document(doc_path)
            redis_client.hset(f"job:{job_id}", mapping={
                "status": "done",
                "entities": n_entities,
                "relationships": n_rels,
            })
        else:
            redis_client.hset(f"job:{job_id}", "status", "skipped")

    except Exception as exc:
        redis_client.hset(f"job:{job_id}", mapping={
            "status": "failed",
            "error": str(exc),
        })
        raise self.retry(exc=exc)


@app.task
def rebuild_communities_task():
    """异步重建社区(在批量文档索引完成后触发)。"""
    indexer = IncrementalIndexer(
        graph_store=get_graph_store(),
        vector_store=get_vector_store(),
        llm=get_llm(),
    )
    indexer.rebuild_communities()

5.4 增量更新策略

GraphRAG 的增量更新是生产化中最复杂的环节。核心挑战在于:文档级的增量变更会影响全局的图结构和社区划分

三种策略对比

策略 实现复杂度 一致性 延迟 适用场景
全量重建 高(小时级) 文档量 < 1 万,更新频率 < 每天
局部重建 中(分钟级) 文档量 1-10 万,更新频率 < 每小时
增量融合 低(秒级) 文档量 > 10 万,需要准实时

推荐的混合策略

  1. 文档级增量:新增/变更的文档单独做实体抽取,直接 Upsert 到图数据库
  2. 社区延迟重算:累积一定数量的增量变更后(例如每小时或每 100 个文档),批量触发社区重新检测和摘要生成
  3. 定期全量校准:每周或每月做一次全量重建,消除增量漂移
class IncrementalUpdatePolicy:
    """增量更新策略管理。"""

    def __init__(self,
                 batch_threshold: int = 100,
                 time_threshold_hours: int = 1):
        self.pending_changes = 0
        self.last_community_rebuild = datetime.now()
        self.batch_threshold = batch_threshold
        self.time_threshold_hours = time_threshold_hours

    def on_document_indexed(self):
        """文档索引完成后的回调。"""
        self.pending_changes += 1

        if self.should_rebuild_communities():
            rebuild_communities_task.delay()
            self.pending_changes = 0
            self.last_community_rebuild = datetime.now()

    def should_rebuild_communities(self) -> bool:
        """判断是否需要重建社区。"""
        # 条件 1: 累积变更超过阈值
        if self.pending_changes >= self.batch_threshold:
            return True

        # 条件 2: 距离上次重建超过时间阈值
        hours_since_last = (
            datetime.now() - self.last_community_rebuild
        ).total_seconds() / 3600

        if hours_since_last >= self.time_threshold_hours and self.pending_changes > 0:
            return True

        return False

六、性能调优

6.1 Community Resolution 调优

社区分辨率(Resolution)是影响 GraphRAG 效果最关键的超参数之一。它决定了 Leiden 算法的社区粒度。

调优策略

def tune_community_resolution(graph: nx.Graph,
                               test_queries: list,
                               resolutions: list = [0.5, 1.0, 1.5, 2.0, 3.0]):
    """
    通过 Grid Search 寻找最优社区分辨率。

    评估指标:
    - 社区数量与文档量的比值(避免过多/过少)
    - 社区内聚度(Modularity Score)
    - 下游查询的 Answer Relevance Score
    """
    results = []

    for res in resolutions:
        # 运行 Leiden
        communities = hierarchical_leiden(
            graph,
            max_cluster_size=10,
            resolution=res,
            random_seed=42
        )

        n_communities = len(set(communities.values()))
        n_nodes = graph.number_of_nodes()
        ratio = n_communities / n_nodes

        # 计算模块度
        modularity = nx.community.modularity(
            graph,
            _to_partition(communities)
        )

        # 评估下游查询质量(需要实际运行查询)
        avg_relevance = evaluate_queries(communities, test_queries)

        results.append({
            "resolution": res,
            "n_communities": n_communities,
            "ratio": ratio,
            "modularity": modularity,
            "avg_relevance": avg_relevance,
        })

        logger.info(
            f"Resolution={res}: "
            f"communities={n_communities}, "
            f"ratio={ratio:.3f}, "
            f"modularity={modularity:.3f}, "
            f"relevance={avg_relevance:.3f}"
        )

    return results

经验法则

  • 社区数量与文档数的比值在 0.1-0.3 之间通常效果最好
  • 模块度(Modularity)> 0.3 表明社区划分质量较好
  • 如果大量节点被归入单一巨型社区,说明 Resolution 过低
  • 如果几乎每个节点自成一个社区,说明 Resolution 过高

6.2 Chunk Size 与图密度的权衡

Chunk Size 对 GraphRAG 的影响比对向量 RAG 更显著,因为它直接决定了实体抽取的质量和图的密度。

实验观察

Chunk Size (tokens) 平均实体数/chunk 平均关系数/chunk 图密度 抽取质量 LLM 成本
300 2-3 1-2 稀疏 高(聚焦) 高(chunk 多)
600 4-6 3-5 中等 中高
1200 8-12 6-10 密集 中(可能漏抽) 低(chunk 少)
2400 15-20 12-18 过密 低(信息过载) 最低

推荐配置

  • 通用场景:chunk_size=1200, overlap=100。这是 Microsoft 官方的默认配置,在抽取质量和成本之间取得了最佳平衡。
  • 高精度场景(合规审计、法律文档):chunk_size=600, overlap=50, max_gleanings=2。更细的切块确保不遗漏关键实体,多轮抽取提升召回率。
  • 大规模语料(> 100 万字):chunk_size=1200, overlap=100, max_gleanings=0。牺牲部分抽取召回率换取成本控制。

6.3 缓存策略

GraphRAG 的查询链条较长(向量检索 -> 图遍历 -> LLM 生成),合理的缓存策略可以大幅降低延迟和成本。

"""
三级缓存策略:Query Cache -> Context Cache -> LLM Response Cache
"""
import hashlib
import json
from functools import lru_cache
from redis import Redis

redis = Redis(host='localhost', port=6379, db=2)

class GraphRAGCache:
    """GraphRAG 三级缓存管理。"""

    # ---- Level 1: Query Embedding 缓存 ----
    # 相同查询的 Embedding 不需要重复计算

    @staticmethod
    def get_query_embedding(query: str):
        cache_key = f"emb:{hashlib.md5(query.encode()).hexdigest()}"
        cached = redis.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    @staticmethod
    def set_query_embedding(query: str, embedding: list, ttl: int = 86400):
        cache_key = f"emb:{hashlib.md5(query.encode()).hexdigest()}"
        redis.setex(cache_key, ttl, json.dumps(embedding))

    # ---- Level 2: Graph Context 缓存 ----
    # 相同入口实体集合的图遍历结果可以复用

    @staticmethod
    def get_graph_context(entity_ids: list):
        sorted_ids = sorted(entity_ids)
        cache_key = f"ctx:{hashlib.md5(str(sorted_ids).encode()).hexdigest()}"
        cached = redis.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    @staticmethod
    def set_graph_context(entity_ids: list, context: dict, ttl: int = 3600):
        sorted_ids = sorted(entity_ids)
        cache_key = f"ctx:{hashlib.md5(str(sorted_ids).encode()).hexdigest()}"
        redis.setex(cache_key, ttl, json.dumps(context))

    # ---- Level 3: 完整回答缓存 ----
    # 精确匹配的查询直接返回缓存结果

    @staticmethod
    def get_response(query: str, search_mode: str):
        cache_key = f"resp:{search_mode}:{hashlib.md5(query.encode()).hexdigest()}"
        cached = redis.get(cache_key)
        if cached:
            return json.loads(cached)
        return None

    @staticmethod
    def set_response(query: str, search_mode: str, response: dict,
                     ttl: int = 1800):
        cache_key = f"resp:{search_mode}:{hashlib.md5(query.encode()).hexdigest()}"
        redis.setex(cache_key, ttl, json.dumps(response))

    # ---- 缓存失效策略 ----

    @staticmethod
    def invalidate_on_graph_update():
        """图更新后失效相关缓存。"""
        # Level 2 和 Level 3 缓存失效
        # Level 1(Embedding)不受图更新影响
        for key in redis.scan_iter("ctx:*"):
            redis.delete(key)
        for key in redis.scan_iter("resp:*"):
            redis.delete(key)

缓存 TTL 建议

缓存层级 TTL 失效条件
Query Embedding 24 小时 Embedding 模型更换
Graph Context 1 小时 图数据更新
LLM Response 30 分钟 图数据更新 / LLM 模型更换

6.4 Global Search 延迟优化

Global Search 的 Map-Reduce 模式是延迟的主要瓶颈。以下是优化手段:

1. 异步并发 Map 阶段

import asyncio
from openai import AsyncOpenAI

async def parallel_map_search(community_reports: list, query: str,
                               max_concurrency: int = 20) -> list:
    """并发执行 Map 阶段,显著降低延迟。"""
    client = AsyncOpenAI()
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_one(report):
        async with semaphore:
            response = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": MAP_SYSTEM_PROMPT},
                    {"role": "user", "content": f"Report:\n{report.content}\n\nQuery: {query}"}
                ],
                temperature=0,
                max_tokens=500,
            )
            return {
                "community_id": report.id,
                "answer": response.choices[0].message.content,
            }

    tasks = [process_one(r) for r in community_reports]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # 过滤失败的请求
    valid_results = [r for r in results if not isinstance(r, Exception)]
    return valid_results

2. 社区预过滤

在 Map 阶段之前,先用 Embedding 相似度快速过滤掉明显无关的社区,减少 LLM 调用次数。

def pre_filter_communities(query: str, community_reports: list,
                           threshold: float = 0.3) -> list:
    """
    用 Embedding 余弦相似度预过滤社区,
    只对可能相关的社区执行 Map 阶段。
    """
    query_emb = embed(query)

    filtered = []
    for report in community_reports:
        similarity = cosine_similarity(query_emb, report.embedding)
        if similarity >= threshold:
            filtered.append(report)

    # 按相似度排序,取 Top-N
    filtered.sort(key=lambda r: cosine_similarity(query_emb, r.embedding),
                  reverse=True)
    return filtered[:50]  # 最多处理 50 个社区

3. 分级社区搜索

先搜索粗粒度社区(Level 2),如果结果不充分,再下沉到细粒度社区(Level 0/1)。


七、企业级落地案例

7.1 案例一:企业内部文档智能问答

业务场景:某大型制造企业有超过 5 万份内部技术文档(设计规范、工艺流程、质量标准、会议纪要等),工程师需要跨文档查找技术方案和历史决策。

痛点分析

  • 传统关键词搜索覆盖率不足,工程师平均需要 45 分钟找到所需信息
  • 向量 RAG 可以回答「某产品的热处理温度参数是什么?」但无法回答「过去三年我们在热处理工艺上做过哪些改进?改进的原因是什么?」
  • 技术知识散落在数百份文档中,新员工入职培训耗时长

GraphRAG 方案设计

文档摄入 Pipeline:
  ┌─ 技术文档 (.docx/.pdf) ─→ 文本提取 ─→ 切块 (chunk_size=800)
  ├─ 会议纪要 (.md/.txt)   ─→ 文本提取 ─→ 切块 (chunk_size=600)
  └─ 邮件归档 (.eml)       ─→ 文本提取 ─→ 切块 (chunk_size=400)
       │
       ▼
  实体类型定制:
    [Product, Component, Material, Process, Standard,
     Problem, Solution, Person, Department, Decision]
       │
       ▼
  知识图谱 (Neo4j) + 向量索引 (Qdrant)
       │
       ▼
  Leiden 社区检测 (resolution=1.5, max_cluster_size=15)
       │
       ▼
  社区摘要 (约 320 个社区)

查询示例与效果

查询类型 查询示例 使用的搜索模式 效果
局部事实 "产品 X 的 PCB 板材规格是什么?" Local Search 准确率 92%,响应 2.1s
多跳关系 "产品 X 用到的芯片供应商也为哪些竞品供货?" Local Search (3-hop) 准确率 78%,响应 3.5s
全局归纳 "过去两年我们的产品良率问题主要集中在哪些环节?" Global Search (Level 1) 覆盖率 85%,响应 8.2s
主题趋势 "公司的技术战略方向在最近5年发生了哪些变化?" Global Search (Level 2) 覆盖率 72%,响应 12.5s

关键指标

  • 工程师平均查找时间从 45 分钟降至 3 分钟
  • 索引总成本约 800 美元(5 万份文档,平均每份 3 页)
  • 月度查询成本约 200 美元(日均 500 次查询)
  • 增量更新延迟 < 30 分钟(新文档上传后可查询)

7.2 案例二:合规文档分析

业务场景:金融机构需要对监管政策文件、内部合规手册、审计报告等进行交叉分析,识别合规风险和政策变更的影响。

挑战

  • 监管政策文件之间存在复杂的引用和替代关系(新政策替代旧政策、政策之间相互引用)
  • 合规团队需要回答「新出的某某政策对我们现有的哪些业务流程有影响?」这类跨文档影响分析问题
  • 审计报告中的发现需要与对应的监管条款关联

实体类型定制

COMPLIANCE_ENTITY_TYPES = [
    "Regulation",        # 监管政策/法规
    "Clause",            # 具体条款
    "BusinessProcess",   # 业务流程
    "RiskItem",          # 风险项
    "ControlMeasure",    # 控制措施
    "Department",        # 责任部门
    "Deadline",          # 合规期限
    "Penalty",           # 处罚措施
    "AuditFinding",      # 审计发现
]

COMPLIANCE_RELATIONSHIPS = [
    "SUPERSEDES",        # 新政策替代旧政策
    "REFERENCES",        # 政策互相引用
    "REQUIRES",          # 政策要求某项控制措施
    "APPLIES_TO",        # 政策适用于某业务流程
    "MITIGATES",         # 控制措施缓解某风险
    "VIOLATES",          # 审计发现违反某条款
    "RESPONSIBLE_FOR",   # 部门负责某控制措施
    "HAS_DEADLINE",      # 合规要求有截止日期
]

查询应用场景

场景 A:政策变更影响分析(Global Search)

查询:「最新发布的《数据安全法实施条例》对我们的客户数据处理流程有哪些影响?需要调整哪些控制措施?」

GraphRAG 的处理流程:

  1. Global Search 检索包含「数据安全法」「客户数据」「控制措施」相关社区的摘要
  2. 在社区摘要中识别受影响的业务流程和控制措施
  3. 通过 Local Search 深入查找具体条款与现有控制措施的映射关系
  4. 生成影响分析报告,包含:受影响的业务流程清单、需要新增/修改的控制措施、合规整改时间要求

场景 B:审计发现溯源(Local Search)

查询:「审计报告中提到的'客户身份验证流程不合规'具体违反了哪些条款?同类问题在其他部门是否也存在?」

GraphRAG 的处理流程:

  1. Local Search 找到「客户身份验证流程不合规」这个 AuditFinding 实体
  2. 通过 VIOLATES 关系找到关联的 Clause 实体
  3. 通过 APPLIES_TO 关系找到同一 Clause 适用的其他 BusinessProcess
  4. 检查这些 BusinessProcess 是否也有关联的 AuditFinding(同类问题扫描)
# 审计发现溯源的 Cypher 查询
AUDIT_TRACE_QUERY = """
// Step 1: 找到审计发现及其违反的条款
MATCH (finding:AuditFinding {description: $finding_desc})
      -[:VIOLATES]->(clause:Clause)

// Step 2: 找到同一条款适用的其他业务流程
MATCH (clause)-[:APPLIES_TO]->(other_process:BusinessProcess)

// Step 3: 检查其他流程是否有类似问题
OPTIONAL MATCH (other_finding:AuditFinding)-[:VIOLATES]->(clause)
WHERE other_finding <> finding

RETURN clause.id AS violated_clause,
       clause.description AS clause_desc,
       collect(DISTINCT other_process.name) AS affected_processes,
       collect(DISTINCT other_finding.description) AS similar_findings
"""

系统效果

  • 政策变更影响分析时间从 2-3 个工作日降至 15 分钟
  • 审计发现溯源和同类问题扫描效率提升 10 倍
  • 合规团队每月可多处理 3 倍的政策变更评估任务
  • 每季度审计准备时间缩短 40%

八、踩坑总结与最佳实践

8.1 常见陷阱

陷阱一:实体消解(Entity Resolution)被低估

同一实体在不同文档中可能有多种称呼(如「微软」「Microsoft」「MSFT」「微软公司」),如果不做实体消解,知识图谱中会出现大量冗余节点,社区检测的质量也会下降。

建议方案:在实体抽取后增加一轮 LLM 实体消解,或使用 Embedding 相似度 + 规则(缩写展开、别名映射)做合并。

陷阱二:社区摘要的上下文窗口溢出

大型社区可能包含数百个实体和上千条关系,拼接后的上下文远超 LLM 的窗口限制。

建议方案:实现分批摘要策略。将社区成员分成若干子批次,每批次生成一个子摘要,最后再生成一个总摘要。

陷阱三:索引成本失控

GraphRAG 的索引成本主要来自 LLM API 调用。以 GPT-4o 为抽取模型、100 万字文档为例,索引成本可达 500-2000 美元。

建议方案:抽取阶段使用 gpt-4o-mini(成本降低约 90%);社区摘要阶段可选用 Claude Haiku 或 Gemini Flash;仅在查询阶段使用高端模型。

陷阱四:增量更新导致社区漂移

增量新增文档后,如果只做局部图更新而不重算社区,社区结构会逐渐偏离最优划分(社区漂移)。

建议方案:设置社区重算的触发阈值,并定期做全量校准。

8.2 生产清单

以下是 GraphRAG 系统上线前的检查清单:

GraphRAG 生产就绪检查清单:

-- 索引 Pipeline --
- [ ] entity_types 已根据领域定制并验证
- [ ] chunk_size 和 overlap 经过实验调优
- [ ] max_gleanings 设置合理(推荐 1)
- [ ] 实体消解已实现并验证
- [ ] 社区检测 resolution 经过调优
- [ ] 索引成本预估并在预算内
- [ ] 增量更新策略已设计并测试
- [ ] 索引 Pipeline 有容错和重试机制
- [ ] 索引进度可监控

-- 存储 --
- [ ] Neo4j 已配置资源限制(heap/pagecache)
- [ ] 向量数据库已配置索引(HNSW/IVF)
- [ ] 数据库备份策略已就绪
- [ ] Schema 约束和索引已创建

-- 查询 --
- [ ] Local Search 和 Global Search 均已测试
- [ ] 查询缓存策略已实现
- [ ] Global Search 的并发度已调优
- [ ] 查询超时保护已设置
- [ ] 查询结果的来源追溯(可解释性)已实现

-- 监控 --
- [ ] 索引 Pipeline 成功率和耗时监控
- [ ] 查询延迟和成功率监控
- [ ] LLM API 调用量和成本监控
- [ ] 图数据库资源使用监控
- [ ] 缓存命中率监控

-- 安全 --
- [ ] API Key 不暴露在配置文件中
- [ ] 查询结果不泄露敏感实体
- [ ] 访问控制(谁能查询哪些文档的图谱)已实现

九、未来演进方向

9.1 Agentic GraphRAG

2025-2026 年的趋势是将 GraphRAG 与 Agent 框架结合。Agent 不再只是被动地检索和生成,而是主动地规划检索路径、评估结果质量、动态切换搜索策略。

核心思路:

  • Agent 根据查询复杂度自动选择 Local Search、Global Search 或混合模式
  • 第一轮检索结果不充分时,Agent 自动调整检索参数(扩大 hop 数、切换社区层级)重新检索
  • Agent 可以向用户追问,收集更多上下文来优化检索

9.2 多模态 GraphRAG

知识图谱不再局限于文本。图片、表格、流程图中的实体和关系也可以被 Vision LLM 抽取并纳入图谱。这对于工程文档(含大量技术图纸)和财报(含大量表格)场景尤为重要。

9.3 Graph + Vector 混合索引的标准化

Neo4j 5.x 已原生支持向量索引,Weaviate 也在增加图遍历能力。未来的趋势是图数据库和向量数据库的边界逐渐模糊,GraphRAG 的存储层会进一步简化。


参考资料


Maurice | maurice_wen@proton.me