Deep Integration of Knowledge Graphs and Agent Systems
Lingque (灵阙) Teaching and Research Team
Updated 2026-02-28
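Why Agents Need a Knowledge Graph
Agent systems driven by large language models face three fundamental challenges: hallucination (generating facts that do not exist), forgetting (the context window is limited), and broken reasoning chains (an inability to carry out multi-hop logical reasoning). A knowledge graph gives an agent structured external memory and a substrate for reasoning, and it is key infrastructure for moving agents from "conversational toys" to "reliable tools".
Agent + KG Architecture Overview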
┌─────────────────────────────────────────────────────────────┐
│             Agent + KG Integration Architecture             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│    ┌───────────────────────────────────────────────────┐    │
│    │                    Agent Core                     │    │
│    │                                                   │    │
│    │  Planning ────→ Action ───→ Observe ──→ Reflect   │    │
│    │      │             │           │           │      │    │
│    └──────┼─────────────┼───────────┼───────────┼──────┘    │
│           │             │           │           │           │
│      ┌────▼─────┐  ┌────▼─────┐  ┌──▼───┐  ┌────▼─────┐     │
│      │ KG query │  │ KG write │  │ Tool │  │KG reflect│     │
│      │(retrieve)│  │ (learn)  │  │ call │  │ (update) │     │
│      └────┬─────┘  └────┬─────┘  └──────┘  └────┬─────┘     │
│           │             │                       │           │
│ ┌─────────▼─────────────▼───────────────────────▼─────────┐ │
│ │                  Knowledge Graph Layer                  │ │
│ │                                                         │ │
│ │ ┌───────────────┐  ┌───────────────┐  ┌───────────────┐ │ │
│ │ │World knowledge│  │Dialogue memory│  │Task knowledge │ │ │
│ │ │   World KG    │  │   Memory KG   │  │    Task KG    │ │ │
│ │ └───────────────┘  └───────────────┘  └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
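Three Types of Knowledge Graph
| Graph type | Contents | Update frequency | Lifecycle |
|---|---|---|---|
| World knowledge graph (World KG) | Domain facts, entity relationships | Low (daily/weekly) | Long-term |
| Dialogue memory graph (Memory KG) | User preferences, interaction history, extracted facts | Every dialogue turn | Session / cross-session |
| Task knowledge graph (Task KG) | Plan, progress, and constraints of the current task | Every execution step | Single task |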
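One way to make the table concrete is to encode each graph's lifecycle as data and route incoming facts accordingly. The sketch below is illustrative only: `GraphKind`, `RetentionPolicy`, and `route_fact` are hypothetical names, and the routing rule (task-scoped first, then user-scoped, otherwise world) is an assumption rather than something the article prescribes.

```python
from dataclasses import dataclass
from enum import Enum


class GraphKind(Enum):
    WORLD = "world"
    MEMORY = "memory"
    TASK = "task"


@dataclass(frozen=True)
class RetentionPolicy:
    update_frequency: str
    lifecycle: str


# Values mirror the table above.
POLICIES = {
    GraphKind.WORLD: RetentionPolicy("low (daily/weekly)", "long-term"),
    GraphKind.MEMORY: RetentionPolicy("every dialogue turn", "session / cross-session"),
    GraphKind.TASK: RetentionPolicy("every execution step", "single task"),
}


def route_fact(fact: dict) -> GraphKind:
    """Pick a target graph for a fact (hypothetical routing rule)."""
    if fact.get("task_id"):
        return GraphKind.TASK      # tied to one task run
    if fact.get("user_id"):
        return GraphKind.MEMORY    # tied to a user or session
    return GraphKind.WORLD         # general domain knowledge
```

Keeping the policy in one table-like structure makes it easy to attach cleanup jobs per graph later, for example dropping Task KG data when a task finishes.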
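KG as the Agent's Long-Term Memory
Traditional Memory vs. Graph Memory
Traditional memory (vector store):
  "User likes Python" → embedding → vector store → semantic retrieval
  Limitation: supports semantic matching only, not relational reasoning
Graph memory:
  (UserA)-[:PREFERS]->(Python)
  (Python)-[:IS_A]->(ProgrammingLanguage)
  (UserA)-[:RECENT_PROJECT]->(WebDevelopment)
  (WebDevelopment)-[:COMMONLY_USES]->(Django)
  → Inference: the user may need help with Django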
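The multi-hop inference in the graph-memory example can be reproduced without a database. The following is a minimal in-memory sketch, assuming the four triples from the example (with English relation names chosen for illustration) and a hypothetical `find_paths` helper that does a bounded breadth-first traversal over outgoing edges.

```python
from collections import deque

# Triples from the example above (relation names are illustrative).
TRIPLES = [
    ("UserA", "PREFERS", "Python"),
    ("Python", "IS_A", "ProgrammingLanguage"),
    ("UserA", "RECENT_PROJECT", "WebDevelopment"),
    ("WebDevelopment", "COMMONLY_USES", "Django"),
]


def find_paths(triples, start, max_hops=2):
    """Return every outgoing path of 1..max_hops edges that starts at `start`."""
    outgoing = {}
    for s, p, o in triples:
        outgoing.setdefault(s, []).append((p, o))

    paths = []
    queue = deque([(start, [])])
    while queue:
        node, path = queue.popleft()
        if len(path) == max_hops:
            continue  # hop budget used up on this branch
        for predicate, neighbor in outgoing.get(node, []):
            new_path = path + [(node, predicate, neighbor)]
            paths.append(new_path)
            queue.append((neighbor, new_path))
    return paths


def reachable(triples, start, max_hops=2):
    """Entities reachable from `start` within max_hops edges."""
    return {path[-1][2] for path in find_paths(triples, start, max_hops)}
```

Starting from `UserA`, one hop yields only `Python` and `WebDevelopment`; the second hop is what surfaces `Django`, which a pure embedding lookup on "User likes Python" would not necessarily return.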
Memory Graph Implementation
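import json
from datetime import datetime
from neo4j import GraphDatabase
class AgentMemoryKG:
    """Knowledge-graph-backed long-term memory for an agent."""
    def __init__(self, uri: str, auth: tuple, llm_client=None):
        self.driver = GraphDatabase.driver(uri, auth=auth)
        # OpenAI-compatible client; only needed by extract_facts_from_conversation()
        self.llm_client = llm_client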
    def store_fact(self, subject: str, predicate: str, obj: str,
                   session_id: str, confidence: float = 0.9):
        """Store a fact extracted from the conversation."""
        with self.driver.session() as session:
            # MERGE makes the write idempotent; access_count counts how often
            # the same fact has been stored, and created_at is refreshed each time
            session.run(
                """
                MERGE (s:MemoryEntity {name: $subject})
                MERGE (o:MemoryEntity {name: $object})
                MERGE (s)-[r:MEMORY_REL {type: $predicate}]->(o)
                SET r.confidence = $confidence,
                    r.session_id = $session_id,
                    r.created_at = datetime(),
                    r.access_count = coalesce(r.access_count, 0) + 1
                """,
                subject=subject,
                predicate=predicate,
                object=obj,
                confidence=confidence,
                session_id=session_id
            )
    def recall(self, query_entities: list[str], max_hops: int = 2,
               max_facts: int = 20) -> list[dict]:
        """Recall related memories starting from the given entities."""
        # Neo4j's Cypher does not accept a parameter as a variable-length bound,
        # so the validated hop count is inlined into the query text.
        hops = max(1, int(max_hops))
        query = f"""
            UNWIND $entities AS entity_name
            MATCH (e:MemoryEntity {{name: entity_name}})
            MATCH path = (e)-[*1..{hops}]-(neighbor)
            UNWIND relationships(path) AS r
            WITH DISTINCT startNode(r) AS s, r, endNode(r) AS o
            RETURN s.name AS subject, r.type AS predicate, o.name AS object,
                   r.confidence AS confidence, r.created_at AS created_at
            ORDER BY r.confidence DESC, r.created_at DESC
            LIMIT $limit
        """
        with self.driver.session() as session:
            result = session.run(
                query,
                entities=query_entities,
                limit=max_facts
            )
            return result.data()
    def forget(self, decay_rate: float = 0.95, min_confidence: float = 0.1):
        """Memory decay: lower the confidence of facts older than 7 days that were stored fewer than 3 times."""
        with self.driver.session() as session:
            session.run(
                """
                MATCH ()-[r:MEMORY_REL]->()
                WHERE r.created_at < datetime() - duration('P7D')
                  AND r.access_count < 3
                SET r.confidence = r.confidence * $decay_rate
                """,
                decay_rate=decay_rate
            )
            # Delete memories whose confidence has dropped below the floor
            session.run(
                """
                MATCH ()-[r:MEMORY_REL]->()
                WHERE r.confidence < $min_conf
                DELETE r
                """,
                min_conf=min_confidence
            )
    def extract_facts_from_conversation(
        self, messages: list[dict], session_id: str
    ) -> list[dict]:
        """Extract facts from a conversation and write them to the graph."""
        conversation_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in messages
        )
        prompt = f"""
Extract facts worth persisting from the conversation below.
Conversation:
{conversation_text}
Extraction rules:
- Extract only explicitly stated facts; do not speculate
- Focus on long-lived information such as user preferences, skills, projects, and relationships
- Ignore temporary or one-off information
Output JSON:
{{
  "facts": [
    {{"subject": "entity", "predicate": "relation", "object": "entity", "confidence": 0.9}}
  ]
}}
"""
        response = self.llm_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a fact-extraction expert."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        facts = json.loads(response.choices[0].message.content).get("facts", [])
        # Persist each extracted fact
        for fact in facts:
            self.store_fact(
                subject=fact["subject"],
                predicate=fact["predicate"],
                obj=fact["object"],
                session_id=session_id,
                confidence=fact.get("confidence", 0.8)
            )
        return facts
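To see how the decay rule in `forget` behaves over time, the same logic can be simulated in plain Python. This is a sketch under assumptions: `Fact` and `decay_and_prune` are hypothetical stand-ins for the relationship properties and the two Cypher statements, using the same thresholds (older than 7 days, `access_count` below 3, confidence floor 0.1).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Fact:
    subject: str
    predicate: str
    obj: str
    confidence: float
    created_at: datetime
    access_count: int = 1


def decay_and_prune(facts, now, decay_rate=0.95, min_confidence=0.1,
                    max_age=timedelta(days=7), min_access=3):
    """Apply one decay pass, then drop facts whose confidence fell below the floor."""
    kept = []
    for fact in facts:
        confidence = fact.confidence
        is_stale = now - fact.created_at > max_age
        if is_stale and fact.access_count < min_access:
            confidence *= decay_rate  # same update as the first Cypher statement
        if confidence >= min_confidence:  # second statement deletes anything below
            kept.append(Fact(fact.subject, fact.predicate, fact.obj,
                             confidence, fact.created_at, fact.access_count))
    return kept
```

With the defaults used in this article (initial confidence 0.9, decay 0.95, floor 0.1), a stale fact survives 42 passes and is removed on the 43rd, so the schedule on which `forget` runs largely determines how long weak memories live.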
KG-Driven Agent Reasoning
Reasoning-Augmented Prompt
import json
class KGReasoningAgent:
    """Agent whose reasoning is augmented by a knowledge graph."""
    def __init__(self, llm_client, memory_kg: AgentMemoryKG, world_kg_session):
        self.llm = llm_client
        self.memory = memory_kg
        self.world_kg = world_kg_session
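    def reason(self, user_query: str, chat_history: list[dict]) -> str:
        """KG-augmented reasoning."""
        # 1. Extract entities from the query
        entities = self._extract_entities(user_query)
        # 2. Recall related facts from the memory graph
        memory_facts = self.memory.recall(entities, max_hops=2, max_facts=15)
        # 3. Retrieve structured knowledge from the world knowledge graph
        world_facts = self._query_world_kg(entities)
        # 4. Build the augmented prompt
        memory_context = self._format_facts("User-related memory", memory_facts)
        world_context = self._format_facts("Related knowledge", world_facts)
        system_prompt = f"""You are an intelligent assistant equipped with a knowledge graph.
The following information was retrieved from the knowledge graph:
{memory_context}
{world_context}
Reasoning rules:
1. Prefer facts from the knowledge graph when answering
2. Where the graph contains a chain of relations, multi-hop reasoning is allowed
3. If the graph conflicts with your own knowledge, trust the graph (it is updated more promptly)
4. If the graph has no relevant information, you may use general knowledge, but flag the uncertainty
"""
        response = self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                *chat_history,
                {"role": "user", "content": user_query}
            ],
            temperature=0.3
        )
        return response.choices[0].message.content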
    def _query_world_kg(self, entities: list[str]) -> list[dict]:
        """Query the world knowledge graph."""
        all_facts = []
        for entity in entities:
            # The match is undirected, so startNode/endNode are used to report
            # each fact in the direction it is stored
            result = self.world_kg.run(
                """
                MATCH (e:Entity)
                WHERE e.name CONTAINS $entity OR $entity IN e.aliases
                MATCH (e)-[r]-(neighbor)
                RETURN startNode(r).name AS subject, type(r) AS predicate,
                       endNode(r).name AS object, r.confidence AS confidence
                ORDER BY r.confidence DESC
                LIMIT 10
                """,
                entity=entity
            )
            all_facts.extend(result.data())
        return all_facts
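    def _extract_entities(self, text: str) -> list[str]:
        """Extract entities from text."""
        response = self.llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                # JSON mode returns an object, so ask for one with an "entities" key
                {"role": "system", "content": "Extract the key entities from the text. Output a JSON object."},
                {"role": "user", "content": f'Text: {text}\nOutput: {{"entities": ["entity1", "entity2"]}}'}
            ],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        result = json.loads(response.choices[0].message.content)
        entities = result.get("entities", []) if isinstance(result, dict) else result
        if not isinstance(entities, list):
            return []
        return [e for e in entities if isinstance(e, str)]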
    def _format_facts(self, title: str, facts: list[dict]) -> str:
        """Format facts as readable lines for the prompt."""
        if not facts:
            return f"[{title}: no relevant records]"
        lines = [f"[{title}]"]
        for f in facts:
            conf = f.get("confidence", "N/A")
            lines.append(f"  - {f['subject']} --[{f['predicate']}]--> {f['object']} (confidence: {conf})")
        return "\n".join(lines)
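Entity extraction is the entry point for both graph lookups, so it helps to normalize whatever the model returns before querying. The sketch below is one possible defensive parser; `parse_entities` is a hypothetical helper, and the fallback rules (accept a bare list, or the first list-valued field of an object) are assumptions about how a model might deviate from the requested shape.

```python
import json


def parse_entities(raw: str) -> list[str]:
    """Normalize an entity-extraction reply into a de-duplicated list of strings."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return []

    if isinstance(payload, dict):
        # Preferred shape: {"entities": [...]}; otherwise take the first list value.
        candidates = payload.get("entities")
        if not isinstance(candidates, list):
            candidates = next(
                (v for v in payload.values() if isinstance(v, list)), []
            )
    elif isinstance(payload, list):
        candidates = payload
    else:
        candidates = []

    seen, entities = set(), []
    for item in candidates:
        if isinstance(item, str):
            name = item.strip()
            if name and name not in seen:
                seen.add(name)
                entities.append(name)
    return entities
```

Returning an empty list on malformed output lets the agent fall back to answering without graph context instead of raising in the middle of a turn.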
KG as the Basis for Agent Planning
Task Graph (Task KG)
class TaskKG:
    """Task knowledge graph: records the agent's plan and execution state."""
    def __init__(self, graph_session):
        self.session = graph_session
    def create_plan(self, task_id: str, goal: str, steps: list[dict]):
        """Create the plan graph for a task."""
        # Create the task node
        self.session.run(
            """
            CREATE (t:Task {
                id: $task_id,
                goal: $goal,
                status: "planning",
                created_at: datetime()
            })
            """,
            task_id=task_id,
            goal=goal
        )
        # Create the step nodes and attach them to the task
        for i, step in enumerate(steps):
            self.session.run(
                """
                MATCH (t:Task {id: $task_id})
                CREATE (s:Step {
                    id: $step_id,
                    description: $desc,
                    tool: $tool,
                    status: "pending",
                    order: $order
                })
                CREATE (t)-[:HAS_STEP]->(s)
                """,
                task_id=task_id,
                step_id=f"{task_id}_step_{i}",
                desc=step["description"],
                tool=step.get("tool", "none"),
                order=i
            )
            # Each step depends on the one before it
            if i > 0:
                self.session.run(
                    """
                    MATCH (prev:Step {id: $prev_id})
                    MATCH (curr:Step {id: $curr_id})
                    CREATE (curr)-[:DEPENDS_ON]->(prev)
                    """,
                    prev_id=f"{task_id}_step_{i-1}",
                    curr_id=f"{task_id}_step_{i}"
                )
    def update_step_status(self, step_id: str, status: str,
                           result: str | None = None, error: str | None = None):
        """Update the execution status of a step."""
        self.session.run(
            """
            MATCH (s:Step {id: $step_id})
            SET s.status = $status,
                s.result = $result,
                s.error = $error,
                s.completed_at = CASE WHEN $status IN ["completed", "failed"]
                                 THEN datetime() ELSE null END
            """,
            step_id=step_id,
            status=status,
            result=result,
            error=error
        )
    def get_next_step(self, task_id: str) -> dict | None:
        """Return the next executable step, or None if nothing can run."""
        # A step is executable when it is pending and every dependency is completed
        result = self.session.run(
            """
            MATCH (t:Task {id: $task_id})-[:HAS_STEP]->(s:Step)
            WHERE s.status = "pending"
              AND NOT EXISTS {
                  MATCH (s)-[:DEPENDS_ON]->(dep:Step)
                  WHERE dep.status <> "completed"
              }
            RETURN s
            ORDER BY s.order ASC
            LIMIT 1
            """,
            task_id=task_id
        ).single()
        return dict(result["s"]) if result else None
    def get_task_summary(self, task_id: str) -> dict:
        """Return an execution summary for the task."""
        result = self.session.run(
            """
            MATCH (t:Task {id: $task_id})-[:HAS_STEP]->(s:Step)
            WITH t,
                 count(s) AS total,
                 sum(CASE WHEN s.status = "completed" THEN 1 ELSE 0 END) AS completed,
                 sum(CASE WHEN s.status = "failed" THEN 1 ELSE 0 END) AS failed,
                 sum(CASE WHEN s.status = "pending" THEN 1 ELSE 0 END) AS pending
            RETURN t.goal AS goal, t.status AS status,
                   total, completed, failed, pending
            """,
            task_id=task_id
        ).single()
        # Record.data() converts the single row into a plain dict
        return result.data() if result else {}
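The selection rule inside `get_next_step` (pending, all dependencies completed, lowest order first) can be checked without a database. This is a minimal in-memory sketch; `next_runnable_step` and the dictionary layout are hypothetical and only mirror the properties used by the query.

```python
def next_runnable_step(steps, depends_on):
    """Return the lowest-order pending step whose dependencies are all completed.

    steps: dict of step_id -> {"status": str, "order": int}
    depends_on: dict of step_id -> list of prerequisite step ids
    """
    runnable = [
        (info["order"], step_id)
        for step_id, info in steps.items()
        if info["status"] == "pending"
        and all(steps[dep]["status"] == "completed"
                for dep in depends_on.get(step_id, []))
    ]
    return min(runnable)[1] if runnable else None
```

One consequence worth noting: a failed step is neither pending nor completed, so every step downstream of it stays blocked and the function returns `None`. The caller has to reset the failed step to pending or replan before the task can continue.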
Reflection and Self-Evolution
KG-Based Agent Reflection
import json
class ReflectiveAgent:
    """KG-augmented agent with the ability to reflect."""
    def __init__(self, agent: KGReasoningAgent, task_kg: TaskKG, memory_kg: AgentMemoryKG):
        self.agent = agent
        self.task_kg = task_kg
        self.memory = memory_kg
    def reflect_on_failure(self, task_id: str, step_id: str, error: str):
        """Reflect on a failure and record the lesson learned."""
        # Use the same key for lookup and storage so earlier failures can be found
        error_key = f"error:{error[:50]}"
        # 1. Look up similar past failures
        similar_failures = self.memory.recall(
            [error_key, step_id], max_hops=1, max_facts=5
        )
        # default=str keeps json.dumps from failing on Neo4j temporal values
        history = json.dumps(similar_failures, ensure_ascii=False, default=str)
        # 2. Generate the reflection
        reflection_prompt = f"""
A task step failed. Analyze the cause and propose an improvement.
Failed step: {step_id}
Error message: {error}
Similar past failures: {history}
Output JSON:
{{
  "root_cause": "root cause",
  "lesson_learned": "lesson learned",
  "alternative_approach": "alternative approach",
  "prevention_rule": "prevention rule"
}}
"""
        response = self.agent.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": reflection_prompt}],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        reflection = json.loads(response.choices[0].message.content)
        # 3. Store the experience in the memory graph
        self.memory.store_fact(
            subject=error_key,
            predicate="resolved_by",
            obj=reflection["alternative_approach"],
            session_id=task_id,
            confidence=0.85
        )
        self.memory.store_fact(
            subject=step_id,
            predicate="prevention_rule",
            obj=reflection["prevention_rule"],
            session_id=task_id,
            confidence=0.9
        )
        return reflection
    def evolve(self):
        """Self-evolution: consolidate the memory graph and distill general rules."""
        with self.memory.driver.session() as session:
            # Find frequently recurring patterns
            patterns = session.run(
                """
                MATCH (e1)-[r:MEMORY_REL]->(e2)
                WHERE r.access_count >= 5
                RETURN r.type AS pattern, count(*) AS frequency,
                       collect(e1.name + " -> " + e2.name)[..3] AS examples
                ORDER BY frequency DESC
                LIMIT 10
                """
            ).data()
            # Solidify high-frequency patterns into rules
            for pattern in patterns:
                session.run(
                    """
                    MERGE (rule:Rule {pattern: $pattern})
                    SET rule.frequency = $freq,
                        rule.examples = $examples,
                        rule.solidified_at = datetime()
                    """,
                    pattern=pattern["pattern"],
                    freq=pattern["frequency"],
                    examples=pattern["examples"]
                )
            return patterns
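Recalling "similar" failures depends on two occurrences of the same problem mapping to the same entity name, and a raw prefix of the error text rarely does that because timestamps, ports, and paths differ between runs. One option, sketched below, is to normalize the message into a stable signature before using it as the subject. `error_signature` is a hypothetical helper, and the specific substitutions are assumptions about which details are volatile.

```python
import hashlib
import re


def error_signature(message: str, max_len: int = 80) -> str:
    """Collapse volatile details so that similar errors share one key."""
    text = message.strip().lower()
    text = re.sub(r"0x[0-9a-f]+", "<hex>", text)      # memory addresses
    text = re.sub(r"(/[\w.\-]+)+", "<path>", text)     # POSIX-style paths
    text = re.sub(r"\d+", "<n>", text)                 # counters, ports, line numbers
    text = re.sub(r"\s+", " ", text)
    # Short digest of the full normalized text keeps truncated keys distinct
    digest = hashlib.sha1(text.encode("utf-8")).hexdigest()[:8]
    return f"error:{text[:max_len]}#{digest}"
```

Using the signature both when storing a lesson and when looking one up means a timeout against a different host or after a different delay still lands on the same node.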
Shared Knowledge Graph for Multi-Agent Collaboration
┌───────────────────────────────────────────────────────────────┐
│                 Shared Knowledge Graph Layer                  │
│                                                               │
│ ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐ │
│ │ World knowledge │  │Project knowledge│  │   Team memory   │ │
│ │   (read-only)   │  │  (read/write)   │  │  (read/write)   │ │
│ └─────────────────┘  └─────────────────┘  └─────────────────┘ │
│          ▲                    ▲                    ▲          │
│          │                    │                    │          │
└──────────┼────────────────────┼────────────────────┼──────────┘
           │                    │                    │
      ┌────┴─────┐         ┌────┴─────┐         ┌────┴─────┐
      │Researcher│         │Developer │         │ Reviewer │
      │  Agent   │         │  Agent   │         │  Agent   │
      └──────────┘         └──────────┘         └──────────┘
Resolving Concurrent Write Conflicts
class SharedKGManager:
    """Manager for a graph shared by multiple agents."""
    def __init__(self, graph_session):
        self.session = graph_session
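    def write_with_lock(self, agent_id: str, entity_name: str,
                        updates: dict) -> bool:
        """Lock-guarded write: succeeds only if no other agent holds the entity."""
        # Claim, update, and release all happen inside one query, so the
        # lock_agent field only blocks this write while another agent is
        # holding a longer-lived claim on the same entity.
        try:
            result = self.session.run(
                """
                MATCH (e:Entity {name: $name})
                WHERE e.lock_agent IS NULL OR e.lock_agent = $agent_id
                SET e.lock_agent = $agent_id,
                    e.lock_time = datetime()
                WITH e
                SET e += $updates
                SET e.lock_agent = null
                RETURN e
                """,
                name=entity_name,
                agent_id=agent_id,
                updates=updates
            ).single()
            # No row means the entity is missing or locked by someone else
            return result is not None
        except Exception:
            return False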
    def merge_conflicting_facts(self, fact_a: dict, fact_b: dict) -> dict:
        """Merge strategy for conflicting facts."""
        # Strategy: higher confidence wins; on a tie, the newer fact wins
        if fact_a["confidence"] > fact_b["confidence"]:
            return fact_a
        elif fact_b["confidence"] > fact_a["confidence"]:
            return fact_b
        else:
            # Equal confidence: take the most recent
            return fact_a if fact_a["created_at"] > fact_b["created_at"] else fact_b
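A common alternative to a lock field is version-based optimistic concurrency: each entity carries a version, a write names the version it read, and a mismatch means another agent got there first. The sketch below shows the idea in memory only. `VersionedStore`, `VersionConflict`, and `update_with_retry` are hypothetical names, and the `threading.Lock` merely stands in for the atomicity a database would provide for a single conditional update.

```python
import threading


class VersionConflict(Exception):
    """Raised when the entity changed since the caller read it."""


class VersionedStore:
    """In-memory stand-in for a graph whose nodes carry a version property."""

    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()  # stands in for an atomic conditional update

    def read(self, name):
        with self._mutex:
            props, version = self._data.get(name, ({}, 0))
            return dict(props), version

    def write(self, name, updates, expected_version):
        with self._mutex:
            props, version = self._data.get(name, ({}, 0))
            if version != expected_version:
                raise VersionConflict(
                    f"{name}: expected v{expected_version}, found v{version}"
                )
            self._data[name] = ({**props, **updates}, version + 1)
            return version + 1


def update_with_retry(store, name, updates, max_attempts=3):
    """Read the current version, attempt the write, and retry on conflict."""
    for _ in range(max_attempts):
        _, version = store.read(name)
        try:
            return store.write(name, updates, expected_version=version)
        except VersionConflict:
            continue  # someone else wrote first; re-read and try again
    raise VersionConflict(f"{name}: gave up after {max_attempts} attempts")
```

In a graph database the same check could be expressed as a match on the expected version followed by an increment in the same statement; that mapping is an assumption and is not shown in the code above.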
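Engineering Practice Recommendations
Agent + KG Integration Checklist
| Check | Description | Priority |
|---|---|---|
| Memory write latency | Time from fact extraction to storage < 500 ms | High |
| Recall precision | Precision@10 of relevant memories > 80% | High |
| Reasoning accuracy | Accuracy of answers derived from the graph > 90% | High |
| Memory decay | Expired memories are cleaned up periodically | Medium |
| Conflict detection | Alert when a new fact contradicts an existing one | Medium |
| Graph size | Cap the node count of the memory graph (avoid unbounded growth) | Medium |
| Privacy protection | Sensitive information is kept out of the graph or stored encrypted | High |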
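The recall target in the checklist is stated as Precision@10, which is the share of the top 10 recalled facts that are actually relevant. A minimal sketch of the metric follows; `precision_at_k` is a hypothetical helper, and it divides by `k` even when fewer than `k` items are returned, which is one of two common conventions.

```python
def precision_at_k(retrieved, relevant, k=10):
    """Fraction of the top-k retrieved items that are relevant."""
    if k <= 0:
        raise ValueError("k must be positive")
    top_k = retrieved[:k]
    if not top_k:
        return 0.0
    hits = sum(1 for item in top_k if item in relevant)
    # Dividing by k penalizes short result lists as well as wrong results
    return hits / k
```

Measuring this requires a labeled set of queries with known relevant facts; averaging the score over that set gives the number to compare against the 80% threshold.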
Performance Optimization
# Batch recall: fetch all required memories in a single query
def batch_recall(entities: list[str], session) -> dict:
    """Recall in batch to reduce the number of graph queries."""
    result = session.run(
        """
        UNWIND $entities AS name
        MATCH (e:MemoryEntity {name: name})-[r]-(n)
        RETURN name AS query_entity,
               collect({
                   subject: startNode(r).name,
                   predicate: r.type,
                   object: endNode(r).name,
                   confidence: r.confidence
               })[..10] AS facts
        """,
        entities=entities
    )
    # One row per queried entity, each carrying at most 10 facts
    return {r["query_entity"]: r["facts"] for r in result}
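Summary
Core value of integrating knowledge graphs with agent systems:
- Long-term memory: graph memory persists across sessions, addressing the LLM's context-window limit
- Structured reasoning: relationship traversal supports multi-hop reasoning, compensating for the LLM's weakness in logical reasoning
- Hallucination suppression: graph facts act as anchors that constrain what the LLM generates
- Self-evolution: failure experience is written to the graph, so the solution is recalled automatically the next time a similar problem occurs
- Multi-agent collaboration: the shared graph serves as a knowledge bus between agents
Practical advice: start with the memory graph (the most direct payoff), then gradually introduce the world knowledge graph and the task graph.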
Maurice | maurice_wen@proton.me