Agent 记忆系统:从短期到长期
原创
灵阙教研团队
S 精选 进阶 |
约 13 分钟阅读
更新于 2026-02-28 AI 导读
Agent 记忆系统:从短期到长期 构建具有持续记忆能力的 AI Agent 系统 为什么 Agent 需要记忆 大语言模型的原生状态是"无记忆"的——每次请求都是一次全新的开始。Agent 记忆系统的目标是弥补这一缺陷,让 Agent 具备: 会话连续性:在同一任务的多轮交互中保持上下文 跨会话持久性:记住上一次对话的结论和决策 经验积累:从历史任务中学习,避免重复犯错...
Agent 记忆系统:从短期到长期
构建具有持续记忆能力的 AI Agent 系统
为什么 Agent 需要记忆
大语言模型的原生状态是"无记忆"的——每次请求都是一次全新的开始。Agent 记忆系统的目标是弥补这一缺陷,让 Agent 具备:
- 会话连续性:在同一任务的多轮交互中保持上下文
- 跨会话持久性:记住上一次对话的结论和决策
- 经验积累:从历史任务中学习,避免重复犯错
- 知识检索:从大规模文档库中精准提取相关信息
人类记忆的类比:
┌─────────────────────────────────────────────────────────┐
│ │
│ 感觉记忆 短期记忆 长期记忆 │
│ (Sensory) (Working) (Long-term) │
│ ────────── ────────── ────────── │
│ 即时输入 当前对话 跨会话持久 │
│ < 1s 7 +/- 2 项 无限容量 │
│ 自动丢弃 上下文窗口 需要编码与检索 │
│ │
│ 对应 Agent 实现: │
│ ────────────── │
│ 用户输入 Conversation Vector Store │
│ 工具返回值 Buffer Knowledge Graph │
│ 观察结果 Sliding Window Episodic Memory │
│ │
└─────────────────────────────────────────────────────────┘
记忆架构全景
┌──────────────────────────────────────────────────────────────┐
│ Agent Memory System │
├──────────────┬──────────────┬──────────────┬────────────────┤
│ Buffer │ Summary │ Vector │ Episodic │
│ Memory │ Memory │ Memory │ Memory │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ 完整对话历史 │ 压缩后的摘要 │ 语义向量索引 │ 结构化事件记录 │
│ 最近 N 轮 │ 渐进式更新 │ 相似度检索 │ 时间线 + 因果 │
│ FIFO 淘汰 │ 信息密度高 │ 大规模知识库 │ 经验学习 │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ 实现简单 │ Token 效率高 │ 检索精度高 │ 推理能力强 │
│ Token 消耗高 │ 细节丢失 │ 需要嵌入模型 │ 实现复杂 │
└──────────────┴──────────────┴──────────────┴────────────────┘
一、Buffer Memory(缓冲记忆)
最简单的记忆实现:把完整的对话历史作为上下文传递给模型。
基础实现
class BufferMemory:
"""完整对话历史缓冲"""
def __init__(self, max_messages: int = 50):
self.messages: list[dict] = []
self.max_messages = max_messages
def add(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
# FIFO 淘汰:超过上限时移除最早的消息
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages:]
def get_context(self) -> list[dict]:
return self.messages.copy()
def token_count(self) -> int:
"""估算当前缓冲区的 Token 数量"""
total_chars = sum(len(m["content"]) for m in self.messages)
return total_chars // 4 # 粗略估算:4 字符约 1 Token
滑动窗口变体
class SlidingWindowMemory:
"""基于 Token 预算的滑动窗口"""
def __init__(self, max_tokens: int = 4000):
self.messages: list[dict] = []
self.max_tokens = max_tokens
def add(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
self._trim()
def _trim(self):
"""从最早的消息开始裁剪,直到总 Token 数在预算内"""
while self._total_tokens() > self.max_tokens and len(self.messages) > 1:
# 保留 system message(如果有的话)
if self.messages[0]["role"] == "system":
self.messages.pop(1)
else:
self.messages.pop(0)
def _total_tokens(self) -> int:
return sum(len(m["content"]) // 4 for m in self.messages)
适用场景:简单对话、短任务、原型验证
核心限制:Token 消耗随对话增长线性增加,长对话不可持续
二、Summary Memory(摘要记忆)
用 LLM 对历史对话进行渐进式摘要,用固定长度的摘要替代完整历史。
渐进式摘要实现
class SummaryMemory:
"""渐进式摘要记忆"""
def __init__(self, llm, summary_threshold: int = 10):
self.llm = llm
self.summary = ""
self.recent_messages: list[dict] = []
self.summary_threshold = summary_threshold # 每 N 条消息触发一次摘要
def add(self, role: str, content: str):
self.recent_messages.append({"role": role, "content": content})
if len(self.recent_messages) >= self.summary_threshold:
self._update_summary()
def _update_summary(self):
"""将 recent_messages 合并进现有 summary"""
recent_text = "\n".join(
f"{m['role']}: {m['content']}" for m in self.recent_messages
)
prompt = f"""请更新以下对话摘要,合并新的对话内容。
保留关键决策、结论和未完成的任务。丢弃寒暄和重复信息。
现有摘要:
{self.summary or '(空)'}
新对话内容:
{recent_text}
更新后的摘要:"""
self.summary = self.llm.generate(prompt)
self.recent_messages = [] # 清空已摘要的消息
def get_context(self) -> list[dict]:
"""返回摘要 + 最近未摘要的消息"""
context = []
if self.summary:
context.append({
"role": "system",
"content": f"对话历史摘要:\n{self.summary}"
})
context.extend(self.recent_messages)
return context
分层摘要策略
原始对话(最近 10 轮)
│ 每 10 轮触发
v
一级摘要(最近 5 个摘要)
│ 每 5 个摘要触发
v
二级摘要(全局概述)
上下文组装 = 二级摘要 + 最近一级摘要 + 最近原始对话
class HierarchicalSummaryMemory:
"""分层摘要记忆"""
def __init__(self, llm):
self.llm = llm
self.raw_messages: list[dict] = [] # 最近原始消息
self.l1_summaries: list[str] = [] # 一级摘要
self.l2_summary: str = "" # 二级摘要(全局)
self.raw_threshold = 10 # 原始消息阈值
self.l1_threshold = 5 # 一级摘要阈值
def add(self, role: str, content: str):
self.raw_messages.append({"role": role, "content": content})
if len(self.raw_messages) >= self.raw_threshold:
l1 = self._summarize(self.raw_messages, level="detail")
self.l1_summaries.append(l1)
self.raw_messages = []
if len(self.l1_summaries) >= self.l1_threshold:
self.l2_summary = self._summarize(
self.l1_summaries, level="overview"
)
self.l1_summaries = self.l1_summaries[-1:] # 保留最近一个
def get_context(self) -> list[dict]:
parts = []
if self.l2_summary:
parts.append(f"[全局概述]\n{self.l2_summary}")
if self.l1_summaries:
parts.append(f"[近期摘要]\n{self.l1_summaries[-1]}")
context_str = "\n\n".join(parts)
result = []
if context_str:
result.append({"role": "system", "content": context_str})
result.extend(self.raw_messages)
return result
适用场景:长对话、客服场景、需要保持长期上下文但对细节精度要求不高的任务
三、Vector Memory(向量记忆)
将信息编码为向量,存入向量数据库,通过语义相似度检索相关记忆。
核心架构
存储流程:
信息片段 ──→ Embedding Model ──→ 向量 ──→ Vector Store
"用户偏好暗色主题" [0.12, -0.34, ...] ChromaDB/Pinecone
检索流程:
查询 ──→ Embedding Model ──→ 查询向量 ──→ 相似度搜索 ──→ Top-K 结果
"界面主题设置" [0.11, -0.32, ...] "用户偏好暗色主题"
实现
import chromadb
from openai import OpenAI
class VectorMemory:
"""基于向量检索的长期记忆"""
def __init__(self, collection_name: str = "agent_memory"):
self.client = chromadb.PersistentClient(path="./memory_db")
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
self.openai = OpenAI()
def store(self, content: str, metadata: dict = None):
"""存储一条记忆"""
doc_id = f"mem_{hash(content)}_{int(time.time())}"
self.collection.add(
documents=[content],
metadatas=[metadata or {}],
ids=[doc_id]
)
def recall(self, query: str, top_k: int = 5,
filter_metadata: dict = None) -> list[dict]:
"""根据语义相似度检索记忆"""
results = self.collection.query(
query_texts=[query],
n_results=top_k,
where=filter_metadata
)
memories = []
for i, doc in enumerate(results["documents"][0]):
memories.append({
"content": doc,
"metadata": results["metadatas"][0][i],
"distance": results["distances"][0][i]
})
return memories
def forget(self, older_than_days: int = 90):
"""清理过期记忆"""
cutoff = time.time() - older_than_days * 86400
old_ids = self.collection.get(
where={"timestamp": {"$lt": cutoff}}
)["ids"]
if old_ids:
self.collection.delete(ids=old_ids)
记忆分区策略
class PartitionedVectorMemory:
"""分区向量记忆:不同类型的记忆使用不同的检索策略"""
PARTITIONS = {
"facts": {
"description": "用户偏好、环境信息等事实性记忆",
"ttl_days": None, # 永不过期
"max_results": 10
},
"decisions": {
"description": "历史决策和原因",
"ttl_days": 365,
"max_results": 5
},
"conversations": {
"description": "对话片段",
"ttl_days": 30,
"max_results": 3
},
"errors": {
"description": "错误和修复经验",
"ttl_days": 180,
"max_results": 5
}
}
def store(self, content: str, partition: str, metadata: dict = None):
collection = self._get_collection(partition)
collection.add(
documents=[content],
metadatas=[{**(metadata or {}), "partition": partition,
"timestamp": time.time()}],
ids=[f"{partition}_{uuid4()}"]
)
def recall(self, query: str, partitions: list[str] = None) -> list[dict]:
"""跨分区检索,按相关性排序"""
all_results = []
target_partitions = partitions or list(self.PARTITIONS.keys())
for p in target_partitions:
config = self.PARTITIONS[p]
results = self._get_collection(p).query(
query_texts=[query],
n_results=config["max_results"]
)
all_results.extend(self._format_results(results, p))
# 按距离排序(距离越小越相关)
all_results.sort(key=lambda x: x["distance"])
return all_results
四、Episodic Memory(情景记忆)
记录 Agent 的完整执行轨迹,包括决策过程、工具调用、中间结果和最终结论。
数据结构设计
@dataclass
class Episode:
"""一个完整的任务执行记录"""
id: str
task: str # 任务描述
start_time: datetime
end_time: datetime
outcome: str # success / failure / partial
steps: list[EpisodeStep] # 执行步骤序列
lessons: list[str] # 经验教训
tags: list[str] # 标签(用于检索)
@dataclass
class EpisodeStep:
"""单个执行步骤"""
action: str # 采取的动作
reasoning: str # 推理过程
tool_calls: list[dict] # 工具调用记录
observation: str # 观察到的结果
evaluation: str # 对结果的评估
timestamp: datetime
实现
class EpisodicMemory:
"""情景记忆:记录和检索完整任务执行轨迹"""
def __init__(self, storage_path: str = "./episodic_memory"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.vector_index = VectorMemory("episodes")
self.episodes: dict[str, Episode] = {}
def start_episode(self, task: str) -> str:
"""开始记录一个新的 Episode"""
episode_id = f"ep_{int(time.time())}_{uuid4().hex[:8]}"
self.episodes[episode_id] = Episode(
id=episode_id,
task=task,
start_time=datetime.now(),
end_time=None,
outcome="in_progress",
steps=[],
lessons=[],
tags=[]
)
return episode_id
def add_step(self, episode_id: str, step: EpisodeStep):
"""记录一个执行步骤"""
self.episodes[episode_id].steps.append(step)
def end_episode(self, episode_id: str, outcome: str,
lessons: list[str] = None):
"""结束 Episode 并索引"""
episode = self.episodes[episode_id]
episode.end_time = datetime.now()
episode.outcome = outcome
episode.lessons = lessons or []
# 持久化存储
self._save_episode(episode)
# 向量索引(用于语义检索)
summary = self._summarize_episode(episode)
self.vector_index.store(
content=summary,
metadata={
"episode_id": episode_id,
"outcome": outcome,
"task": episode.task,
"duration_s": (episode.end_time - episode.start_time).seconds
}
)
def recall_similar(self, task: str, top_k: int = 3) -> list[Episode]:
"""检索与当前任务相似的历史 Episode"""
results = self.vector_index.recall(task, top_k=top_k)
episodes = []
for r in results:
ep_id = r["metadata"]["episode_id"]
episode = self._load_episode(ep_id)
if episode:
episodes.append(episode)
return episodes
def recall_failures(self, task: str) -> list[Episode]:
"""专门检索失败的 Episode(用于避免重蹈覆辙)"""
results = self.vector_index.recall(
task,
top_k=10,
filter_metadata={"outcome": "failure"}
)
return [self._load_episode(r["metadata"]["episode_id"])
for r in results]
经验学习流程
当前任务 ──→ 检索相似 Episode
│
├── 成功的 Episode ──→ 提取可复用的策略和步骤
│
└── 失败的 Episode ──→ 提取应避免的陷阱和教训
│
v
组装为 System Prompt 的一部分:
"你之前处理过类似任务,以下是经验教训:
- 成功策略:...
- 应避免:..."
五、混合记忆架构
实际生产系统通常需要组合多种记忆机制。
统一记忆接口
class HybridMemory:
"""混合记忆系统:统一接口,内部调度"""
def __init__(self, llm):
self.buffer = SlidingWindowMemory(max_tokens=2000)
self.summary = SummaryMemory(llm, summary_threshold=10)
self.vector = PartitionedVectorMemory()
self.episodic = EpisodicMemory()
def add_message(self, role: str, content: str):
"""每条消息同时写入多个记忆层"""
# 1. Buffer:保持最近对话
self.buffer.add(role, content)
# 2. Summary:渐进式摘要
self.summary.add(role, content)
# 3. Vector:提取值得记住的信息
if self._is_memorable(content):
partition = self._classify_partition(content)
self.vector.store(content, partition=partition)
def get_context(self, query: str = None) -> list[dict]:
"""组装完整的上下文"""
context = []
# 层 1:全局摘要(提供大背景)
summary_ctx = self.summary.get_context()
context.extend(summary_ctx)
# 层 2:相关长期记忆(语义检索)
if query:
relevant = self.vector.recall(query, top_k=5)
if relevant:
memory_text = "\n".join(
f"- {m['content']}" for m in relevant
)
context.append({
"role": "system",
"content": f"相关记忆:\n{memory_text}"
})
# 层 3:相似历史任务的经验
if query:
episodes = self.episodic.recall_similar(query, top_k=2)
if episodes:
ep_text = self._format_episodes(episodes)
context.append({
"role": "system",
"content": f"历史经验:\n{ep_text}"
})
# 层 4:最近对话(最新上下文)
context.extend(self.buffer.get_context())
return context
def _is_memorable(self, content: str) -> bool:
"""判断内容是否值得长期记忆"""
# 基于规则的快速判断
memorable_signals = [
"决定", "选择", "偏好", "用户要求",
"错误", "修复", "教训", "结论",
"API key", "配置", "环境"
]
return any(signal in content for signal in memorable_signals)
def _classify_partition(self, content: str) -> str:
"""分类记忆应存入哪个分区"""
if any(w in content for w in ["错误", "失败", "修复", "bug"]):
return "errors"
if any(w in content for w in ["决定", "选择", "方案"]):
return "decisions"
if any(w in content for w in ["偏好", "设置", "环境"]):
return "facts"
return "conversations"
上下文预算分配
总 Token 预算:8000 tokens
│
├── System Prompt:1000 tokens(固定)
│
├── 记忆上下文:3000 tokens
│ ├── 全局摘要:800 tokens
│ ├── 向量检索结果:1200 tokens
│ └── 历史经验:1000 tokens
│
├── 最近对话:3000 tokens
│
└── 预留输出:1000 tokens
class TokenBudgetAllocator:
"""Token 预算分配器"""
def __init__(self, total_budget: int = 8000):
self.total = total_budget
self.allocations = {
"system_prompt": 1000,
"summary": 800,
"vector_recall": 1200,
"episodes": 1000,
"recent_chat": 3000,
"output_reserve": 1000
}
def allocate(self, memory: HybridMemory, query: str) -> list[dict]:
"""在预算内组装上下文"""
context = []
remaining = self.total - self.allocations["output_reserve"]
# 按优先级填充
for source, budget in self.allocations.items():
if source in ("output_reserve", "system_prompt"):
continue
content = self._fetch_source(memory, source, query, budget)
if content:
tokens = self._count_tokens(content)
if tokens <= remaining:
context.extend(content)
remaining -= tokens
return context
记忆系统的持久化
文件驱动记忆(轻量方案)
memory/
MEMORY.md # 长期稳定事实(手动维护)
2026-02-28.md # 日记式增量(自动追加)
2026-02-27.md
episodes/
ep_20260228_abc.json # 情景记忆(JSON 序列化)
vectors/
chromadb/ # 向量数据库(自动管理)
class FileBasedMemory:
"""文件驱动的持久化记忆"""
def __init__(self, base_path: str = "./memory"):
self.base = Path(base_path)
self.base.mkdir(parents=True, exist_ok=True)
def append_daily(self, content: str):
"""追加到当天的日记文件"""
today = datetime.now().strftime("%Y-%m-%d")
daily_file = self.base / f"{today}.md"
timestamp = datetime.now().strftime("%H:%M:%S")
entry = f"\n## {timestamp}\n\n{content}\n"
with open(daily_file, "a", encoding="utf-8") as f:
f.write(entry)
def read_long_term(self) -> str:
"""读取长期记忆"""
memory_file = self.base / "MEMORY.md"
if memory_file.exists():
return memory_file.read_text(encoding="utf-8")
return ""
def update_long_term(self, key: str, value: str):
"""更新长期记忆中的特定条目"""
content = self.read_long_term()
# 查找并替换,或追加
marker = f"## {key}"
if marker in content:
# 替换已有条目
pattern = f"{marker}\n.*?(?=\n## |$)"
replacement = f"{marker}\n{value}"
content = re.sub(pattern, replacement, content, flags=re.DOTALL)
else:
content += f"\n{marker}\n{value}\n"
(self.base / "MEMORY.md").write_text(content, encoding="utf-8")
记忆检索优化
混合检索策略
class HybridRetriever:
"""混合检索:关键词 + 语义 + 时间衰减"""
def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
# 1. 关键词检索(BM25)
keyword_results = self.bm25_search(query, top_k=top_k * 2)
# 2. 语义检索(向量)
semantic_results = self.vector_search(query, top_k=top_k * 2)
# 3. 合并去重
merged = self._merge_results(keyword_results, semantic_results)
# 4. 时间衰减加权
for result in merged:
age_days = (time.time() - result["timestamp"]) / 86400
decay = math.exp(-0.01 * age_days) # 半衰期约 70 天
result["final_score"] = result["relevance_score"] * decay
# 5. 排序返回
merged.sort(key=lambda x: x["final_score"], reverse=True)
return merged[:top_k]
记忆压缩与淘汰
| 策略 | 触发条件 | 操作 |
|---|---|---|
| TTL 淘汰 | 超过保留期限 | 删除过期记忆 |
| 访问频率 | 长期未被检索命中 | 降级或归档 |
| 重复合并 | 多条记忆语义高度重叠 | 合并为一条 |
| 重要性评分 | 低重要性 + 高年龄 | 优先淘汰 |
工程实践建议
- 从 Buffer + Summary 开始:大多数场景这两个就够了,不要过度设计
- 向量记忆的嵌入模型选择:优先选择与任务语言匹配的模型(中文场景用中文嵌入模型)
- 记忆写入要有选择:不是所有对话都值得记住,建立明确的"值得记忆"判断标准
- 定期维护:记忆系统需要定期清理过期数据、合并重复项、验证准确性
- 隐私安全:记忆中不得存储密钥、密码等敏感信息,使用占位符替代
- 可观测性:记录记忆的写入/读取/淘汰日志,便于调试和优化检索质量
参考资料
- LangChain Memory 模块文档
- Mem0:开源的 AI 记忆层
- MemGPT:操作系统风格的 LLM 记忆管理
- Anthropic Memory 研究:Claude 的长期记忆实现思路
- ChromaDB / Pinecone / Weaviate:主流向量数据库
Maurice | maurice_wen@proton.me