Deep Integration of Knowledge Graphs and Agent Systems

Why Agents Need Knowledge Graphs

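Agent systems driven by large language models face three fundamental challenges: hallucination (generating facts that do not exist), forgetting (the context window is limited), and broken reasoning chains (unreliable multi-hop logical inference). A knowledge graph gives an agent structured external memory and a foundation for reasoning, which makes it key infrastructure for turning agents from "conversational toys" into "reliable tools".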


Agent + KG Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│             Agent + KG Integration Architecture             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────────────────────────────────────────┐       │
│  │                    Agent Core                    │       │
│  │                                                  │       │
│  │  Planning ───→ Action ───→ Observe ───→ Reflect  │       │
│  │     │            │            │            │     │       │
│  └─────┼────────────┼────────────┼────────────┼─────┘       │
│        │            │            │            │             │
│   ┌────▼─────┐ ┌────▼─────┐ ┌────▼─────┐ ┌────▼─────┐       │
│   │ KG query │ │ KG write │ │   Tool   │ │KG reflect│       │
│   │(retrieve)│ │ (learn)  │ │  calls   │ │ (update) │       │
│   └────┬─────┘ └────┬─────┘ └──────────┘ └────┬─────┘       │
│        │            │                         │             │
│   ┌────▼────────────▼─────────────────────────▼─────┐       │
│   │              Knowledge Graph Layer              │       │
│   │                                                 │       │
│   │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │       │
│   │ │  World KG   │ │  Memory KG  │ │   Task KG   │ │       │
│   │ └─────────────┘ └─────────────┘ └─────────────┘ │       │
│   └─────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────┘

Three Kinds of Knowledge Graph

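| Graph type | Contents | Update frequency | Lifetime |
| --- | --- | --- | --- |
| World knowledge graph (World KG) | Domain facts, entity relationships | Low (daily/weekly) | Long-term |
| Conversation memory graph (Memory KG) | User preferences, past interactions, extracted facts | Every conversation turn | Per session / cross-session |
| Task knowledge graph (Task KG) | Plan, progress, and constraints of the current task | Every execution step | Single task |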

KG as Long-Term Agent Memory

Traditional Memory vs. Graph Memory

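Traditional memory (vector store):
  "User likes Python" → embedding → vector DB → semantic retrieval
  Limitation: supports semantic matching only, not relational reasoning

Graph memory:
  (User A)-[:prefers]->(Python)
  (Python)-[:is_a]->(Programming language)
  (User A)-[:recent_project]->(Web development)
  (Web development)-[:commonly_uses]->(Django)
  → Inference: the user may need help with Django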
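The inference in the graph-memory example is a two-hop walk from the user node. A minimal in-memory sketch makes that concrete; it assumes facts are plain `(subject, predicate, object)` tuples and follows outgoing edges only. `TRIPLES` and `neighbors_within` are illustrative names, not part of any library.

```python
from collections import deque

# Illustrative facts mirroring the graph-memory example above
TRIPLES = [
    ("User A", "prefers", "Python"),
    ("Python", "is_a", "Programming language"),
    ("User A", "recent_project", "Web development"),
    ("Web development", "commonly_uses", "Django"),
]


def neighbors_within(start: str, triples: list[tuple[str, str, str]],
                     max_hops: int = 2) -> dict[str, int]:
    """Breadth-first search along outgoing edges; returns {entity: hop distance}."""
    adjacency: dict[str, list[str]] = {}
    for subject, _predicate, obj in triples:
        adjacency.setdefault(subject, []).append(obj)

    distances = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if distances[node] == max_hops:
            continue  # do not expand past the hop limit
        for nxt in adjacency.get(node, []):
            if nxt not in distances:
                distances[nxt] = distances[node] + 1
                queue.append(nxt)
    del distances[start]
    return distances


reachable = neighbors_within("User A", TRIPLES, max_hops=2)
print(reachable)
# {'Python': 1, 'Web development': 1, 'Programming language': 2, 'Django': 2}
```

Django is reached at distance 2 only through the `recent_project` → `commonly_uses` chain; a similarity search over the sentence "User likes Python" has no edge to follow there. The Cypher `recall()` in the implementation below performs the same kind of walk inside the database, except that it ignores edge direction.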

Memory Graph Implementation

import json

from neo4j import GraphDatabase

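class AgentMemoryKG:
    """Knowledge-graph-backed long-term memory for an agent"""

    def __init__(self, uri: str, auth: tuple, llm_client=None):
        self.driver = GraphDatabase.driver(uri, auth=auth)
        # OpenAI-style chat client, used by extract_facts_from_conversation
        self.llm_client = llm_client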

    def store_fact(self, subject: str, predicate: str, obj: str,
                   session_id: str, confidence: float = 0.9):
        """Store a fact extracted from a conversation"""
        with self.driver.session() as session:
            session.run(
                """
                MERGE (s:MemoryEntity {name: $subject})
                MERGE (o:MemoryEntity {name: $object})
                MERGE (s)-[r:MEMORY_REL {type: $predicate}]->(o)
                SET r.confidence = $confidence,
                    r.session_id = $session_id,
                    r.created_at = datetime(),
                    r.access_count = coalesce(r.access_count, 0) + 1
                """,
                subject=subject,
                predicate=predicate,
                object=obj,
                confidence=confidence,
                session_id=session_id
            )

    def recall(self, query_entities: list[str], max_hops: int = 2,
               max_facts: int = 20) -> list[dict]:
        """Recall memories connected to the given entities"""
        # Cypher does not accept parameters in variable-length bounds such as
        # [*1..$hops], so the validated hop count is inlined into the query text
        hops = max(1, int(max_hops))
        with self.driver.session() as session:
            result = session.run(
                f"""
                UNWIND $entities AS entity_name
                MATCH (e:MemoryEntity {{name: entity_name}})
                MATCH path = (e)-[:MEMORY_REL*1..{hops}]-(neighbor)
                UNWIND relationships(path) AS r
                WITH DISTINCT startNode(r) AS s, r, endNode(r) AS o
                RETURN s.name AS subject, r.type AS predicate, o.name AS object,
                       r.confidence AS confidence, r.created_at AS created_at
                ORDER BY confidence DESC, created_at DESC
                LIMIT $limit
                """,
                entities=query_entities,
                limit=max_facts
            )
            return result.data()

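    def forget(self, decay_rate: float = 0.95, min_confidence: float = 0.1):
        """Memory decay: lower the confidence of stale facts (last written more than
        7 days ago) that have been reinforced fewer than 3 times"""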
        with self.driver.session() as session:
            session.run(
                """
                MATCH ()-[r:MEMORY_REL]->()
                WHERE r.created_at < datetime() - duration('P7D')
                  AND r.access_count < 3
                SET r.confidence = r.confidence * $decay_rate
                """,
                decay_rate=decay_rate
            )
            # Delete memories whose confidence has dropped below the threshold
            session.run(
                """
                MATCH ()-[r:MEMORY_REL]->()
                WHERE r.confidence < $min_conf
                DELETE r
                """,
                min_conf=min_confidence
            )

    def extract_facts_from_conversation(
        self, messages: list[dict], session_id: str
    ) -> list[dict]:
        """Automatically extract facts from a conversation and write them to the graph"""
        conversation_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in messages
        )

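        prompt = f"""
Extract persistent factual knowledge from the conversation below.

Conversation:
{conversation_text}

Extraction rules:
- Extract only facts that are explicitly stated; do not speculate
- Focus on long-lived information such as user preferences, skills, projects, and relationships
- Ignore temporary or one-off information

Output JSON:
{{
  "facts": [
    {{"subject": "entity", "predicate": "relation", "object": "entity", "confidence": 0.9}}
  ]
}}
"""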

        response = self.llm_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an expert at extracting facts."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"},
            temperature=0.0
        )
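        facts = json.loads(response.choices[0].message.content).get("facts", [])
        # Skip malformed items instead of failing the whole batch with a KeyError
        facts = [f for f in facts
                 if isinstance(f, dict)
                 and all(k in f for k in ("subject", "predicate", "object"))]

        # Write to the graph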
        for fact in facts:
            self.store_fact(
                subject=fact["subject"],
                predicate=fact["predicate"],
                obj=fact["object"],
                session_id=session_id,
                confidence=fact.get("confidence", 0.8)
            )

        return facts
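How long a fact survives under `forget()` follows from its two parameters: each eligible call multiplies confidence by `decay_rate`, and the fact is deleted once confidence falls below `min_confidence`. So a fact stored at confidence c0 is deleted on call n = ⌊ln(min_confidence / c0) / ln(decay_rate)⌋ + 1. The sketch below (the helper name `calls_until_forgotten` is hypothetical) counts those calls directly and checks the closed form, assuming the fact stays eligible for decay on every call.

```python
import math


def calls_until_forgotten(initial_confidence: float,
                          decay_rate: float = 0.95,
                          min_confidence: float = 0.1) -> int:
    """Count forget() calls until a stale fact is deleted.

    Mirrors forget(): each call first multiplies confidence by decay_rate,
    then deletes the fact if confidence < min_confidence. Assumes the fact
    remains eligible for decay on every call (stale and access_count < 3).
    """
    if not 0 < decay_rate < 1:
        raise ValueError("decay_rate must be in (0, 1)")
    if min_confidence <= 0:
        raise ValueError("min_confidence must be positive")
    confidence, calls = initial_confidence, 0
    while True:
        confidence *= decay_rate
        calls += 1
        if confidence < min_confidence:
            return calls


# Closed form: smallest n with c0 * rate**n < min_confidence
closed_form = math.floor(math.log(0.1 / 0.9) / math.log(0.95)) + 1
print(calls_until_forgotten(0.9), closed_form)  # 43 43
```

With the defaults, a fact stored at 0.9 is deleted on the 43rd eligible call; with `decay_rate=0.98` it takes 109 calls. If, say, `forget()` ran once a day, that is roughly six weeks versus three and a half months after the fact becomes eligible.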

KG-Driven Agent Reasoning

Reasoning-Augmented Prompt

class KGReasoningAgent:
    """Agent whose reasoning is augmented by knowledge graphs"""

    def __init__(self, llm_client, memory_kg: AgentMemoryKG, world_kg_session):
        self.llm = llm_client
        self.memory = memory_kg
        self.world_kg = world_kg_session

    def reason(self, user_query: str, chat_history: list[dict]) -> str:
        """KG-augmented reasoning"""

        # 1. Extract entities from the query
        entities = self._extract_entities(user_query)

        # 2. Recall related facts from the memory graph
        memory_facts = self.memory.recall(entities, max_hops=2, max_facts=15)

        # 3. Retrieve structured knowledge from the world knowledge graph
        world_facts = self._query_world_kg(entities)

        # 4. Build the augmented prompt
        memory_context = self._format_facts("User-related memories", memory_facts)
        world_context = self._format_facts("Related knowledge", world_facts)

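        system_prompt = f"""You are an assistant equipped with a knowledge graph.

The following information was retrieved from the knowledge graph:

{memory_context}

{world_context}

Reasoning rules:
1. Prefer facts from the knowledge graph when answering
2. If the graph contains a chain of relationships, you may reason across multiple hops
3. If the graph conflicts with your own knowledge, trust the graph (it is updated more recently)
4. If the graph has no relevant information, you may use general knowledge, but state your uncertainty
"""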

        response = self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                *chat_history,
                {"role": "user", "content": user_query}
            ],
            temperature=0.3
        )

        return response.choices[0].message.content

    def _query_world_kg(self, entities: list[str]) -> list[dict]:
        """Query the world knowledge graph"""
        all_facts = []
        for entity in entities:
            result = self.world_kg.run(
                """
                MATCH (e:Entity)
                WHERE e.name CONTAINS $entity OR $entity IN e.aliases
                MATCH (e)-[r]-(neighbor)
                RETURN e.name AS subject, type(r) AS predicate,
                       neighbor.name AS object, r.confidence AS confidence
                ORDER BY r.confidence DESC
                LIMIT 10
                """,
                entity=entity
            )
            all_facts.extend(result.data())
        return all_facts

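    def _extract_entities(self, text: str) -> list[str]:
        """Extract key entities from text"""
        # JSON mode produces a JSON object, so ask for {"entities": [...]}
        # instead of a bare array
        response = self.llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Extract the key entities from the text. Output a JSON object."},
                {"role": "user", "content": f"Text: {text}\nOutput: {{\"entities\": [\"entity1\", \"entity2\"]}}"}
            ],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        result = json.loads(response.choices[0].message.content)
        entities = result.get("entities", []) if isinstance(result, dict) else result
        return [e for e in entities if isinstance(e, str)]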

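    def _format_facts(self, title: str, facts: list[dict]) -> str:
        """Format facts as lines of text for the prompt"""
        if not facts:
            return f"[{title}: no matching records]"
        lines = [f"[{title}]"]
        for f in facts:
            # .get() with a default does not cover a key that exists with value None
            conf = f.get("confidence")
            conf = "N/A" if conf is None else conf
            lines.append(f"  - {f['subject']} --[{f['predicate']}]--> {f['object']} (confidence: {conf})")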
        return "\n".join(lines)
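`reason()` puts memory facts and world facts into the prompt as separate lists, and `_query_world_kg` runs one query per entity, so the same fact can show up more than once (for example when two extracted entities both match one node through `CONTAINS`), while relationships without a `confidence` property come back as `None`. A minimal sketch of a pre-prompt cleanup step, using a hypothetical `merge_facts` helper that is not part of the class above: it deduplicates on (subject, predicate, object), keeps the highest confidence, ranks missing confidence last, and caps the list.

```python
from typing import Any


def merge_facts(*fact_lists: list[dict[str, Any]], limit: int = 20) -> list[dict[str, Any]]:
    """Deduplicate facts on (subject, predicate, object), keeping the highest
    confidence for each, and return at most `limit` facts, best first."""
    best: dict[tuple, dict[str, Any]] = {}
    for facts in fact_lists:
        for fact in facts:
            key = (fact["subject"], fact["predicate"], fact["object"])
            # Neo4j returns None for a missing property; rank such facts last
            conf = fact.get("confidence")
            conf = 0.0 if conf is None else float(conf)
            if key not in best or conf > best[key]["confidence"]:
                best[key] = {**fact, "confidence": conf}
    return sorted(best.values(), key=lambda f: f["confidence"], reverse=True)[:limit]


memory_facts = [
    {"subject": "User A", "predicate": "prefers", "object": "Python", "confidence": 0.9},
]
# The same world fact returned twice, plus one without a confidence value
world_facts = [
    {"subject": "Django", "predicate": "WRITTEN_IN", "object": "Python", "confidence": 0.95},
    {"subject": "Django", "predicate": "WRITTEN_IN", "object": "Python", "confidence": 0.95},
    {"subject": "Python", "predicate": "IS_A", "object": "Programming language", "confidence": None},
]
merged = merge_facts(memory_facts, world_facts)
for fact in merged:
    print(fact["subject"], fact["predicate"], fact["object"], fact["confidence"])
```

Whether memory and world facts should be merged into one ranked list or kept apart, as `reason()` does, is a prompt-design choice; the deduplication step is useful either way.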

KG as the Foundation for Agent Planning

Task Graph (Task KG)

class TaskKG:
    """Task knowledge graph: records the agent's plan and execution state"""

    def __init__(self, graph_session):
        self.session = graph_session

    def create_plan(self, task_id: str, goal: str, steps: list[dict]):
        """Create the task plan graph"""
        # Create the task node
        self.session.run(
            """
            CREATE (t:Task {
              id: $task_id,
              goal: $goal,
              status: "planning",
              created_at: datetime()
            })
            """,
            task_id=task_id,
            goal=goal
        )

        # Create the step nodes and record their order
        for i, step in enumerate(steps):
            self.session.run(
                """
                MATCH (t:Task {id: $task_id})
                CREATE (s:Step {
                  id: $step_id,
                  description: $desc,
                  tool: $tool,
                  status: "pending",
                  order: $order
                })
                CREATE (t)-[:HAS_STEP]->(s)
                """,
                task_id=task_id,
                step_id=f"{task_id}_step_{i}",
                desc=step["description"],
                tool=step.get("tool", "none"),
                order=i
            )

            # Each step depends on the previous one
            if i > 0:
                self.session.run(
                    """
                    MATCH (prev:Step {id: $prev_id})
                    MATCH (curr:Step {id: $curr_id})
                    CREATE (curr)-[:DEPENDS_ON]->(prev)
                    """,
                    prev_id=f"{task_id}_step_{i-1}",
                    curr_id=f"{task_id}_step_{i}"
                )

    def update_step_status(self, step_id: str, status: str,
                           result: str | None = None, error: str | None = None):
        """Update a step's execution status"""
        self.session.run(
            """
            MATCH (s:Step {id: $step_id})
            SET s.status = $status,
                s.result = $result,
                s.error = $error,
                s.completed_at = CASE WHEN $status IN ["completed", "failed"]
                                     THEN datetime() ELSE null END
            """,
            step_id=step_id,
            status=status,
            result=result,
            error=error
        )

    def get_next_step(self, task_id: str) -> dict | None:
        """Get the next executable step: pending, with all dependencies completed"""
        result = self.session.run(
            """
            MATCH (t:Task {id: $task_id})-[:HAS_STEP]->(s:Step)
            WHERE s.status = "pending"
            AND NOT EXISTS {
              MATCH (s)-[:DEPENDS_ON]->(dep:Step)
              WHERE dep.status <> "completed"
            }
            RETURN s
            ORDER BY s.order ASC
            LIMIT 1
            """,
            task_id=task_id
        ).single()
        return dict(result["s"]) if result else None

    def get_task_summary(self, task_id: str) -> dict:
        """Get a summary of task execution"""
        result = self.session.run(
            """
            MATCH (t:Task {id: $task_id})-[:HAS_STEP]->(s:Step)
            WITH t,
                 count(s) AS total,
                 sum(CASE WHEN s.status = "completed" THEN 1 ELSE 0 END) AS completed,
                 sum(CASE WHEN s.status = "failed" THEN 1 ELSE 0 END) AS failed,
                 sum(CASE WHEN s.status = "pending" THEN 1 ELSE 0 END) AS pending
            RETURN t.goal AS goal, t.status AS status,
                   total, completed, failed, pending
            """,
            task_id=task_id
        ).single()
        return dict(result) if result else {}
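`get_next_step` returns the lowest-`order` pending step none of whose `DEPENDS_ON` targets is still unfinished. The same rule in plain Python, as a sketch that assumes steps are dicts with `id`, `order`, `status`, and a hypothetical `depends_on` list of step IDs. Unlike `create_plan`, which only builds a linear chain, the rule also works when the plan is a DAG.

```python
from typing import Optional


def next_step(steps: list[dict]) -> Optional[dict]:
    """Lowest-order pending step whose dependencies are all completed, else None."""
    status = {s["id"]: s["status"] for s in steps}
    ready = [
        s for s in steps
        if s["status"] == "pending"
        and all(status.get(dep) == "completed" for dep in s.get("depends_on", []))
    ]
    return min(ready, key=lambda s: s["order"]) if ready else None


# Hypothetical plan: step 2 follows step 1, step 3 only needs step 0 (a DAG, not a chain)
steps = [
    {"id": "t1_step_0", "order": 0, "status": "completed", "depends_on": []},
    {"id": "t1_step_1", "order": 1, "status": "failed", "depends_on": ["t1_step_0"]},
    {"id": "t1_step_2", "order": 2, "status": "pending", "depends_on": ["t1_step_1"]},
    {"id": "t1_step_3", "order": 3, "status": "pending", "depends_on": ["t1_step_0"]},
]
print(next_step(steps)["id"])  # t1_step_3: step 2 is blocked by the failed step 1
```

A failed step blocks everything downstream of it, so a scheduler built on this rule would typically reflect on the failure and retry or re-plan before the blocked steps can run.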

Reflection and Self-Evolution

KG-Based Agent Reflection

class ReflectiveAgent:
    """KG-augmented agent with the ability to reflect"""

    def __init__(self, agent: KGReasoningAgent, task_kg: TaskKG, memory_kg: AgentMemoryKG):
        self.agent = agent
        self.task_kg = task_kg
        self.memory = memory_kg

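    def reflect_on_failure(self, task_id: str, step_id: str, error: str):
        """On failure, reflect on the cause and record the lesson learned"""
        # Use the same node name that step 3 writes, so past lessons can be found
        error_key = f"error:{error[:50]}"

        # 1. Look up similar past failures
        similar_failures = self.memory.recall(
            [error_key, step_id], max_hops=1, max_facts=5
        )

        # 2. Generate a reflection (default=str serializes Neo4j DateTime values)
        reflection_prompt = f"""
A task step failed. Analyze the cause and propose an improvement.

Failed step: {step_id}
Error message: {error}
Similar past failures: {json.dumps(similar_failures, ensure_ascii=False, default=str)}

Output JSON:
{{
  "root_cause": "root cause",
  "lesson_learned": "lesson learned",
  "alternative_approach": "alternative approach",
  "prevention_rule": "prevention rule"
}}
"""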
        response = self.agent.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": reflection_prompt}],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        reflection = json.loads(response.choices[0].message.content)

        # 3. Store the lessons learned in the memory graph
        self.memory.store_fact(
            subject=f"error:{error[:50]}",
            predicate="resolved_by",
            obj=reflection["alternative_approach"],
            session_id=task_id,
            confidence=0.85
        )
        self.memory.store_fact(
            subject=step_id,
            predicate="prevention_rule",
            obj=reflection["prevention_rule"],
            session_id=task_id,
            confidence=0.9
        )

        return reflection

    def evolve(self):
        """Self-evolution: consolidate the memory graph and distill general rules"""
        with self.memory.driver.session() as session:
            # Find frequently recurring patterns
            patterns = session.run(
                """
                MATCH (e1)-[r:MEMORY_REL]->(e2)
                WHERE r.access_count >= 5
                RETURN r.type AS pattern, count(*) AS frequency,
                       collect(e1.name + " -> " + e2.name)[..3] AS examples
                ORDER BY frequency DESC
                LIMIT 10
                """
            ).data()

            # Solidify high-frequency patterns into rules
            for pattern in patterns:
                session.run(
                    """
                    MERGE (rule:Rule {pattern: $pattern})
                    SET rule.frequency = $freq,
                        rule.examples = $examples,
                        rule.solidified_at = datetime()
                    """,
                    pattern=pattern["pattern"],
                    freq=pattern["frequency"],
                    examples=pattern["examples"]
                )

            return patterns
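The `evolve()` query keeps relationships with `access_count >= 5`, groups them by relation type, counts each group, and keeps up to three examples per type. A pure-Python equivalent over an in-memory list makes the aggregation explicit; the edge dicts and `mine_patterns` are hypothetical stand-ins for `MEMORY_REL` relationships and the Cypher query.

```python
from collections import defaultdict


def mine_patterns(edges: list[dict], min_access: int = 5,
                  top_k: int = 10, max_examples: int = 3) -> list[dict]:
    """Group frequently reinforced edges by relation type, most frequent first."""
    groups: dict[str, list[str]] = defaultdict(list)
    for edge in edges:
        if edge["access_count"] >= min_access:  # WHERE r.access_count >= 5
            groups[edge["type"]].append(f"{edge['src']} -> {edge['dst']}")
    patterns = [
        {"pattern": rel_type, "frequency": len(examples), "examples": examples[:max_examples]}
        for rel_type, examples in groups.items()
    ]
    patterns.sort(key=lambda p: p["frequency"], reverse=True)  # ORDER BY frequency DESC
    return patterns[:top_k]  # LIMIT 10


# Hypothetical MEMORY_REL edges
edges = [
    {"src": "User A", "dst": "Python", "type": "prefers", "access_count": 7},
    {"src": "User B", "dst": "Go", "type": "prefers", "access_count": 5},
    {"src": "t1_step_2", "dst": "check the API quota first", "type": "prevention_rule", "access_count": 6},
    {"src": "User C", "dst": "Rust", "type": "prefers", "access_count": 1},
]
for p in mine_patterns(edges):
    print(p["pattern"], p["frequency"], p["examples"])
```

One difference worth knowing: the Python version keeps examples in input order, whereas the order of `collect(...)[..3]` in Cypher is not guaranteed unless the rows are sorted first.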

Shared Knowledge Graph for Multi-Agent Collaboration

┌─────────────────────────────────────────────────────┐
│            Shared Knowledge Graph Layer             │
│                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  World KG   │  │ Project KG  │  │ Team memory │  │
│  │ (read-only) │  │(read/write) │  │(read/write) │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
│         ▲                ▲                ▲         │
│         │                │                │         │
└─────────┼────────────────┼────────────────┼─────────┘
          │                │                │
    ┌─────┴──────┐   ┌─────┴──────┐   ┌─────┴──────┐
    │ Researcher │   │ Developer  │   │  Reviewer  │
    │   Agent    │   │   Agent    │   │   Agent    │
    └────────────┘   └────────────┘   └────────────┘

Resolving Concurrent Write Conflicts

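from neo4j.exceptions import TransientError


class SharedKGManager:
    """Shared-graph manager for multiple agents"""

    def __init__(self, graph_session):
        self.session = graph_session

    def write_with_lock(self, agent_id: str, entity_name: str,
                        updates: dict, expected_version: int) -> bool:
        """Optimistic-lock write: applies `updates` only if the entity's version is
        still `expected_version`, i.e. no other agent has written since it was read"""
        try:
            result = self.session.run(
                """
                MATCH (e:Entity {name: $name})
                // Take the node's write lock before reading the version, so the
                // check cannot race with a concurrent writer (read-committed isolation)
                SET e._lock = true
                WITH e, coalesce(e.version, 0) = $expected_version AS unchanged
                REMOVE e._lock
                WITH e, unchanged
                WHERE unchanged
                SET e += $updates,
                    e.version = coalesce(e.version, 0) + 1,
                    e.last_writer = $agent_id,
                    e.updated_at = datetime()
                RETURN e.version AS version
                """,
                name=entity_name,
                agent_id=agent_id,
                updates=updates,
                expected_version=expected_version
            ).single()
        except TransientError:
            # e.g. a deadlock detected by the database: report failure so the caller can retry
            return False
        # None means a version conflict (or no such entity): re-read and retry
        return result is not None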

    def merge_conflicting_facts(self, fact_a: dict, fact_b: dict) -> dict:
        """Merge strategy for conflicting facts"""
        # Strategy: higher confidence wins; on equal confidence, the newer fact wins
        if fact_a["confidence"] > fact_b["confidence"]:
            return fact_a
        elif fact_b["confidence"] > fact_a["confidence"]:
            return fact_b
        else:
            # Equal confidence: keep the newer fact
            return fact_a if fact_a["created_at"] > fact_b["created_at"] else fact_b
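Optimistic locking comes down to compare-and-set on a version number: read the entity and its version, compute the change, and write only if the version is unchanged; on a conflict, re-read and retry. A self-contained sketch of that loop, using a hypothetical in-memory `VersionedStore` in place of the graph (in Neo4j the version check would run inside the Cypher write itself):

```python
import threading
from typing import Callable


class VersionedStore:
    """Hypothetical in-memory stand-in for graph entities that carry a version number."""

    def __init__(self) -> None:
        self._data: dict[str, dict] = {}
        self._versions: dict[str, int] = {}
        self._mutex = threading.Lock()  # makes each compare-and-set atomic

    def read(self, name: str) -> tuple[dict, int]:
        """Return a copy of the entity's properties and its current version."""
        with self._mutex:
            return dict(self._data.get(name, {})), self._versions.get(name, 0)

    def compare_and_set(self, name: str, updates: dict, expected_version: int) -> bool:
        """Apply updates only if nobody has written since expected_version was read."""
        with self._mutex:
            if self._versions.get(name, 0) != expected_version:
                return False  # another agent wrote first: caller must re-read
            self._data.setdefault(name, {}).update(updates)
            self._versions[name] = expected_version + 1
            return True


def update_with_retry(store: VersionedStore, name: str,
                      change: Callable[[dict], dict], max_retries: int = 5) -> bool:
    """Read, compute updates from the fresh copy, and write back; retry on conflict."""
    for _ in range(max_retries):
        props, version = store.read(name)
        if store.compare_and_set(name, change(props), version):
            return True
    return False


store = VersionedStore()
_, v = store.read("web-app")  # two agents both read version 0
first = store.compare_and_set("web-app", {"status": "reviewed"}, v)   # first writer wins
second = store.compare_and_set("web-app", {"owner": "developer"}, v)  # rejected: stale version
retried = update_with_retry(store, "web-app", lambda p: {"owner": "developer"})  # succeeds after re-read
print(store.read("web-app"))  # ({'status': 'reviewed', 'owner': 'developer'}, 2)
```

No lock is held between the read and the write; a conflicting writer is detected and retried instead of silently overwriting the other agent's change. When two writes carry competing facts rather than independent properties, the `change` function is the natural place to apply a merge rule such as `merge_conflicting_facts`.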

Engineering Practice Recommendations

Agent + KG Integration Checklist

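| Check item | Description |
| --- | --- |
| Memory ingestion latency | From fact extraction to storage: < 500 ms |
| Recall precision | Precision@10 of recalled memories > 80% |
| Reasoning accuracy | Accuracy of graph-grounded answers > 90% |
| Memory decay | Periodically clean up stale memories |
| Conflict detection | Raise an alert when a new fact contradicts an existing one |
| Graph size | Cap the number of nodes in the memory graph (avoid unbounded growth) |
| Privacy | Keep sensitive information out of the graph, or store it encrypted |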
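The recall-precision target in the checklist can be measured offline against a small labelled set. A minimal sketch, assuming each evaluation query comes with a hand-labelled set of relevant fact IDs; the IDs and data below are hypothetical.

```python
def precision_at_k(retrieved: list[str], relevant: set[str], k: int = 10) -> float:
    """Fraction of the top-k retrieved items that are relevant.

    Divides by k even when fewer than k items come back, so a recall() call
    that returns only a handful of facts is not rewarded for returning less.
    """
    if k <= 0:
        raise ValueError("k must be positive")
    hits = sum(1 for item in retrieved[:k] if item in relevant)
    return hits / k


def mean_precision_at_k(runs: list[tuple[list[str], set[str]]], k: int = 10) -> float:
    """Average Precision@k over several labelled evaluation queries."""
    scores = [precision_at_k(retrieved, relevant, k) for retrieved, relevant in runs]
    return sum(scores) / len(scores)


# Hypothetical labelled data: fact IDs returned by recall() vs. relevant IDs
runs = [
    ([f"a{i}" for i in range(10)], {f"a{i}" for i in range(9)}),  # 9 of the top 10 relevant
    ([f"b{i}" for i in range(10)], {f"b{i}" for i in range(8)}),  # 8 of the top 10 relevant
]
score = mean_precision_at_k(runs)
print(round(score, 2), score > 0.8)  # 0.85 True
```

Dividing by k rather than by the number of returned items is a deliberate choice here; some evaluations divide by min(k, number returned) instead, which reads higher when recall returns few facts.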

Performance Optimization

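# Batch recall: fetch the memories for all entities in a single query
def batch_recall(entities: list[str], session) -> dict:
    """Batch recall, reducing the number of graph round trips"""
    result = session.run(
        """
        UNWIND $entities AS name
        MATCH (e:MemoryEntity {name: name})-[r:MEMORY_REL]-()
        // Sort before collect() so that [..10] keeps the highest-confidence facts
        WITH name, r
        ORDER BY r.confidence DESC
        RETURN name AS query_entity,
               collect({
                 subject: startNode(r).name,
                 predicate: r.type,
                 object: endNode(r).name,
                 confidence: r.confidence
               })[..10] AS facts
        """,
        entities=entities
    )
    return {record["query_entity"]: record["facts"] for record in result}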

Summary

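The core value of integrating knowledge graphs with agent systems:

  1. Long-term memory: graph memory persists across sessions, working around the LLM's limited context window
  2. Structured reasoning: relationship traversal supports multi-hop reasoning, compensating for the LLM's weakness in logical inference
  3. Hallucination suppression: anchoring answers on graph facts constrains what the LLM generates
  4. Self-evolution: lessons from failures are written to the graph, so the solution is recalled automatically the next time a similar problem occurs
  5. Multi-agent collaboration: a shared graph acts as the knowledge bus between agents

Recommendation for putting this into practice: start with the memory graph (the most direct payoff), then gradually bring in the world knowledge graph and the task graph.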


Maurice | maurice_wen@proton.me