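Temporal Knowledge Graphs: Modeling Dynamic Relations

Engineering practice for temporal triples, time-aware embeddings, event graphs, change detection, and temporal reasoning

Introduction

Traditional knowledge graphs are static: they can record "Zhang San works at Company A" but cannot express "Zhang San worked at Company A from 2020 to 2023 and moved to Company B afterwards." Real-world knowledge is inherently temporal: relations are established and terminated, attributes change, and events have an order and causal links. A temporal knowledge graph (TKG) adds a time dimension so that the graph can model a changing world. This article covers the data models, storage options, embedding methods, and reasoning capabilities of temporal knowledge graphs.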

Temporal Data Models

From Static Triples to Temporal Triples

Static vs. Temporal Knowledge Representation

Static triple:
  (Zhang San, works_at, CompanyA)                      # no time information

Temporal triple (quadruple):
  (Zhang San, works_at, CompanyA, [2020-01, 2023-06])  # time interval
  (Zhang San, works_at, CompanyB, [2023-07, now])      # currently valid

Temporal quintuple:
  (Zhang San, works_at, CompanyA, 2020-01, 2023-06)    # start and end stored separately

Event-based representation:
  Event_001: {
    subject: Zhang San,
    predicate: hire,
    object: CompanyA,
    timestamp: 2020-01-15,
    type: point_event
  }
  Event_002: {
    subject: Zhang San,
    predicate: resign,
    object: CompanyA,
    timestamp: 2023-06-30,
    type: point_event,
    caused_by: Event_003
  }
  Event_003: {
    subject: Zhang San,
    predicate: hire,
    object: CompanyB,
    timestamp: 2023-07-01,
    type: point_event
  }
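
The interval and event representations above carry the same information, so one can be derived from the other. The following is a minimal sketch of that conversion, not a prescribed schema: the `IntervalFact` and `PointEvent` types and the `"start"`/`"end"` labels are hypothetical names introduced here for illustration.

```python
from datetime import date
from typing import NamedTuple, Optional


class IntervalFact(NamedTuple):
    subject: str
    predicate: str
    obj: str
    start: date
    end: Optional[date]  # None means "still valid"


class PointEvent(NamedTuple):
    subject: str
    event_type: str  # "start" or "end" (hypothetical labels)
    predicate: str
    obj: str
    timestamp: date


def to_events(fact: IntervalFact) -> list[PointEvent]:
    """Split one interval fact into a start event and, if closed, an end event."""
    events = [PointEvent(fact.subject, "start", fact.predicate, fact.obj, fact.start)]
    if fact.end is not None:
        events.append(PointEvent(fact.subject, "end", fact.predicate, fact.obj, fact.end))
    return events


def to_intervals(events: list[PointEvent]) -> list[IntervalFact]:
    """Pair each start event with the next matching end event."""
    open_starts: dict[tuple[str, str, str], date] = {}
    facts: list[IntervalFact] = []
    for ev in sorted(events, key=lambda e: e.timestamp):
        key = (ev.subject, ev.predicate, ev.obj)
        if ev.event_type == "start":
            open_starts[key] = ev.timestamp
        elif key in open_starts:
            facts.append(IntervalFact(*key, open_starts.pop(key), ev.timestamp))
    # Start events that were never closed describe facts that are still valid
    facts.extend(IntervalFact(*key, start, None) for key, start in open_starts.items())
    return facts


facts = [
    IntervalFact("Zhang San", "works_at", "CompanyA", date(2020, 1, 15), date(2023, 6, 30)),
    IntervalFact("Zhang San", "works_at", "CompanyB", date(2023, 7, 1), None),
]
events = [ev for f in facts for ev in to_events(f)]
round_trip = to_intervals(events)  # equals facts for this example
```

The round trip is lossless here because each fact has at most one open interval per (subject, predicate, object) key; repeated stints at the same company would need an extra identifier.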

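Classification of Time Models

| Time model | Representation | Typical use |
|---|---|---|
| Point | (s, p, o, t) | Event occurrences, transaction records |
| Interval | (s, p, o, [t_start, t_end]) | Employment, contract validity periods |
| Event graph | Event{subject, predicate, object, time, metadata} | Causal reasoning, process modeling |
| Versioned | (s, p, o, version_id) | Tracking knowledge evolution |
| Recurring | (s, p, o, cron_expr) | Periodic events (e.g., quarterly earnings reports) |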
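
To make the recurring model concrete, the sketch below expands one recurring fact into point-time quadruples. It assumes a simple "every N months" rule rather than a real cron expression, and the helper names `add_months` and `expand_recurring` are hypothetical.

```python
from datetime import date


def add_months(d: date, months: int) -> date:
    """Shift a date by whole months, clamping the day to 28 so it stays valid."""
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, min(d.day, 28))


def expand_recurring(s: str, p: str, o: str, first: date,
                     every_months: int, until: date) -> list[tuple[str, str, str, date]]:
    """Expand a recurring fact into point-time quadruples (s, p, o, t)."""
    quads = []
    k = 0
    while True:
        t = add_months(first, k * every_months)
        if t > until:
            break
        quads.append((s, p, o, t))
        k += 1
    return quads


quads = expand_recurring("CompanyA", "publishes", "earnings_report",
                         first=date(2024, 1, 15), every_months=3,
                         until=date(2024, 12, 31))
# Four occurrences: January, April, July, and October 2024
```

Storing the rule and expanding it on demand keeps the graph small; materializing the quadruples is only needed when they must be joined with other point events.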

Storage Options

Option 1: Native Temporal Properties in a Graph Database

// Neo4j: attach temporal properties to the relationship
CREATE (p:Person {name: 'Zhang San'})
CREATE (c:Company {name: 'TechCorp'})
CREATE (p)-[:WORKS_AT {
    start_date: date('2020-01-15'),
    end_date: date('2023-06-30'),
    role: 'Engineer',
    is_current: false
}]->(c)

// Query: where a person worked at a given point in time
MATCH (p:Person {name: 'Zhang San'})-[r:WORKS_AT]->(c:Company)
WHERE r.start_date <= date('2022-06-01')
  AND (r.end_date IS NULL OR r.end_date >= date('2022-06-01'))
RETURN c.name, r.role, r.start_date, r.end_date

// Query: all staff changes at a company since a given date
MATCH (p:Person)-[r:WORKS_AT]->(c:Company {name: 'TechCorp'})
WHERE r.start_date >= date('2023-01-01')
   OR r.end_date >= date('2023-01-01')
RETURN p.name,
       r.start_date,
       r.end_date,
       CASE WHEN r.end_date IS NOT NULL THEN 'departed' ELSE 'active' END AS status
ORDER BY r.start_date

Option 2: Event Node Pattern (Reification)

// Promote the relationship to an event node to carry richer temporal metadata
CREATE (e:Event:Employment {
    event_id: 'EVT-001',
    event_type: 'hire',
    timestamp: datetime('2020-01-15T09:00:00'),
    source: 'HR_system',
    confidence: 0.99
})
CREATE (p:Person {name: 'Zhang San'})
CREATE (c:Company {name: 'TechCorp'})
CREATE (e)-[:SUBJECT]->(p)
CREATE (e)-[:OBJECT]->(c)
CREATE (e)-[:HAS_ROLE]->(:Role {title: 'Engineer', level: 'L5'})

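// Event causal chain: reuse the hire event's subject and object so the
// resignation also shows up on the person's timeline
MATCH (hire:Event {event_id: 'EVT-001'})-[:SUBJECT]->(p:Person),
      (hire)-[:OBJECT]->(c:Company)
CREATE (resign:Event:Employment {
    event_id: 'EVT-002',
    event_type: 'resign',
    timestamp: datetime('2023-06-30T18:00:00')
})
CREATE (resign)-[:SUBJECT]->(p)
CREATE (resign)-[:OBJECT]->(c)
CREATE (resign)-[:CAUSED_BY]->(hire)
CREATE (resign)-[:PRECEDED_BY]->(hire)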

// Query: a person's complete career timeline
MATCH (p:Person {name: 'Zhang San'})<-[:SUBJECT]-(e:Event:Employment)
OPTIONAL MATCH (e)-[:OBJECT]->(c:Company)
OPTIONAL MATCH (e)-[:HAS_ROLE]->(r:Role)
RETURN e.event_type, e.timestamp, c.name, r.title
ORDER BY e.timestamp

Option 3: Managing a Temporal Graph in Python

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class TemporalTriple:
    """A knowledge triple with temporal validity."""
    subject: str
    predicate: str
    object: str
    valid_from: datetime
    valid_to: Optional[datetime] = None
    source: str = ""
    confidence: float = 1.0

    @property
    def is_current(self) -> bool:
        return self.valid_to is None or self.valid_to > datetime.now()

    def overlaps(self, start: datetime, end: datetime) -> bool:
        """Check if this triple is valid at any time in [start, end].

        Validity is treated as half-open: [valid_from, valid_to).
        """
        triple_end = self.valid_to or datetime.max
        return self.valid_from <= end and triple_end > start


class TemporalKnowledgeGraph:
    """In-memory temporal knowledge graph with time-travel queries."""

    def __init__(self):
        self.triples: list[TemporalTriple] = []
        self._index_subject: dict[str, list[int]] = {}
        self._index_predicate: dict[str, list[int]] = {}

    def add(self, triple: TemporalTriple) -> int:
        """Add a temporal triple. Auto-close previous if conflicting."""
        # Check for conflicting current triples
        if triple.is_current:
            self._close_conflicting(triple)

        idx = len(self.triples)
        self.triples.append(triple)

        # Update indexes
        self._index_subject.setdefault(triple.subject, []).append(idx)
        self._index_predicate.setdefault(triple.predicate, []).append(idx)
        return idx

    def _close_conflicting(self, new_triple: TemporalTriple):
        """Close existing triples that conflict with the new one."""
        for idx in self._index_subject.get(new_triple.subject, []):
            existing = self.triples[idx]
            if (existing.predicate == new_triple.predicate
                    and existing.is_current
                    and existing.object != new_triple.object):
                existing.valid_to = new_triple.valid_from

    def query_at(self, timestamp: datetime,
                 subject: Optional[str] = None,
                 predicate: Optional[str] = None) -> list[TemporalTriple]:
        """Time-travel query: what was true at a specific time?

        Validity is half-open, [valid_from, valid_to), so a triple closed at T
        and its successor starting at T never both match.
        """
        results = []
        candidates = range(len(self.triples))

        if subject:
            candidates = self._index_subject.get(subject, [])

        for idx in candidates:
            t = self.triples[idx]
            t_end = t.valid_to or datetime.max
            if t.valid_from <= timestamp < t_end:
                if predicate is None or t.predicate == predicate:
                    results.append(t)
        return results

    def query_range(self, start: datetime, end: datetime,
                    subject: Optional[str] = None) -> list[TemporalTriple]:
        """Range query: all triples valid during [start, end]."""
        results = []
        candidates = range(len(self.triples))
        if subject:
            candidates = self._index_subject.get(subject, [])

        for idx in candidates:
            t = self.triples[idx]
            if t.overlaps(start, end):
                results.append(t)
        return results

    def detect_changes(self, subject: str,
                       predicate: str) -> list[dict]:
        """Detect all changes for a subject-predicate pair."""
        relevant = [
            t for t in self.triples
            if t.subject == subject and t.predicate == predicate
        ]
        relevant.sort(key=lambda t: t.valid_from)

        changes = []
        for i in range(1, len(relevant)):
            prev, curr = relevant[i - 1], relevant[i]
            changes.append({
                "timestamp": curr.valid_from.isoformat(),
                "old_value": prev.object,
                "new_value": curr.object,
                "duration_days": (curr.valid_from - prev.valid_from).days,
            })
        return changes

    def timeline(self, subject: str) -> list[dict]:
        """Generate a complete timeline for an entity."""
        triples = sorted(
            [t for t in self.triples if t.subject == subject],
            key=lambda t: t.valid_from,
        )
        return [
            {
                "predicate": t.predicate,
                "object": t.object,
                "from": t.valid_from.isoformat(),
                "to": t.valid_to.isoformat() if t.valid_to else "present",
                "current": t.is_current,
            }
            for t in triples
        ]


# Usage
tkg = TemporalKnowledgeGraph()

tkg.add(TemporalTriple(
    "Zhang San", "works_at", "CompanyA",
    valid_from=datetime(2020, 1, 15),
    valid_to=datetime(2023, 6, 30),
))
tkg.add(TemporalTriple(
    "Zhang San", "works_at", "CompanyB",
    valid_from=datetime(2023, 7, 1),
))
tkg.add(TemporalTriple(
    "Zhang San", "title", "Engineer",
    valid_from=datetime(2020, 1, 15),
    valid_to=datetime(2022, 3, 1),
))
tkg.add(TemporalTriple(
    "Zhang San", "title", "Senior Engineer",
    valid_from=datetime(2022, 3, 1),
))

# Time-travel: where did Zhang San work in 2021?
results = tkg.query_at(datetime(2021, 6, 1), subject="Zhang San", predicate="works_at")
# -> [TemporalTriple(Zhang San, works_at, CompanyA, ...)]

# Change detection
changes = tkg.detect_changes("Zhang San", "works_at")
# -> [{"timestamp": "2023-07-01...", "old_value": "CompanyA", "new_value": "CompanyB", ...}]
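
`query_at` above scans every candidate triple linearly. When the history of a single (subject, predicate) pair is long, a sorted index can answer point-in-time lookups with a binary search. The sketch below is one possible approach, assuming the intervals of a pair do not overlap and are half-open, [valid_from, valid_to); `ValueHistory` is a hypothetical helper, not part of the class above.

```python
from bisect import bisect_right
from datetime import datetime
from typing import Optional


class ValueHistory:
    """History of one (subject, predicate) pair with non-overlapping intervals."""

    def __init__(self) -> None:
        self._starts: list[datetime] = []
        self._rows: list[tuple[str, Optional[datetime]]] = []  # (object, valid_to)

    def add(self, obj: str, valid_from: datetime,
            valid_to: Optional[datetime] = None) -> None:
        # Insert at the position that keeps both lists sorted by valid_from
        pos = bisect_right(self._starts, valid_from)
        self._starts.insert(pos, valid_from)
        self._rows.insert(pos, (obj, valid_to))

    def value_at(self, ts: datetime) -> Optional[str]:
        """Return the object valid at ts, or None if nothing was valid."""
        pos = bisect_right(self._starts, ts) - 1
        if pos < 0:
            return None  # ts is before the first interval
        obj, valid_to = self._rows[pos]
        if valid_to is not None and ts >= valid_to:
            return None  # ts falls in a gap after this interval ended
        return obj


history = ValueHistory()
history.add("CompanyA", datetime(2020, 1, 15), datetime(2023, 6, 30))
history.add("CompanyB", datetime(2023, 7, 1))

print(history.value_at(datetime(2021, 6, 1)))   # CompanyA
print(history.value_at(datetime(2023, 6, 30)))  # None (gap between the two jobs)
print(history.value_at(datetime(2024, 1, 1)))   # CompanyB
```

Lookups cost O(log n) per pair; the trade-off is that inserts into the sorted lists cost O(n), which is usually acceptable for append-mostly histories.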

Time-Aware Embeddings

Comparison of Temporal Knowledge Graph Embedding Methods

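| Method | Time modeling | Base model | Core idea | Typical use |
|---|---|---|---|---|
| TTransE | Additive time vector | TransE | h + r + t_vec ≈ tail | Simple temporal relations |
| HyTE | Projection onto time-specific hyperplanes | TransE | Project entities and relations onto the hyperplane of each timestamp | Relations valid over time intervals |
| DE-SimplE | Time-parameterized embedding functions | SimplE | Embeddings vary continuously with time | Smooth evolution |
| TNTComplEx | Temporal tensor decomposition | ComplEx | Order-4 tensor decomposition | Complex temporal patterns |
| TeLM | Linear temporal regularizer | Multivector embeddings | Order-4 tensor factorization with a linear regularizer on time embeddings | Large-scale temporal KGs |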

Implementing Time-Aware Embeddings

import numpy as np

class TemporalTransE:
    """Time-aware TransE embedding model.

    Score function: ||h + r + t_time - tail||
    where t_time is a learned time embedding.
    """

    def __init__(self, n_entities: int, n_relations: int,
                 n_timestamps: int, dim: int = 128):
        self.dim = dim
        # Entity, relation, and time embeddings
        self.entity_emb = np.random.randn(n_entities, dim) * 0.1
        self.relation_emb = np.random.randn(n_relations, dim) * 0.1
        self.time_emb = np.random.randn(n_timestamps, dim) * 0.1

    def score(self, head: int, relation: int,
              tail: int, timestamp: int) -> float:
        """Compute plausibility score for a temporal triple."""
        h = self.entity_emb[head]
        r = self.relation_emb[relation]
        t = self.entity_emb[tail]
        tau = self.time_emb[timestamp]

        # h + r + tau should be close to t
        return -float(np.linalg.norm(h + r + tau - t))

    def predict_tail(self, head: int, relation: int,
                     timestamp: int, top_k: int = 10) -> list[tuple[int, float]]:
        """Predict most likely tail entities at a given time."""
        h = self.entity_emb[head]
        r = self.relation_emb[relation]
        tau = self.time_emb[timestamp]

        query = h + r + tau
        distances = np.linalg.norm(self.entity_emb - query, axis=1)
        top_indices = np.argsort(distances)[:top_k]

        return [(int(idx), -float(distances[idx])) for idx in top_indices]

    def predict_time(self, head: int, relation: int,
                     tail: int, top_k: int = 5) -> list[tuple[int, float]]:
        """Predict most likely timestamps for a triple."""
        h = self.entity_emb[head]
        r = self.relation_emb[relation]
        t = self.entity_emb[tail]

        target = t - h - r  # tau should be close to this
        distances = np.linalg.norm(self.time_emb - target, axis=1)
        top_indices = np.argsort(distances)[:top_k]

        return [(int(idx), -float(distances[idx])) for idx in top_indices]


class HyTE:
    """Hyperplane-based Temporally-aware Embedding.

    Projects entities onto time-specific hyperplanes before scoring.
    """

    def __init__(self, n_entities: int, n_relations: int,
                 n_timestamps: int, dim: int = 128):
        self.dim = dim
        self.entity_emb = np.random.randn(n_entities, dim) * 0.1
        self.relation_emb = np.random.randn(n_relations, dim) * 0.1
        # Normal vectors for time-specific hyperplanes
        self.time_normal = np.random.randn(n_timestamps, dim)
        # Normalize
        norms = np.linalg.norm(self.time_normal, axis=1, keepdims=True)
        self.time_normal = self.time_normal / (norms + 1e-8)

    def _project(self, emb: np.ndarray, normal: np.ndarray) -> np.ndarray:
        """Project embedding onto the hyperplane defined by normal."""
        return emb - np.dot(emb, normal) * normal

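    def score(self, head: int, relation: int,
              tail: int, timestamp: int) -> float:
        """Score a quadruple on the hyperplane of its timestamp."""
        h = self.entity_emb[head]
        r = self.relation_emb[relation]
        t = self.entity_emb[tail]
        n = self.time_normal[timestamp]

        # HyTE projects head, relation, and tail onto the same hyperplane
        h_proj = self._project(h, n)
        r_proj = self._project(r, n)
        t_proj = self._project(t, n)

        return -float(np.linalg.norm(h_proj + r_proj - t_proj))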
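
The classes above only score and rank; they do not show how the embeddings are learned. The following is a minimal sketch of one common way to train a TTransE-style model, assuming a margin ranking loss with a corrupted tail as the negative sample. The learning rate, margin, and toy sizes are illustrative assumptions, and `hinge_loss` and `train_step` are hypothetical helper names.

```python
import numpy as np

rng = np.random.default_rng(0)
n_entities, n_relations, n_timestamps, dim = 5, 2, 3, 16
E = rng.normal(scale=0.1, size=(n_entities, dim))    # entity embeddings
R = rng.normal(scale=0.1, size=(n_relations, dim))   # relation embeddings
T = rng.normal(scale=0.1, size=(n_timestamps, dim))  # time embeddings


def distance(h: int, r: int, t: int, tau: int) -> float:
    """TTransE distance ||h + r + tau - t||; lower means more plausible."""
    return float(np.linalg.norm(E[h] + R[r] + T[tau] - E[t]))


def hinge_loss(pos: tuple[int, int, int, int], t_neg: int,
               margin: float = 1.0) -> float:
    """max(0, margin + d(positive) - d(negative)) for a corrupted tail."""
    h, r, t, tau = pos
    return max(0.0, margin + distance(h, r, t, tau) - distance(h, r, t_neg, tau))


def train_step(pos: tuple[int, int, int, int], t_neg: int,
               lr: float = 0.05, margin: float = 1.0) -> None:
    """One SGD step on the hinge loss, updating the embeddings in place."""
    h, r, t, tau = pos
    if hinge_loss(pos, t_neg, margin) == 0.0:
        return  # margin already satisfied, so the gradient is zero
    diff_pos = E[h] + R[r] + T[tau] - E[t]
    diff_neg = E[h] + R[r] + T[tau] - E[t_neg]
    # The gradient of ||x|| with respect to x is x / ||x||
    g_pos = diff_pos / (np.linalg.norm(diff_pos) + 1e-8)
    g_neg = diff_neg / (np.linalg.norm(diff_neg) + 1e-8)
    shared = g_pos - g_neg  # h, r, and tau appear in both distance terms
    E[h] -= lr * shared
    R[r] -= lr * shared
    T[tau] -= lr * shared
    E[t] += lr * g_pos      # pull the true tail toward h + r + tau
    E[t_neg] -= lr * g_neg  # push the corrupted tail away


pos = (0, 0, 1, 2)  # (head, relation, tail, timestamp)
t_neg = 3           # corrupted tail; normally sampled at random
loss_before = hinge_loss(pos, t_neg)
for _ in range(10):
    train_step(pos, t_neg)
loss_after = hinge_loss(pos, t_neg)
```

A practical trainer would sample negatives at random, process mini-batches, and renormalize entity vectors; an autodiff framework would replace the hand-written gradients.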

Event Graphs and Causal Reasoning

Event Graph Architecture

Event Knowledge Graph

                         ┌─────────────────┐
                         │   Event Layer   │
                         └────────┬────────┘
                                  │
        ┌─────────────────────────┼─────────────────────────┐
        │                         │                         │
┌───────▼────────┐        ┌───────▼────────┐        ┌───────▼────────┐
│    Event A     │─causes─▶    Event B     │─causes─▶    Event C     │
│ Product launch │        │ Market reaction│        │ Stock movement │
│   t=2024Q1     │        │   t=2024Q2     │        │   t=2024Q2     │
└───────┬────────┘        └───────┬────────┘        └───────┬────────┘
        │                         │                         │
┌───────▼────────┐        ┌───────▼────────┐        ┌───────▼────────┐
│  Entity Layer  │        │  Entity Layer  │        │  Entity Layer  │
│ Company/Product│        │  User/Review   │        │ Company/Stock  │
└────────────────┘        └────────────────┘        └────────────────┘

Event Graph Implementation

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class EventRelation(Enum):
    CAUSES = "causes"
    PRECEDES = "precedes"
    ENABLES = "enables"
    PREVENTS = "prevents"
    CORRELATES = "correlates"

@dataclass
class Event:
    event_id: str
    event_type: str
    timestamp: datetime
    description: str
    entities: list[str] = field(default_factory=list)
    properties: dict = field(default_factory=dict)
    confidence: float = 1.0

@dataclass
class EventLink:
    source_event: str
    target_event: str
    relation: EventRelation
    confidence: float = 1.0
    evidence: str = ""

class EventGraph:
    """Event-centric temporal knowledge graph."""

    def __init__(self):
        self.events: dict[str, Event] = {}
        self.links: list[EventLink] = []
        self._entity_events: dict[str, list[str]] = {}

    def add_event(self, event: Event):
        self.events[event.event_id] = event
        for entity in event.entities:
            self._entity_events.setdefault(entity, []).append(event.event_id)

    def add_link(self, link: EventLink):
        self.links.append(link)

    def causal_chain(self, event_id: str,
                     max_depth: int = 5) -> list[list[str]]:
        """Find all causal chains starting from an event."""
        chains = []
        self._dfs_causal(event_id, [event_id], chains, max_depth)
        return chains

    def _dfs_causal(self, current: str, path: list[str],
                    chains: list, max_depth: int):
        # At the depth limit, keep the truncated chain instead of dropping it
        if len(path) >= max_depth:
            if len(path) > 1:
                chains.append(path)
            return
        found_next = False
        for link in self.links:
            if (link.source_event == current
                    and link.relation == EventRelation.CAUSES
                    and link.target_event not in path):
                found_next = True
                new_path = path + [link.target_event]
                self._dfs_causal(link.target_event, new_path, chains, max_depth)
        if not found_next and len(path) > 1:
            chains.append(path)

    def entity_timeline(self, entity: str) -> list[Event]:
        """Get all events involving an entity, sorted by time."""
        event_ids = self._entity_events.get(entity, [])
        events = [self.events[eid] for eid in event_ids if eid in self.events]
        return sorted(events, key=lambda e: e.timestamp)

    def detect_patterns(self, event_type_sequence: list[str],
                        entity: str = None) -> list[list[Event]]:
        """Find event-type sequences (e.g., A->B->C) among consecutive timeline events."""
        if entity:
            timeline = self.entity_timeline(entity)
        else:
            timeline = sorted(self.events.values(), key=lambda e: e.timestamp)

        matches = []
        seq_len = len(event_type_sequence)

        for i in range(len(timeline) - seq_len + 1):
            window = timeline[i:i + seq_len]
            if all(w.event_type == s for w, s in zip(window, event_type_sequence)):
                matches.append(window)

        return matches
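
Causal links are only meaningful if they respect time order. The sketch below shows one possible sanity check, assuming that a cause may share a timestamp with its effect (for example at quarter granularity) but may never occur later. `invalid_causal_links` is a hypothetical helper that works on plain dictionaries and tuples rather than the `EventGraph` class.

```python
from datetime import datetime


def invalid_causal_links(timestamps: dict[str, datetime],
                         causal_links: list[tuple[str, str]]) -> list[tuple[str, str]]:
    """Return the (cause, effect) pairs whose cause occurs after its effect."""
    return [
        (cause, effect)
        for cause, effect in causal_links
        if timestamps[cause] > timestamps[effect]
    ]


timestamps = {
    "A": datetime(2024, 1, 10),  # product launch
    "B": datetime(2024, 4, 5),   # market reaction
    "C": datetime(2024, 4, 20),  # stock movement
}
links = [("A", "B"), ("B", "C"), ("C", "A")]

bad = invalid_causal_links(timestamps, links)
print(bad)  # [('C', 'A')]
```

Running such a check when links are added keeps `causal_chain` from traversing edges that contradict the timeline.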

Change Detection and Snapshot Comparison

Computing Graph Diffs

class TemporalDiff:
    """Compute differences between knowledge graph snapshots."""

    @staticmethod
    def compute_diff(snapshot_old: set[tuple], snapshot_new: set[tuple]) -> dict:
        """Compare two graph snapshots (sets of (s, p, o) triples).

        Returns added, removed, and unchanged triples.
        """
        added = snapshot_new - snapshot_old
        removed = snapshot_old - snapshot_new
        unchanged = snapshot_old & snapshot_new

        return {
            "added": list(added),
            "removed": list(removed),
            "unchanged_count": len(unchanged),
            "change_rate": len(added | removed) / max(len(snapshot_old | snapshot_new), 1),
            "summary": {
                "added_count": len(added),
                "removed_count": len(removed),
                "net_change": len(added) - len(removed),
            },
        }

    @staticmethod
    def entity_change_report(diffs: list[dict],
                              timestamps: list[str]) -> dict:
        """Generate per-entity change report over time."""
        entity_changes: dict[str, list] = {}

        for diff, ts in zip(diffs, timestamps):
            for s, p, o in diff["added"]:
                entity_changes.setdefault(s, []).append({
                    "time": ts, "type": "added", "triple": (s, p, o)
                })
            for s, p, o in diff["removed"]:
                entity_changes.setdefault(s, []).append({
                    "time": ts, "type": "removed", "triple": (s, p, o)
                })

        # Rank by change frequency
        ranked = sorted(
            entity_changes.items(),
            key=lambda x: len(x[1]),
            reverse=True,
        )
        return {
            "most_changed_entities": [
                {"entity": e, "change_count": len(c), "changes": c[:5]}
                for e, c in ranked[:20]
            ],
            "total_entities_changed": len(entity_changes),
        }
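
`compute_diff` expects static snapshots, so temporal facts first have to be materialized at two points in time. The following is a minimal sketch of that step, assuming facts are stored as plain tuples with half-open validity, [valid_from, valid_to); `snapshot_at` is a hypothetical helper.

```python
from datetime import datetime
from typing import Optional

# (subject, predicate, object, valid_from, valid_to)
Quad = tuple[str, str, str, datetime, Optional[datetime]]


def snapshot_at(quads: list[Quad], ts: datetime) -> set[tuple[str, str, str]]:
    """Materialize the static (s, p, o) triples that are valid at ts."""
    return {
        (s, p, o)
        for s, p, o, start, end in quads
        if start <= ts and (end is None or ts < end)
    }


quads: list[Quad] = [
    ("Zhang San", "works_at", "CompanyA", datetime(2020, 1, 15), datetime(2023, 6, 30)),
    ("Zhang San", "works_at", "CompanyB", datetime(2023, 7, 1), None),
    ("Zhang San", "title", "Engineer", datetime(2020, 1, 15), datetime(2022, 3, 1)),
    ("Zhang San", "title", "Senior Engineer", datetime(2022, 3, 1), None),
]

old = snapshot_at(quads, datetime(2021, 1, 1))
new = snapshot_at(quads, datetime(2024, 1, 1))
added = new - old    # CompanyB and Senior Engineer
removed = old - new  # CompanyA and Engineer
```

The two sets can be passed directly to a snapshot diff such as the one above.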

Temporal Reasoning

Temporal Rules and Prediction

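| Reasoning type | Example | Method |
|---|---|---|
| Temporal constraint reasoning | A person born in year X cannot have worked in year X-1 | Constraint propagation |
| Duration reasoning | A relation typically lasts N years | Statistical distributions |
| Periodicity reasoning | Earnings reports are published every quarter | Periodicity detection |
| Causal reasoning | Event A usually leads to event B | Causal graph mining |
| Trend prediction | Predict future changes from historical ones | Time series + KGE |

import numpy as np

class TemporalReasoner: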
    """Rule-based temporal reasoning over TKG."""

    def __init__(self, tkg: TemporalKnowledgeGraph):
        self.tkg = tkg

    def check_temporal_consistency(self) -> list[dict]:
        """Find temporally inconsistent triples."""
        violations = []

        # Rule: birth_date must precede all other events
        for t in self.tkg.triples:
            if t.predicate == "born_on":
                birth_time = t.valid_from
                for other in self.tkg.triples:
                    if other.subject == t.subject and other != t:
                        if other.valid_from < birth_time:
                            violations.append({
                                "type": "pre_birth_event",
                                "entity": t.subject,
                                "birth": birth_time.isoformat(),
                                "event": other.predicate,
                                "event_time": other.valid_from.isoformat(),
                            })
        return violations

    def predict_duration(self, predicate: str) -> dict:
        """Predict typical duration for a relation type."""
        durations = []
        for t in self.tkg.triples:
            if t.predicate == predicate and t.valid_to:
                days = (t.valid_to - t.valid_from).days
                durations.append(days)

        if not durations:
            return {"predicate": predicate, "data_points": 0}

        arr = np.array(durations)
        return {
            "predicate": predicate,
            "data_points": len(durations),
            "mean_days": float(arr.mean()),
            "median_days": float(np.median(arr)),
            "std_days": float(arr.std()),
            "p25_days": float(np.percentile(arr, 25)),
            "p75_days": float(np.percentile(arr, 75)),
        }

    def find_periodic_patterns(self, subject: str,
                                predicate: str,
                                tolerance_days: int = 30) -> dict:
        """Detect if events occur periodically."""
        triples = sorted(
            [t for t in self.tkg.triples
             if t.subject == subject and t.predicate == predicate],
            key=lambda t: t.valid_from,
        )

        if len(triples) < 3:
            return {"periodic": False, "reason": "insufficient data"}

        intervals = []
        for i in range(1, len(triples)):
            delta = (triples[i].valid_from - triples[i - 1].valid_from).days
            intervals.append(delta)

        arr = np.array(intervals)
        mean_interval = float(arr.mean())
        std_interval = float(arr.std())

        is_periodic = std_interval < tolerance_days

        return {
            "periodic": is_periodic,
            "mean_interval_days": mean_interval,
            "std_interval_days": std_interval,
            "next_expected": (
                triples[-1].valid_from.isoformat()
                + f" + ~{int(mean_interval)} days"
            ) if is_periodic else None,
            "data_points": len(intervals),
        }
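
Once a series is judged periodic, the next occurrence can be returned as an actual date rather than a description. The sketch below is one way to do that, assuming the same criterion as above (population standard deviation of the gaps below a tolerance); `predict_next` is a hypothetical helper that takes a plain list of datetimes.

```python
from datetime import datetime, timedelta
from statistics import mean, pstdev
from typing import Optional


def predict_next(occurrences: list[datetime],
                 tolerance_days: float = 30.0) -> Optional[datetime]:
    """Return the expected next occurrence, or None if the series is not periodic."""
    times = sorted(occurrences)
    if len(times) < 3:
        return None  # too few points to estimate a period
    gaps = [(b - a).days for a, b in zip(times, times[1:])]
    if pstdev(gaps) >= tolerance_days:
        return None  # gaps vary too much to call the series periodic
    return times[-1] + timedelta(days=mean(gaps))


reports = [
    datetime(2023, 1, 31),
    datetime(2023, 4, 30),
    datetime(2023, 7, 31),
    datetime(2023, 10, 31),
]
print(predict_next(reports))  # 2024-01-30 00:00:00 (gaps of 89, 92, 92 days)

irregular = [datetime(2023, 1, 1), datetime(2023, 1, 5), datetime(2023, 12, 1)]
print(predict_next(irregular))  # None
```

Averaging gaps in days drifts against calendar-aligned schedules, so month-based arithmetic may be preferable for quarterly or monthly events.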

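Introducing the Graphiti Framework

Graphiti is an open-source temporal knowledge graph framework from the Zep team, designed for the long-term memory of AI agents. Its core idea is to turn an agent's conversation history and external events into a continuously updated temporal knowledge graph.

Graphiti Architecture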

Episode (conversation / event)
          │
          ▼
┌────────────────────┐
│  Entity Extractor  │  LLM extracts entities
└─────────┬──────────┘
          │
          ▼
┌────────────────────┐
│ Relation Extractor │  LLM extracts relations
└─────────┬──────────┘
          │
          ▼
┌────────────────────┐
│ Temporal Resolver  │  time normalization + conflict resolution
└─────────┬──────────┘
          │
          ▼
┌────────────────────┐
│   Neo4j Storage    │  graph storage + vector index
└────────────────────┘

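Features:
- Incremental updates: the graph is updated right after each conversation
- Conflict resolution: new information automatically overrides or supplements old information
- Dual retrieval: structured graph queries plus vector semantic search
- Agent-native: serves directly as an agent's memory backend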
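
The dual-retrieval idea can be illustrated without any framework. The sketch below is not Graphiti's API; it is a toy illustration that assumes facts carry a validity interval and a precomputed embedding, filters by time first, and then ranks the survivors by cosine similarity. The vectors and the `hybrid_search` name are made up for this example.

```python
from datetime import datetime

import numpy as np

# (text, valid_from, valid_to, embedding); embeddings are toy 3-d vectors
facts = [
    ("Zhang San works at CompanyA", datetime(2020, 1, 15), datetime(2023, 6, 30),
     np.array([1.0, 0.0, 0.0])),
    ("Zhang San works at CompanyB", datetime(2023, 7, 1), None,
     np.array([0.9, 0.1, 0.0])),
    ("Zhang San likes hiking", datetime(2019, 1, 1), None,
     np.array([0.0, 0.0, 1.0])),
]


def hybrid_search(query_vec: np.ndarray, at: datetime, top_k: int = 1) -> list[str]:
    """Filter by temporal validity, then rank the remaining facts by cosine similarity."""
    scored = []
    for text, start, end, vec in facts:
        if not (start <= at and (end is None or at < end)):
            continue  # structured filter: the fact was not valid at this time
        cos = float(vec @ query_vec / (np.linalg.norm(vec) * np.linalg.norm(query_vec)))
        scored.append((cos, text))
    scored.sort(reverse=True)
    return [text for _, text in scored[:top_k]]


query = np.array([1.0, 0.0, 0.0])  # stands in for an embedded "Where does Zhang San work?"
print(hybrid_search(query, datetime(2024, 1, 1)))  # ['Zhang San works at CompanyB']
print(hybrid_search(query, datetime(2021, 1, 1)))  # ['Zhang San works at CompanyA']
```

Applying the temporal filter before the vector ranking keeps outdated facts out of the results even when they are semantically closest to the query.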

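Conclusion

Temporal knowledge graphs promote time from an attribute to a first-class citizen, so that the graph can model the dynamics of the real world. In practice, the choice of temporal model depends on business needs: time intervals are enough for simple validity-period management, while complex causal analysis calls for a full event graph. For storage, Neo4j's relationship-property pattern suits medium-scale scenarios, and the event node pattern (reification) suits scenarios that need rich event metadata. Temporal embedding methods are developing quickly; TTransE and HyTE are good starting points, and temporal reasoning combined with LLMs represents the frontier of the field.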


Maurice | maurice_wen@proton.me