Agent 记忆系统:从短期到长期

构建具有持续记忆能力的 AI Agent 系统


为什么 Agent 需要记忆

大语言模型的原生状态是"无记忆"的——每次请求都是一次全新的开始。Agent 记忆系统的目标是弥补这一缺陷,让 Agent 具备:

  1. 会话连续性:在同一任务的多轮交互中保持上下文
  2. 跨会话持久性:记住上一次对话的结论和决策
  3. 经验积累:从历史任务中学习,避免重复犯错
  4. 知识检索:从大规模文档库中精准提取相关信息
人类记忆的类比:
┌─────────────────────────────────────────────────────────┐
│                                                         │
│  感觉记忆          短期记忆          长期记忆            │
│  (Sensory)        (Working)        (Long-term)         │
│  ──────────       ──────────       ──────────          │
│  即时输入          当前对话          跨会话持久           │
│  < 1s             7 +/- 2 项       无限容量             │
│  自动丢弃          上下文窗口        需要编码与检索       │
│                                                         │
│  对应 Agent 实现:                                       │
│  ──────────────                                         │
│  用户输入          Conversation     Vector Store         │
│  工具返回值         Buffer          Knowledge Graph      │
│  观察结果          Sliding Window   Episodic Memory      │
│                                                         │
└─────────────────────────────────────────────────────────┘

记忆架构全景

┌──────────────────────────────────────────────────────────────┐
│                    Agent Memory System                        │
├──────────────┬──────────────┬──────────────┬────────────────┤
│   Buffer     │   Summary    │   Vector     │   Episodic     │
│   Memory     │   Memory     │   Memory     │   Memory       │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ 完整对话历史  │ 压缩后的摘要  │ 语义向量索引  │ 结构化事件记录 │
│ 最近 N 轮    │ 渐进式更新    │ 相似度检索    │ 时间线 + 因果  │
│ FIFO 淘汰    │ 信息密度高    │ 大规模知识库  │ 经验学习       │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ 实现简单      │ Token 效率高  │ 检索精度高    │ 推理能力强     │
│ Token 消耗高  │ 细节丢失      │ 需要嵌入模型  │ 实现复杂       │
└──────────────┴──────────────┴──────────────┴────────────────┘

一、Buffer Memory(缓冲记忆)

最简单的记忆实现:把完整的对话历史作为上下文传递给模型。

基础实现

class BufferMemory:
    """完整对话历史缓冲"""

    def __init__(self, max_messages: int = 50):
        self.messages: list[dict] = []
        self.max_messages = max_messages

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        # FIFO 淘汰:超过上限时移除最早的消息
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]

    def get_context(self) -> list[dict]:
        return self.messages.copy()

    def token_count(self) -> int:
        """估算当前缓冲区的 Token 数量"""
        total_chars = sum(len(m["content"]) for m in self.messages)
        return total_chars // 4  # 粗略估算:4 字符约 1 Token

滑动窗口变体

class SlidingWindowMemory:
    """基于 Token 预算的滑动窗口"""

    def __init__(self, max_tokens: int = 4000):
        self.messages: list[dict] = []
        self.max_tokens = max_tokens

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._trim()

    def _trim(self):
        """从最早的消息开始裁剪,直到总 Token 数在预算内"""
        while self._total_tokens() > self.max_tokens and len(self.messages) > 1:
            # 保留 system message(如果有的话)
            if self.messages[0]["role"] == "system":
                self.messages.pop(1)
            else:
                self.messages.pop(0)

    def _total_tokens(self) -> int:
        return sum(len(m["content"]) // 4 for m in self.messages)

适用场景:简单对话、短任务、原型验证

核心限制:Token 消耗随对话增长线性增加,长对话不可持续


二、Summary Memory(摘要记忆)

用 LLM 对历史对话进行渐进式摘要,用固定长度的摘要替代完整历史。

渐进式摘要实现

class SummaryMemory:
    """渐进式摘要记忆"""

    def __init__(self, llm, summary_threshold: int = 10):
        self.llm = llm
        self.summary = ""
        self.recent_messages: list[dict] = []
        self.summary_threshold = summary_threshold  # 每 N 条消息触发一次摘要

    def add(self, role: str, content: str):
        self.recent_messages.append({"role": role, "content": content})

        if len(self.recent_messages) >= self.summary_threshold:
            self._update_summary()

    def _update_summary(self):
        """将 recent_messages 合并进现有 summary"""
        recent_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in self.recent_messages
        )

        prompt = f"""请更新以下对话摘要,合并新的对话内容。
保留关键决策、结论和未完成的任务。丢弃寒暄和重复信息。

现有摘要:
{self.summary or '(空)'}

新对话内容:
{recent_text}

更新后的摘要:"""

        self.summary = self.llm.generate(prompt)
        self.recent_messages = []  # 清空已摘要的消息

    def get_context(self) -> list[dict]:
        """返回摘要 + 最近未摘要的消息"""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"对话历史摘要:\n{self.summary}"
            })
        context.extend(self.recent_messages)
        return context

分层摘要策略

原始对话(最近 10 轮)
     │  每 10 轮触发
     v
一级摘要(最近 5 个摘要)
     │  每 5 个摘要触发
     v
二级摘要(全局概述)

上下文组装 = 二级摘要 + 最近一级摘要 + 最近原始对话
class HierarchicalSummaryMemory:
    """分层摘要记忆"""

    def __init__(self, llm):
        self.llm = llm
        self.raw_messages: list[dict] = []       # 最近原始消息
        self.l1_summaries: list[str] = []         # 一级摘要
        self.l2_summary: str = ""                 # 二级摘要(全局)

        self.raw_threshold = 10                   # 原始消息阈值
        self.l1_threshold = 5                     # 一级摘要阈值

    def add(self, role: str, content: str):
        self.raw_messages.append({"role": role, "content": content})

        if len(self.raw_messages) >= self.raw_threshold:
            l1 = self._summarize(self.raw_messages, level="detail")
            self.l1_summaries.append(l1)
            self.raw_messages = []

        if len(self.l1_summaries) >= self.l1_threshold:
            self.l2_summary = self._summarize(
                self.l1_summaries, level="overview"
            )
            self.l1_summaries = self.l1_summaries[-1:]  # 保留最近一个

    def get_context(self) -> list[dict]:
        parts = []
        if self.l2_summary:
            parts.append(f"[全局概述]\n{self.l2_summary}")
        if self.l1_summaries:
            parts.append(f"[近期摘要]\n{self.l1_summaries[-1]}")
        context_str = "\n\n".join(parts)

        result = []
        if context_str:
            result.append({"role": "system", "content": context_str})
        result.extend(self.raw_messages)
        return result

适用场景:长对话、客服场景、需要保持长期上下文但对细节精度要求不高的任务


三、Vector Memory(向量记忆)

将信息编码为向量,存入向量数据库,通过语义相似度检索相关记忆。

核心架构

存储流程:
信息片段 ──→ Embedding Model ──→ 向量 ──→ Vector Store
  "用户偏好暗色主题"     [0.12, -0.34, ...]     ChromaDB/Pinecone

检索流程:
查询 ──→ Embedding Model ──→ 查询向量 ──→ 相似度搜索 ──→ Top-K 结果
  "界面主题设置"     [0.11, -0.32, ...]                "用户偏好暗色主题"

实现

import chromadb
from openai import OpenAI

class VectorMemory:
    """基于向量检索的长期记忆"""

    def __init__(self, collection_name: str = "agent_memory"):
        self.client = chromadb.PersistentClient(path="./memory_db")
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )
        self.openai = OpenAI()

    def store(self, content: str, metadata: dict = None):
        """存储一条记忆"""
        doc_id = f"mem_{hash(content)}_{int(time.time())}"

        self.collection.add(
            documents=[content],
            metadatas=[metadata or {}],
            ids=[doc_id]
        )

    def recall(self, query: str, top_k: int = 5,
               filter_metadata: dict = None) -> list[dict]:
        """根据语义相似度检索记忆"""
        results = self.collection.query(
            query_texts=[query],
            n_results=top_k,
            where=filter_metadata
        )

        memories = []
        for i, doc in enumerate(results["documents"][0]):
            memories.append({
                "content": doc,
                "metadata": results["metadatas"][0][i],
                "distance": results["distances"][0][i]
            })

        return memories

    def forget(self, older_than_days: int = 90):
        """清理过期记忆"""
        cutoff = time.time() - older_than_days * 86400
        old_ids = self.collection.get(
            where={"timestamp": {"$lt": cutoff}}
        )["ids"]

        if old_ids:
            self.collection.delete(ids=old_ids)

记忆分区策略

class PartitionedVectorMemory:
    """分区向量记忆:不同类型的记忆使用不同的检索策略"""

    PARTITIONS = {
        "facts": {
            "description": "用户偏好、环境信息等事实性记忆",
            "ttl_days": None,           # 永不过期
            "max_results": 10
        },
        "decisions": {
            "description": "历史决策和原因",
            "ttl_days": 365,
            "max_results": 5
        },
        "conversations": {
            "description": "对话片段",
            "ttl_days": 30,
            "max_results": 3
        },
        "errors": {
            "description": "错误和修复经验",
            "ttl_days": 180,
            "max_results": 5
        }
    }

    def store(self, content: str, partition: str, metadata: dict = None):
        collection = self._get_collection(partition)
        collection.add(
            documents=[content],
            metadatas=[{**(metadata or {}), "partition": partition,
                       "timestamp": time.time()}],
            ids=[f"{partition}_{uuid4()}"]
        )

    def recall(self, query: str, partitions: list[str] = None) -> list[dict]:
        """跨分区检索,按相关性排序"""
        all_results = []
        target_partitions = partitions or list(self.PARTITIONS.keys())

        for p in target_partitions:
            config = self.PARTITIONS[p]
            results = self._get_collection(p).query(
                query_texts=[query],
                n_results=config["max_results"]
            )
            all_results.extend(self._format_results(results, p))

        # 按距离排序(距离越小越相关)
        all_results.sort(key=lambda x: x["distance"])
        return all_results

四、Episodic Memory(情景记忆)

记录 Agent 的完整执行轨迹,包括决策过程、工具调用、中间结果和最终结论。

数据结构设计

@dataclass
class Episode:
    """一个完整的任务执行记录"""
    id: str
    task: str                        # 任务描述
    start_time: datetime
    end_time: datetime
    outcome: str                     # success / failure / partial
    steps: list[EpisodeStep]         # 执行步骤序列
    lessons: list[str]               # 经验教训
    tags: list[str]                  # 标签(用于检索)

@dataclass
class EpisodeStep:
    """单个执行步骤"""
    action: str                      # 采取的动作
    reasoning: str                   # 推理过程
    tool_calls: list[dict]           # 工具调用记录
    observation: str                 # 观察到的结果
    evaluation: str                  # 对结果的评估
    timestamp: datetime

实现

class EpisodicMemory:
    """情景记忆:记录和检索完整任务执行轨迹"""

    def __init__(self, storage_path: str = "./episodic_memory"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.vector_index = VectorMemory("episodes")
        self.episodes: dict[str, Episode] = {}

    def start_episode(self, task: str) -> str:
        """开始记录一个新的 Episode"""
        episode_id = f"ep_{int(time.time())}_{uuid4().hex[:8]}"
        self.episodes[episode_id] = Episode(
            id=episode_id,
            task=task,
            start_time=datetime.now(),
            end_time=None,
            outcome="in_progress",
            steps=[],
            lessons=[],
            tags=[]
        )
        return episode_id

    def add_step(self, episode_id: str, step: EpisodeStep):
        """记录一个执行步骤"""
        self.episodes[episode_id].steps.append(step)

    def end_episode(self, episode_id: str, outcome: str,
                    lessons: list[str] = None):
        """结束 Episode 并索引"""
        episode = self.episodes[episode_id]
        episode.end_time = datetime.now()
        episode.outcome = outcome
        episode.lessons = lessons or []

        # 持久化存储
        self._save_episode(episode)

        # 向量索引(用于语义检索)
        summary = self._summarize_episode(episode)
        self.vector_index.store(
            content=summary,
            metadata={
                "episode_id": episode_id,
                "outcome": outcome,
                "task": episode.task,
                "duration_s": (episode.end_time - episode.start_time).seconds
            }
        )

    def recall_similar(self, task: str, top_k: int = 3) -> list[Episode]:
        """检索与当前任务相似的历史 Episode"""
        results = self.vector_index.recall(task, top_k=top_k)
        episodes = []
        for r in results:
            ep_id = r["metadata"]["episode_id"]
            episode = self._load_episode(ep_id)
            if episode:
                episodes.append(episode)
        return episodes

    def recall_failures(self, task: str) -> list[Episode]:
        """专门检索失败的 Episode(用于避免重蹈覆辙)"""
        results = self.vector_index.recall(
            task,
            top_k=10,
            filter_metadata={"outcome": "failure"}
        )
        return [self._load_episode(r["metadata"]["episode_id"])
                for r in results]

经验学习流程

当前任务 ──→ 检索相似 Episode
                │
                ├── 成功的 Episode ──→ 提取可复用的策略和步骤
                │
                └── 失败的 Episode ──→ 提取应避免的陷阱和教训
                          │
                          v
              组装为 System Prompt 的一部分:
              "你之前处理过类似任务,以下是经验教训:
               - 成功策略:...
               - 应避免:..."

五、混合记忆架构

实际生产系统通常需要组合多种记忆机制。

统一记忆接口

class HybridMemory:
    """混合记忆系统:统一接口,内部调度"""

    def __init__(self, llm):
        self.buffer = SlidingWindowMemory(max_tokens=2000)
        self.summary = SummaryMemory(llm, summary_threshold=10)
        self.vector = PartitionedVectorMemory()
        self.episodic = EpisodicMemory()

    def add_message(self, role: str, content: str):
        """每条消息同时写入多个记忆层"""
        # 1. Buffer:保持最近对话
        self.buffer.add(role, content)

        # 2. Summary:渐进式摘要
        self.summary.add(role, content)

        # 3. Vector:提取值得记住的信息
        if self._is_memorable(content):
            partition = self._classify_partition(content)
            self.vector.store(content, partition=partition)

    def get_context(self, query: str = None) -> list[dict]:
        """组装完整的上下文"""
        context = []

        # 层 1:全局摘要(提供大背景)
        summary_ctx = self.summary.get_context()
        context.extend(summary_ctx)

        # 层 2:相关长期记忆(语义检索)
        if query:
            relevant = self.vector.recall(query, top_k=5)
            if relevant:
                memory_text = "\n".join(
                    f"- {m['content']}" for m in relevant
                )
                context.append({
                    "role": "system",
                    "content": f"相关记忆:\n{memory_text}"
                })

        # 层 3:相似历史任务的经验
        if query:
            episodes = self.episodic.recall_similar(query, top_k=2)
            if episodes:
                ep_text = self._format_episodes(episodes)
                context.append({
                    "role": "system",
                    "content": f"历史经验:\n{ep_text}"
                })

        # 层 4:最近对话(最新上下文)
        context.extend(self.buffer.get_context())

        return context

    def _is_memorable(self, content: str) -> bool:
        """判断内容是否值得长期记忆"""
        # 基于规则的快速判断
        memorable_signals = [
            "决定", "选择", "偏好", "用户要求",
            "错误", "修复", "教训", "结论",
            "API key", "配置", "环境"
        ]
        return any(signal in content for signal in memorable_signals)

    def _classify_partition(self, content: str) -> str:
        """分类记忆应存入哪个分区"""
        if any(w in content for w in ["错误", "失败", "修复", "bug"]):
            return "errors"
        if any(w in content for w in ["决定", "选择", "方案"]):
            return "decisions"
        if any(w in content for w in ["偏好", "设置", "环境"]):
            return "facts"
        return "conversations"

上下文预算分配

总 Token 预算:8000 tokens
     │
     ├── System Prompt:1000 tokens(固定)
     │
     ├── 记忆上下文:3000 tokens
     │    ├── 全局摘要:800 tokens
     │    ├── 向量检索结果:1200 tokens
     │    └── 历史经验:1000 tokens
     │
     ├── 最近对话:3000 tokens
     │
     └── 预留输出:1000 tokens
class TokenBudgetAllocator:
    """Token 预算分配器"""

    def __init__(self, total_budget: int = 8000):
        self.total = total_budget
        self.allocations = {
            "system_prompt": 1000,
            "summary": 800,
            "vector_recall": 1200,
            "episodes": 1000,
            "recent_chat": 3000,
            "output_reserve": 1000
        }

    def allocate(self, memory: HybridMemory, query: str) -> list[dict]:
        """在预算内组装上下文"""
        context = []
        remaining = self.total - self.allocations["output_reserve"]

        # 按优先级填充
        for source, budget in self.allocations.items():
            if source in ("output_reserve", "system_prompt"):
                continue
            content = self._fetch_source(memory, source, query, budget)
            if content:
                tokens = self._count_tokens(content)
                if tokens <= remaining:
                    context.extend(content)
                    remaining -= tokens

        return context

记忆系统的持久化

文件驱动记忆(轻量方案)

memory/
  MEMORY.md              # 长期稳定事实(手动维护)
  2026-02-28.md          # 日记式增量(自动追加)
  2026-02-27.md
  episodes/
    ep_20260228_abc.json  # 情景记忆(JSON 序列化)
  vectors/
    chromadb/             # 向量数据库(自动管理)
class FileBasedMemory:
    """文件驱动的持久化记忆"""

    def __init__(self, base_path: str = "./memory"):
        self.base = Path(base_path)
        self.base.mkdir(parents=True, exist_ok=True)

    def append_daily(self, content: str):
        """追加到当天的日记文件"""
        today = datetime.now().strftime("%Y-%m-%d")
        daily_file = self.base / f"{today}.md"

        timestamp = datetime.now().strftime("%H:%M:%S")
        entry = f"\n## {timestamp}\n\n{content}\n"

        with open(daily_file, "a", encoding="utf-8") as f:
            f.write(entry)

    def read_long_term(self) -> str:
        """读取长期记忆"""
        memory_file = self.base / "MEMORY.md"
        if memory_file.exists():
            return memory_file.read_text(encoding="utf-8")
        return ""

    def update_long_term(self, key: str, value: str):
        """更新长期记忆中的特定条目"""
        content = self.read_long_term()
        # 查找并替换,或追加
        marker = f"## {key}"
        if marker in content:
            # 替换已有条目
            pattern = f"{marker}\n.*?(?=\n## |$)"
            replacement = f"{marker}\n{value}"
            content = re.sub(pattern, replacement, content, flags=re.DOTALL)
        else:
            content += f"\n{marker}\n{value}\n"

        (self.base / "MEMORY.md").write_text(content, encoding="utf-8")

记忆检索优化

混合检索策略

class HybridRetriever:
    """混合检索:关键词 + 语义 + 时间衰减"""

    def retrieve(self, query: str, top_k: int = 10) -> list[dict]:
        # 1. 关键词检索(BM25)
        keyword_results = self.bm25_search(query, top_k=top_k * 2)

        # 2. 语义检索(向量)
        semantic_results = self.vector_search(query, top_k=top_k * 2)

        # 3. 合并去重
        merged = self._merge_results(keyword_results, semantic_results)

        # 4. 时间衰减加权
        for result in merged:
            age_days = (time.time() - result["timestamp"]) / 86400
            decay = math.exp(-0.01 * age_days)  # 半衰期约 70 天
            result["final_score"] = result["relevance_score"] * decay

        # 5. 排序返回
        merged.sort(key=lambda x: x["final_score"], reverse=True)
        return merged[:top_k]

记忆压缩与淘汰

策略 触发条件 操作
TTL 淘汰 超过保留期限 删除过期记忆
访问频率 长期未被检索命中 降级或归档
重复合并 多条记忆语义高度重叠 合并为一条
重要性评分 低重要性 + 高年龄 优先淘汰

工程实践建议

  1. 从 Buffer + Summary 开始:大多数场景这两个就够了,不要过度设计
  2. 向量记忆的嵌入模型选择:优先选择与任务语言匹配的模型(中文场景用中文嵌入模型)
  3. 记忆写入要有选择:不是所有对话都值得记住,建立明确的"值得记忆"判断标准
  4. 定期维护:记忆系统需要定期清理过期数据、合并重复项、验证准确性
  5. 隐私安全:记忆中不得存储密钥、密码等敏感信息,使用占位符替代
  6. 可观测性:记录记忆的写入/读取/淘汰日志,便于调试和优化检索质量

参考资料

  • LangChain Memory 模块文档
  • Mem0:开源的 AI 记忆层
  • MemGPT:操作系统风格的 LLM 记忆管理
  • Anthropic Memory 研究:Claude 的长期记忆实现思路
  • ChromaDB / Pinecone / Weaviate:主流向量数据库

Maurice | maurice_wen@proton.me