Temporal Knowledge Graphs: Modeling Dynamic Relations
Engineering practice for temporal triples, time-aware embeddings, event graphs, change detection, and temporal reasoning
Introduction
A traditional knowledge graph is static: it can record "Zhang San works at CompanyA," but it cannot express "Zhang San worked at CompanyA from 2020 to 2023, then moved to CompanyB." Real-world knowledge is inherently temporal: relations begin and end, attributes change, and events have ordering and causality. A Temporal Knowledge Graph (TKG) introduces the time dimension so the graph can model a changing world. This article walks through TKG data models, storage schemes, embedding methods, and reasoning capabilities.
Temporal Data Models
From Static Triples to Temporal Triples
Static vs. temporal knowledge representation
Static triple:
(Zhang San, works_at, CompanyA)  # no time information
Temporal triple (quadruple):
(Zhang San, works_at, CompanyA, [2020-01, 2023-06])  # time interval
(Zhang San, works_at, CompanyB, [2023-07, now])      # currently valid
Temporal quintuple:
(Zhang San, works_at, CompanyA, 2020-01, 2023-06)    # separate start + end
Event-reified triples:
Event_001: {
  subject: Zhang San,
  predicate: hire,
  object: CompanyA,
  timestamp: 2020-01-15,
  type: point_event
}
Event_002: {
  subject: Zhang San,
  predicate: resign,
  object: CompanyA,
  timestamp: 2023-06-30,
  type: point_event,
  caused_by: Event_003
}
Event_003: {
  subject: Zhang San,
  predicate: hire,
  object: CompanyB,
  timestamp: 2023-07-01,
  type: point_event
}
Classification of time models

| Time model | Representation | Typical scenarios | Complexity |
|---|---|---|---|
| Point | (s, p, o, t) | Event occurrences, transaction records | Low |
| Interval | (s, p, o, [t_start, t_end]) | Employment, contract validity periods | Medium |
| Event graph | Event{subject, predicate, object, time, metadata} | Causal reasoning, process modeling | High |
| Versioned | (s, p, o, version_id) | Tracking knowledge evolution | Medium |
| Recurring | (s, p, o, cron_expr) | Periodic events (e.g. quarterly earnings) | High |
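The interval and event-graph rows of this table are interconvertible: an interval fact can be rewritten as a pair of point events. A minimal sketch of that conversion (field names here are illustrative, not a standard):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class IntervalFact:
    """Interval model: (s, p, o, [t_start, t_end])."""
    subject: str
    predicate: str
    object: str
    t_start: str
    t_end: Optional[str] = None  # None means "still valid"

def to_point_events(fact: IntervalFact) -> list[dict]:
    """Rewrite one interval fact as start/end point events (event-graph model)."""
    events = [{
        "subject": fact.subject,
        "predicate": f"{fact.predicate}_start",
        "object": fact.object,
        "timestamp": fact.t_start,
        "type": "point_event",
    }]
    if fact.t_end is not None:  # open intervals yield only a start event
        events.append({
            "subject": fact.subject,
            "predicate": f"{fact.predicate}_end",
            "object": fact.object,
            "timestamp": fact.t_end,
            "type": "point_event",
        })
    return events

events = to_point_events(
    IntervalFact("Zhang San", "works_at", "CompanyA", "2020-01", "2023-06")
)
# -> a works_at_start event at 2020-01 and a works_at_end event at 2023-06
```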
Storage Schemes
Scheme 1: Native time properties in a graph database
// Neo4j: attach temporal properties to the relationship
CREATE (p:Person {name: 'Zhang San'})
CREATE (c:Company {name: 'TechCorp'})
CREATE (p)-[:WORKS_AT {
start_date: date('2020-01-15'),
end_date: date('2023-06-30'),
role: 'Engineer',
is_current: false
}]->(c)
// Query: where did a person work at a given point in time?
MATCH (p:Person {name: 'Zhang San'})-[r:WORKS_AT]->(c:Company)
WHERE r.start_date <= date('2022-06-01')
AND (r.end_date IS NULL OR r.end_date >= date('2022-06-01'))
RETURN c.name, r.role, r.start_date, r.end_date
// Query: all employee changes at a company within a time window
MATCH (p:Person)-[r:WORKS_AT]->(c:Company {name: 'TechCorp'})
WHERE r.start_date >= date('2023-01-01')
OR r.end_date >= date('2023-01-01')
RETURN p.name,
r.start_date,
r.end_date,
CASE WHEN r.end_date IS NOT NULL THEN 'departed' ELSE 'active' END AS status
ORDER BY r.start_date
Scheme 2: Event-node pattern (reification)
// Promote the relationship to an event node to support richer temporal metadata
CREATE (e:Event:Employment {
event_id: 'EVT-001',
event_type: 'hire',
timestamp: datetime('2020-01-15T09:00:00'),
source: 'HR_system',
confidence: 0.99
})
CREATE (p:Person {name: 'Zhang San'})
CREATE (c:Company {name: 'TechCorp'})
CREATE (e)-[:SUBJECT]->(p)
CREATE (e)-[:OBJECT]->(c)
CREATE (e)-[:HAS_ROLE]->(:Role {title: 'Engineer', level: 'L5'})
// Event causal chain
MATCH (hire:Event {event_id: 'EVT-001'})
CREATE (resign:Event:Employment {
event_id: 'EVT-002',
event_type: 'resign',
timestamp: datetime('2023-06-30T18:00:00')
})
CREATE (resign)-[:CAUSED_BY]->(hire)
CREATE (resign)-[:PRECEDED_BY]->(hire)
// Query: a person's complete career timeline
MATCH (p:Person {name: 'Zhang San'})<-[:SUBJECT]-(e:Event:Employment)
OPTIONAL MATCH (e)-[:OBJECT]->(c:Company)
OPTIONAL MATCH (e)-[:HAS_ROLE]->(r:Role)
RETURN e.event_type, e.timestamp, c.name, r.title
ORDER BY e.timestamp
Scheme 3: Managing a temporal graph in Python
from dataclasses import dataclass, field
from datetime import datetime, date
from typing import Optional
@dataclass
class TemporalTriple:
"""A knowledge triple with temporal validity."""
subject: str
predicate: str
object: str
valid_from: datetime
valid_to: Optional[datetime] = None
source: str = ""
confidence: float = 1.0
@property
def is_current(self) -> bool:
return self.valid_to is None or self.valid_to > datetime.now()
def overlaps(self, start: datetime, end: datetime) -> bool:
"""Check if this triple is valid during [start, end]."""
triple_end = self.valid_to or datetime.max
return self.valid_from <= end and triple_end >= start
class TemporalKnowledgeGraph:
"""In-memory temporal knowledge graph with time-travel queries."""
def __init__(self):
self.triples: list[TemporalTriple] = []
self._index_subject: dict[str, list[int]] = {}
self._index_predicate: dict[str, list[int]] = {}
def add(self, triple: TemporalTriple) -> int:
"""Add a temporal triple. Auto-close previous if conflicting."""
# Check for conflicting current triples
if triple.is_current:
self._close_conflicting(triple)
idx = len(self.triples)
self.triples.append(triple)
# Update indexes
self._index_subject.setdefault(triple.subject, []).append(idx)
self._index_predicate.setdefault(triple.predicate, []).append(idx)
return idx
def _close_conflicting(self, new_triple: TemporalTriple):
"""Close existing triples that conflict with the new one."""
for idx in self._index_subject.get(new_triple.subject, []):
existing = self.triples[idx]
if (existing.predicate == new_triple.predicate
and existing.is_current
and existing.object != new_triple.object):
existing.valid_to = new_triple.valid_from
def query_at(self, timestamp: datetime,
subject: str = None,
predicate: str = None) -> list[TemporalTriple]:
"""Time-travel query: what was true at a specific time?"""
results = []
candidates = range(len(self.triples))
if subject:
candidates = self._index_subject.get(subject, [])
for idx in candidates:
t = self.triples[idx]
t_end = t.valid_to or datetime.max
if t.valid_from <= timestamp <= t_end:
if predicate is None or t.predicate == predicate:
results.append(t)
return results
def query_range(self, start: datetime, end: datetime,
subject: str = None) -> list[TemporalTriple]:
"""Range query: all triples valid during [start, end]."""
results = []
candidates = range(len(self.triples))
if subject:
candidates = self._index_subject.get(subject, [])
for idx in candidates:
t = self.triples[idx]
if t.overlaps(start, end):
results.append(t)
return results
def detect_changes(self, subject: str,
predicate: str) -> list[dict]:
"""Detect all changes for a subject-predicate pair."""
relevant = [
t for t in self.triples
if t.subject == subject and t.predicate == predicate
]
relevant.sort(key=lambda t: t.valid_from)
changes = []
for i in range(1, len(relevant)):
prev, curr = relevant[i - 1], relevant[i]
changes.append({
"timestamp": curr.valid_from.isoformat(),
"old_value": prev.object,
"new_value": curr.object,
"duration_days": (curr.valid_from - prev.valid_from).days,
})
return changes
def timeline(self, subject: str) -> list[dict]:
"""Generate a complete timeline for an entity."""
triples = sorted(
[t for t in self.triples if t.subject == subject],
key=lambda t: t.valid_from,
)
return [
{
"predicate": t.predicate,
"object": t.object,
"from": t.valid_from.isoformat(),
"to": t.valid_to.isoformat() if t.valid_to else "present",
"current": t.is_current,
}
for t in triples
]
# Usage
tkg = TemporalKnowledgeGraph()
tkg.add(TemporalTriple(
"Zhang San", "works_at", "CompanyA",
valid_from=datetime(2020, 1, 15),
valid_to=datetime(2023, 6, 30),
))
tkg.add(TemporalTriple(
"Zhang San", "works_at", "CompanyB",
valid_from=datetime(2023, 7, 1),
))
tkg.add(TemporalTriple(
"Zhang San", "title", "Engineer",
valid_from=datetime(2020, 1, 15),
valid_to=datetime(2022, 3, 1),
))
tkg.add(TemporalTriple(
"Zhang San", "title", "Senior Engineer",
valid_from=datetime(2022, 3, 1),
))
# Time-travel: where did Zhang San work in 2021?
results = tkg.query_at(datetime(2021, 6, 1), subject="Zhang San", predicate="works_at")
# -> [TemporalTriple(Zhang San, works_at, CompanyA, ...)]
# Change detection
changes = tkg.detect_changes("Zhang San", "works_at")
# -> [{"timestamp": "2023-07-01...", "old_value": "CompanyA", "new_value": "CompanyB", ...}]
Time-Aware Embeddings
Comparison of TKG embedding methods

| Method | Time modeling | Base model | Core idea | Use case |
|---|---|---|---|---|
| TTransE | Additive time vector | TransE | h + r + t_vec ≈ tail | Simple temporal relations |
| HyTE | Time-hyperplane projection | TransE | Project entities onto time-specific hyperplanes | Interval-based relations |
| DE-SimplE | Time-parameterized embeddings | SimplE | Embeddings vary continuously with time | Smooth evolution |
| TNTComplEx | Temporal tensor factorization | ComplEx | Order-4 tensor decomposition | Complex temporal patterns |
| TeLM | Temporal linear transforms | LLM | Time as a linear map | Large-scale temporal KGs |
Implementing time-aware embeddings
import numpy as np
class TemporalTransE:
"""Time-aware TransE embedding model.
Score function: ||h + r + t_time - tail||
where t_time is a learned time embedding.
"""
def __init__(self, n_entities: int, n_relations: int,
n_timestamps: int, dim: int = 128):
self.dim = dim
# Entity, relation, and time embeddings
self.entity_emb = np.random.randn(n_entities, dim) * 0.1
self.relation_emb = np.random.randn(n_relations, dim) * 0.1
self.time_emb = np.random.randn(n_timestamps, dim) * 0.1
def score(self, head: int, relation: int,
tail: int, timestamp: int) -> float:
"""Compute plausibility score for a temporal triple."""
h = self.entity_emb[head]
r = self.relation_emb[relation]
t = self.entity_emb[tail]
tau = self.time_emb[timestamp]
# h + r + tau should be close to t
return -float(np.linalg.norm(h + r + tau - t))
def predict_tail(self, head: int, relation: int,
timestamp: int, top_k: int = 10) -> list[tuple[int, float]]:
"""Predict most likely tail entities at a given time."""
h = self.entity_emb[head]
r = self.relation_emb[relation]
tau = self.time_emb[timestamp]
query = h + r + tau
distances = np.linalg.norm(self.entity_emb - query, axis=1)
top_indices = np.argsort(distances)[:top_k]
return [(int(idx), -float(distances[idx])) for idx in top_indices]
def predict_time(self, head: int, relation: int,
tail: int, top_k: int = 5) -> list[tuple[int, float]]:
"""Predict most likely timestamps for a triple."""
h = self.entity_emb[head]
r = self.relation_emb[relation]
t = self.entity_emb[tail]
target = t - h - r # tau should be close to this
distances = np.linalg.norm(self.time_emb - target, axis=1)
top_indices = np.argsort(distances)[:top_k]
return [(int(idx), -float(distances[idx])) for idx in top_indices]
class HyTE:
"""Hyperplane-based Temporally-aware Embedding.
Projects entities onto time-specific hyperplanes before scoring.
"""
def __init__(self, n_entities: int, n_relations: int,
n_timestamps: int, dim: int = 128):
self.dim = dim
self.entity_emb = np.random.randn(n_entities, dim) * 0.1
self.relation_emb = np.random.randn(n_relations, dim) * 0.1
# Normal vectors for time-specific hyperplanes
self.time_normal = np.random.randn(n_timestamps, dim)
# Normalize
norms = np.linalg.norm(self.time_normal, axis=1, keepdims=True)
self.time_normal = self.time_normal / (norms + 1e-8)
def _project(self, emb: np.ndarray, normal: np.ndarray) -> np.ndarray:
"""Project embedding onto the hyperplane defined by normal."""
return emb - np.dot(emb, normal) * normal
def score(self, head: int, relation: int,
tail: int, timestamp: int) -> float:
h = self.entity_emb[head]
r = self.relation_emb[relation]
t = self.entity_emb[tail]
n = self.time_normal[timestamp]
h_proj = self._project(h, n)
t_proj = self._project(t, n)
return -float(np.linalg.norm(h_proj + r - t_proj))
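A self-contained sanity check of the TTransE scoring idea, using raw arrays instead of the class above (random, untrained embeddings, so the ranking itself is meaningless — this only verifies the mechanics):

```python
import numpy as np

rng = np.random.default_rng(0)
dim = 16
E = rng.normal(scale=0.1, size=(50, dim))   # entity embeddings
R = rng.normal(scale=0.1, size=(5, dim))    # relation embeddings
T = rng.normal(scale=0.1, size=(12, dim))   # time embeddings (e.g. one per month)

def ttranse_score(h: int, r: int, t: int, tau: int) -> float:
    """TTransE score: -||h + r + tau - t||; higher = more plausible."""
    return -float(np.linalg.norm(E[h] + R[r] + T[tau] - E[t]))

# Rank all entities as tail candidates for (head=0, relation=1) at month 3.
query = E[0] + R[1] + T[3]
distances = np.linalg.norm(E - query, axis=1)
best_tail = int(np.argmin(distances))
# best_tail maximizes ttranse_score(0, 1, ., 3) by construction
```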
Event Graphs and Causal Reasoning
Event graph architecture
Event Knowledge Graph
                 ┌─────────────────┐
                 │   Event Layer   │
                 │  (event nodes)  │
                 └────────┬────────┘
                          │
      ┌───────────────────┼───────────────────┐
      │                   │                   │
 ┌────▼─────┐        ┌────▼─────┐        ┌────▼─────┐
 │ Event A  │─causes─▶ Event B  │─causes─▶ Event C  │
 │ product  │        │ market   │        │ stock    │
 │ launch   │        │ reaction │        │ move     │
 │ t=2024Q1 │        │ t=2024Q2 │        │ t=2024Q2 │
 └────┬─────┘        └────┬─────┘        └────┬─────┘
      │                   │                   │
 ┌────▼─────┐        ┌────▼─────┐        ┌────▼─────┐
 │ Entity   │        │ Entity   │        │ Entity   │
 │ Layer    │        │ Layer    │        │ Layer    │
 │ company/ │        │ users/   │        │ company/ │
 │ product  │        │ comments │        │ stock    │
 └──────────┘        └──────────┘        └──────────┘
Implementing the event graph
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class EventRelation(Enum):
CAUSES = "causes"
PRECEDES = "precedes"
ENABLES = "enables"
PREVENTS = "prevents"
CORRELATES = "correlates"
@dataclass
class Event:
event_id: str
event_type: str
timestamp: datetime
description: str
entities: list[str] = field(default_factory=list)
properties: dict = field(default_factory=dict)
confidence: float = 1.0
@dataclass
class EventLink:
source_event: str
target_event: str
relation: EventRelation
confidence: float = 1.0
evidence: str = ""
class EventGraph:
"""Event-centric temporal knowledge graph."""
def __init__(self):
self.events: dict[str, Event] = {}
self.links: list[EventLink] = []
self._entity_events: dict[str, list[str]] = {}
def add_event(self, event: Event):
self.events[event.event_id] = event
for entity in event.entities:
self._entity_events.setdefault(entity, []).append(event.event_id)
def add_link(self, link: EventLink):
self.links.append(link)
def causal_chain(self, event_id: str,
max_depth: int = 5) -> list[list[str]]:
"""Find all causal chains starting from an event."""
chains = []
self._dfs_causal(event_id, [event_id], chains, max_depth)
return chains
def _dfs_causal(self, current: str, path: list[str],
chains: list, max_depth: int):
if len(path) > max_depth:
return
found_next = False
for link in self.links:
if (link.source_event == current
and link.relation == EventRelation.CAUSES
and link.target_event not in path):
found_next = True
new_path = path + [link.target_event]
self._dfs_causal(link.target_event, new_path, chains, max_depth)
if not found_next and len(path) > 1:
chains.append(path)
def entity_timeline(self, entity: str) -> list[Event]:
"""Get all events involving an entity, sorted by time."""
event_ids = self._entity_events.get(entity, [])
events = [self.events[eid] for eid in event_ids if eid in self.events]
return sorted(events, key=lambda e: e.timestamp)
def detect_patterns(self, event_type_sequence: list[str],
entity: str = None) -> list[list[Event]]:
"""Find recurring event patterns (e.g., A->B->C sequences)."""
if entity:
timeline = self.entity_timeline(entity)
else:
timeline = sorted(self.events.values(), key=lambda e: e.timestamp)
matches = []
seq_len = len(event_type_sequence)
for i in range(len(timeline) - seq_len + 1):
window = timeline[i:i + seq_len]
if all(w.event_type == s for w, s in zip(window, event_type_sequence)):
matches.append(window)
return matches
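The traversal inside `causal_chain`/`_dfs_causal` can be checked on a toy graph; here is a standalone version of the same logic over a plain adjacency dict (the event IDs are hypothetical):

```python
def causal_chains(edges: dict[str, list[str]],
                  start: str, max_depth: int = 5) -> list[list[str]]:
    """Enumerate maximal cause->effect paths starting from `start`."""
    chains: list[list[str]] = []

    def dfs(node: str, path: list[str]) -> None:
        if len(path) > max_depth:
            return
        successors = [n for n in edges.get(node, []) if n not in path]
        if not successors:          # leaf: record the completed chain
            if len(path) > 1:
                chains.append(path)
            return
        for nxt in successors:
            dfs(nxt, path + [nxt])

    dfs(start, [start])
    return chains

# product launch -> market reaction -> stock move, plus a side branch
edges = {"launch": ["reaction", "press_coverage"], "reaction": ["stock_move"]}
chains = causal_chains(edges, "launch")
# -> [["launch", "reaction", "stock_move"], ["launch", "press_coverage"]]
```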
Change Detection and Snapshot Comparison
Computing graph diffs
class TemporalDiff:
"""Compute differences between knowledge graph snapshots."""
@staticmethod
def compute_diff(snapshot_old: set[tuple], snapshot_new: set[tuple]) -> dict:
"""Compare two graph snapshots (sets of (s, p, o) triples).
Returns added, removed, and unchanged triples.
"""
added = snapshot_new - snapshot_old
removed = snapshot_old - snapshot_new
unchanged = snapshot_old & snapshot_new
return {
"added": list(added),
"removed": list(removed),
"unchanged_count": len(unchanged),
"change_rate": len(added | removed) / max(len(snapshot_old | snapshot_new), 1),
"summary": {
"added_count": len(added),
"removed_count": len(removed),
"net_change": len(added) - len(removed),
},
}
@staticmethod
def entity_change_report(diffs: list[dict],
timestamps: list[str]) -> dict:
"""Generate per-entity change report over time."""
entity_changes: dict[str, list] = {}
for diff, ts in zip(diffs, timestamps):
for s, p, o in diff["added"]:
entity_changes.setdefault(s, []).append({
"time": ts, "type": "added", "triple": (s, p, o)
})
for s, p, o in diff["removed"]:
entity_changes.setdefault(s, []).append({
"time": ts, "type": "removed", "triple": (s, p, o)
})
# Rank by change frequency
ranked = sorted(
entity_changes.items(),
key=lambda x: len(x[1]),
reverse=True,
)
return {
"most_changed_entities": [
{"entity": e, "change_count": len(c), "changes": c[:5]}
for e, c in ranked[:20]
],
"total_entities_changed": len(entity_changes),
}
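A quick worked example of the set arithmetic behind `compute_diff`, on two toy snapshots (the triples are hypothetical):

```python
# Snapshots at t1 and t2 as sets of (s, p, o) triples.
old = {("Zhang San", "works_at", "CompanyA"),
       ("CompanyA", "located_in", "Beijing")}
new = {("Zhang San", "works_at", "CompanyB"),
       ("CompanyA", "located_in", "Beijing")}

added = new - old        # the CompanyB employment appeared
removed = old - new      # the CompanyA employment disappeared
unchanged = old & new    # the location fact persisted

# change_rate: changed triples over all triples seen in either snapshot
change_rate = len(added | removed) / max(len(old | new), 1)  # 2 / 3
```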
Temporal Reasoning
Temporal rules and prediction

| Reasoning type | Example | Method |
|---|---|---|
| Temporal constraint reasoning | Born in year X → cannot have worked in year X-1 | Constraint propagation |
| Duration reasoning | A relation typically lasts N years | Statistical distributions |
| Periodicity reasoning | Earnings reports are published every quarter | Period detection |
| Causal reasoning | Event A typically leads to event B | Causal graph mining |
| Trend prediction | Forecast the future from historical change | Time series + KGE |
import numpy as np
class TemporalReasoner:
"""Rule-based temporal reasoning over TKG."""
def __init__(self, tkg: TemporalKnowledgeGraph):
self.tkg = tkg
def check_temporal_consistency(self) -> list[dict]:
"""Find temporally inconsistent triples."""
violations = []
# Rule: birth_date must precede all other events
for t in self.tkg.triples:
if t.predicate == "born_on":
birth_time = t.valid_from
for other in self.tkg.triples:
if other.subject == t.subject and other != t:
if other.valid_from < birth_time:
violations.append({
"type": "pre_birth_event",
"entity": t.subject,
"birth": birth_time.isoformat(),
"event": other.predicate,
"event_time": other.valid_from.isoformat(),
})
return violations
def predict_duration(self, predicate: str) -> dict:
"""Predict typical duration for a relation type."""
durations = []
for t in self.tkg.triples:
if t.predicate == predicate and t.valid_to:
days = (t.valid_to - t.valid_from).days
durations.append(days)
if not durations:
return {"predicate": predicate, "data_points": 0}
arr = np.array(durations)
return {
"predicate": predicate,
"data_points": len(durations),
"mean_days": float(arr.mean()),
"median_days": float(np.median(arr)),
"std_days": float(arr.std()),
"p25_days": float(np.percentile(arr, 25)),
"p75_days": float(np.percentile(arr, 75)),
}
def find_periodic_patterns(self, subject: str,
predicate: str,
tolerance_days: int = 30) -> dict:
"""Detect if events occur periodically."""
triples = sorted(
[t for t in self.tkg.triples
if t.subject == subject and t.predicate == predicate],
key=lambda t: t.valid_from,
)
if len(triples) < 3:
return {"periodic": False, "reason": "insufficient data"}
intervals = []
for i in range(1, len(triples)):
delta = (triples[i].valid_from - triples[i - 1].valid_from).days
intervals.append(delta)
arr = np.array(intervals)
mean_interval = float(arr.mean())
std_interval = float(arr.std())
is_periodic = std_interval < tolerance_days
return {
"periodic": is_periodic,
"mean_interval_days": mean_interval,
"std_interval_days": std_interval,
"next_expected": (
triples[-1].valid_from.isoformat()
+ f" + ~{int(mean_interval)} days"
) if is_periodic else None,
"data_points": len(intervals),
}
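The periodicity check in `find_periodic_patterns` reduces to statistics over inter-event gaps. A standalone check on synthetic quarterly dates (the dates are made up for illustration):

```python
from datetime import datetime

import numpy as np

# Quarterly "earnings report" events, roughly every ~90 days.
dates = [
    datetime(2023, 1, 15),
    datetime(2023, 4, 14),
    datetime(2023, 7, 15),
    datetime(2023, 10, 13),
]

intervals = np.array([(b - a).days for a, b in zip(dates, dates[1:])])
mean_days = float(intervals.mean())
std_days = float(intervals.std())
is_periodic = std_days < 30  # same threshold as tolerance_days above
# intervals are [89, 92, 90]: tightly clustered, so the pattern is periodic
```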
The Graphiti Framework
Graphiti is an open-source temporal knowledge graph framework from the Zep team, designed as long-term memory for AI agents. Its core idea is to turn an agent's conversation history and external events into a continuously updated temporal knowledge graph.
Graphiti architecture
Episode (conversation/event)
          │
          ▼
┌────────────────────┐
│  Entity Extractor  │  LLM-based entity extraction
└─────────┬──────────┘
          │
          ▼
┌────────────────────┐
│ Relation Extractor │  LLM-based relation extraction
└─────────┬──────────┘
          │
          ▼
┌────────────────────┐
│ Temporal Resolver  │  time normalization + conflict resolution
└─────────┬──────────┘
          │
          ▼
┌────────────────────┐
│   Neo4j Storage    │  graph storage + vector index
└────────────────────┘
Key features:
- Incremental updates: the graph is updated right after each conversation
- Conflict resolution: new information automatically overrides or supplements old facts
- Dual retrieval: structured graph queries plus vector-based semantic search
- Agent-native: plugs in directly as an agent's memory backend
Conclusion
Temporal knowledge graphs promote time from a mere attribute to a first-class citizen, letting a graph model the dynamism of the real world. In practice, the right temporal model depends on the business need: simple validity tracking only needs time intervals, while deep causal analysis calls for a full event graph. For storage, Neo4j's relationship-property pattern suits mid-sized workloads, and the event-node (reification) pattern suits cases that need rich event metadata. Temporal embedding methods are evolving quickly: TTransE and HyTE are good entry points, while temporal reasoning combined with LLMs represents the frontier of this field.
Maurice | maurice_wen@proton.me