A Framework for Knowledge Graph Quality Assessment and Governance

Why Quality Governance Is Make-or-Break

A knowledge graph's value rests entirely on data quality. A graph riddled with incorrect triples is not merely useless; it actively misleads downstream applications (a RAG system generating wrong answers, a risk-control system missing real risks). An industrial-grade knowledge graph therefore needs a systematic quality assessment and continuous governance mechanism.


Quality Dimensions

The Six-Dimension Quality Model

┌─────────────────────────────────────────────────────────┐
│            Six-Dimension KG Quality Model               │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐         │
│  │Completeness│  │  Accuracy  │  │Consistency │         │
│  │            │  │            │  │            │         │
│  │How much is │  │How much is │  │Any internal│         │
│  │covered     │  │correct     │  │conflicts   │         │
│  └────────────┘  └────────────┘  └────────────┘         │
│                                                         │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐         │
│  │ Timeliness │  │ Provenance │  │ Conformity │         │
│  │            │  │            │  │            │         │
│  │Is it stale │  │Traceable to│  │Follows the │         │
│  │            │  │a source    │  │ontology    │         │
│  └────────────┘  └────────────┘  └────────────┘         │
└─────────────────────────────────────────────────────────┘

Dimension Definitions and Metrics

Dimension    | Definition                                    | Core metrics                                                    | Computation
Completeness | How well the graph covers the real world      | Schema completeness, entity coverage, attribute fill rate       | covered / expected
Accuracy     | How correct the information in the graph is   | Triple accuracy, attribute-value correctness                    | verified correct / sampled total
Consistency  | Whether the graph is internally coherent      | Type-conflict rate, contradictory-relation rate, duplicate rate | conflicts / total triples
Timeliness   | Whether the information is up to date         | Data freshness, update frequency                                | share of stale data
Provenance   | Whether facts can be traced to their sources  | Source-annotation rate, source credibility                      | share of triples with a source
Conformity   | Adherence to the ontology and naming rules    | Ontology coverage, naming-conformance rate                      | conformant entities / total entities
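Most entries in the last column reduce to "qualified count / total count". A minimal sketch of turning such ratios into a 0-100 dimension score (the function and metric names here are illustrative, not from any specific library):

```python
def ratio_metric(qualified: int, total: int) -> float:
    """Generic 'qualified / total' quality ratio; 0.0 for an empty denominator."""
    return qualified / total if total > 0 else 0.0


def dimension_score(metrics: dict) -> float:
    """Average several (qualified, total) pairs into a 0-100 dimension score."""
    if not metrics:
        return 0.0
    ratios = [ratio_metric(q, t) for q, t in metrics.values()]
    return round(sum(ratios) / len(ratios) * 100, 1)


# Example: a completeness score from three fill-rate-style metrics
score = dimension_score({
    "schema_coverage": (18, 20),      # 18 of 20 ontology types are used
    "entity_coverage": (4500, 5000),
    "attr_fill_rate": (4200, 5000),
})
```

With the counts above the score comes out as 88.0; the evaluators below produce exactly these kinds of (qualified, total) pairs.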

Completeness Evaluation

Schema Completeness

def evaluate_schema_completeness(graph_session, ontology: dict) -> dict:
    """Schema completeness: do the types/relations actually used in the graph cover the ontology?"""

    # Entity types defined by the ontology
    expected_types = set(ontology["entity_types"])
    actual_types_result = graph_session.run(
        "MATCH (n) RETURN DISTINCT labels(n) AS labels"
    )
    actual_types = set()
    for record in actual_types_result:
        actual_types.update(record["labels"])

    # Relation types defined by the ontology
    expected_rels = set(ontology["relation_types"])
    actual_rels_result = graph_session.run(
        "MATCH ()-[r]->() RETURN DISTINCT type(r) AS rel_type"
    )
    actual_rels = {record["rel_type"] for record in actual_rels_result}

    return {
        "entity_type_coverage": len(actual_types & expected_types) / len(expected_types),
        "missing_entity_types": list(expected_types - actual_types),
        "relation_type_coverage": len(actual_rels & expected_rels) / len(expected_rels),
        "missing_relation_types": list(expected_rels - actual_rels)
    }

Attribute Fill Rate

def evaluate_attribute_completeness(graph_session) -> dict:
    """Evaluate attribute fill rates"""

    # Required attributes per label
    required_attrs = {
        "Entity": ["name", "type", "description"],
        "Person": ["name", "birth_date", "nationality"],
        "Company": ["name", "founded_date", "industry", "headquarters"]
    }

    results = {}
    for label, attrs in required_attrs.items():
        for attr in attrs:
            total = graph_session.run(
                f"MATCH (n:{label}) RETURN count(n) AS cnt"
            ).single()["cnt"]

            filled = graph_session.run(
                f"MATCH (n:{label}) WHERE n.{attr} IS NOT NULL RETURN count(n) AS cnt"
            ).single()["cnt"]

            fill_rate = filled / total if total > 0 else 0
            results[f"{label}.{attr}"] = {
                "total": total,
                "filled": filled,
                "fill_rate": round(fill_rate, 4)
            }

    return results

Accuracy Evaluation

Sampling-Based Verification Framework

import json

from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment


class AccuracyEvaluator:
    """Accuracy evaluator based on stratified sampling"""

    def __init__(self, graph_session, sample_size: int = 500):
        self.session = graph_session
        self.sample_size = sample_size

    def stratified_sample(self) -> list[dict]:
        """Stratified sampling over entity types and relation types"""
        # Count triples per (subject type, relation, object type) stratum
        type_counts = self.session.run(
            """
            MATCH (s)-[r]->(o)
            RETURN labels(s)[0] AS s_type, type(r) AS rel, labels(o)[0] AS o_type,
                   count(*) AS cnt
            ORDER BY cnt DESC
            """
        ).data()

        # Allocate sample sizes proportionally to stratum size
        total = sum(tc["cnt"] for tc in type_counts)
        samples = []

        for tc in type_counts:
            stratum_size = max(1, int(self.sample_size * tc["cnt"] / total))
            stratum_samples = self.session.run(
                f"""
                MATCH (s:{tc['s_type']})-[r:{tc['rel']}]->(o:{tc['o_type']})
                RETURN s.name AS subject, type(r) AS predicate, o.name AS object,
                       rand() AS rnd
                ORDER BY rnd
                LIMIT $limit
                """,
                limit=stratum_size
            ).data()
            samples.extend(stratum_samples)

        return samples[:self.sample_size]

    def auto_verify(self, triple: dict) -> dict:
        """Automatic verification: ask an LLM whether the triple is factually correct"""
        prompt = f"""
Judge whether the following triple is factually correct:
Subject: {triple['subject']}
Relation: {triple['predicate']}
Object: {triple['object']}

Output JSON:
{{"correct": true/false, "confidence": 0.0-1.0, "reason": "justification"}}
"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        return json.loads(response.choices[0].message.content)

    def batch_evaluate(self) -> dict:
        """Batch evaluation over a stratified sample"""
        samples = self.stratified_sample()
        correct_count = 0
        results = []

        for triple in samples:
            verification = self.auto_verify(triple)
            results.append({**triple, **verification})
            if verification.get("correct"):
                correct_count += 1

        return {
            "accuracy": correct_count / len(samples),
            "sample_size": len(samples),
            "details": results
        }

Consistency Detection

Type Constraint Checks

// Find domain/range violations for a relation.
// E.g. the subject of 创立 (founded) should be a Person, the object an Organization.
MATCH (s)-[r:创立]->(o)
WHERE NOT s:Person OR NOT o:Organization
RETURN s.name, type(r), o.name,
       labels(s) AS s_labels, labels(o) AS o_labels;
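Rather than hand-writing one query per relation, the same domain/range check can be generated from a small constraint table. A sketch; the spec format and the backtick-quoting convention are assumptions:

```python
def build_domain_range_queries(constraints: dict) -> dict:
    """Build one violation-finding Cypher query per relation type.

    constraints maps relation type -> (expected subject label, expected object label).
    """
    queries = {}
    for rel, (domain, range_) in constraints.items():
        queries[rel] = (
            f"MATCH (s)-[r:`{rel}`]->(o) "
            f"WHERE NOT s:`{domain}` OR NOT o:`{range_}` "
            f"RETURN s.name, type(r), o.name, "
            f"labels(s) AS s_labels, labels(o) AS o_labels"
        )
    return queries


# One entry per relation in the ontology
queries = build_domain_range_queries({"创立": ("Person", "Organization")})
```

Each generated query can then be executed with `graph_session.run(...)` and the results fed into the consistency score.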

Contradiction Detection

def detect_contradictions(graph_session) -> list[dict]:
    """Detect contradictory triples in the graph"""
    contradictions = []

    # 1. Functional-attribute conflicts (e.g. one person cannot have two birth dates).
    #    A single node property holds only one value, so conflicts surface across
    #    duplicate nodes sharing the same name; group by name to find them.
    functional_attrs = ["birth_date", "founded_date", "ceo"]
    for attr in functional_attrs:
        result = graph_session.run(
            f"""
            MATCH (n)
            WHERE n.{attr} IS NOT NULL
            WITH n.name AS name, collect(DISTINCT n.{attr}) AS values
            WHERE size(values) > 1
            RETURN name, values
            """
        ).data()
        for r in result:
            contradictions.append({
                "type": "functional_attribute_conflict",
                "entity": r["name"],
                "attribute": attr,
                "conflicting_values": r["values"]
            })

    # 2. Mutually exclusive relations (e.g. two companies cannot be competitors
    #    and parent/subsidiary at the same time)
    mutually_exclusive = [("竞争", "隶属"), ("合作", "诉讼")]
    for rel1, rel2 in mutually_exclusive:
        result = graph_session.run(
            f"""
            MATCH (a)-[:{rel1}]->(b)
            WHERE EXISTS((a)-[:{rel2}]->(b))
            RETURN a.name, b.name
            """
        ).data()
        for r in result:
            contradictions.append({
                "type": "mutually_exclusive_relation",
                "entities": [r["a.name"], r["b.name"]],
                "conflicting_relations": [rel1, rel2]
            })

    # 3. Temporal contradictions (e.g. a company founded after its IPO date)
    result = graph_session.run(
        """
        MATCH (c:Company)
        WHERE c.founded_date IS NOT NULL AND c.ipo_date IS NOT NULL
          AND c.founded_date > c.ipo_date
        RETURN c.name, c.founded_date, c.ipo_date
        """
    ).data()
    for r in result:
        contradictions.append({
            "type": "temporal_contradiction",
            "entity": r["c.name"],
            "detail": f"founded_date {r['c.founded_date']} > ipo_date {r['c.ipo_date']}"
        })

    return contradictions

Duplicate Detection

def detect_duplicates(graph_session, similarity_threshold: float = 0.9) -> dict:
    """Detect duplicate entities"""

    # 1. Exact name duplicates
    exact_dupes = graph_session.run(
        """
        MATCH (n:Entity)
        WITH n.name AS name, collect(n) AS nodes
        WHERE size(nodes) > 1
        RETURN name, size(nodes) AS count,
               [node in nodes | id(node)] AS node_ids
        """
    ).data()

    # 2. Fuzzy name duplicates via Jaro-Winkler similarity
    #    (the pairwise MATCH is O(n^2); fine for small graphs, use blocking at scale)
    fuzzy_dupes = graph_session.run(
        """
        MATCH (a:Entity), (b:Entity)
        WHERE id(a) < id(b)
          AND a.type = b.type
          AND apoc.text.jaroWinklerDistance(a.name, b.name) > $threshold
        RETURN a.name, b.name,
               apoc.text.jaroWinklerDistance(a.name, b.name) AS similarity
        ORDER BY similarity DESC
        LIMIT 100
        """,
        threshold=similarity_threshold
    ).data()

    return {
        "exact_duplicates": exact_dupes,
        "fuzzy_duplicates": fuzzy_dupes
    }

Timeliness Management

Data Freshness Evaluation

from datetime import datetime, timedelta

def evaluate_timeliness(graph_session) -> dict:
    """Evaluate data freshness"""

    now = datetime.now()
    thresholds = {
        "fresh": timedelta(days=30),
        "acceptable": timedelta(days=90),
        "stale": timedelta(days=180)
    }

    result = graph_session.run(
        """
        MATCH (n)
        WHERE n.updated_at IS NOT NULL
        RETURN n.updated_at AS updated,
               count(*) AS cnt
        ORDER BY updated
        """
    ).data()

    categories = {"fresh": 0, "acceptable": 0, "stale": 0, "expired": 0}
    total = sum(r["cnt"] for r in result)

    for r in result:
        # The Neo4j driver returns neo4j.time.DateTime; convert before subtracting
        age = now - r["updated"].to_native()
        if age <= thresholds["fresh"]:
            categories["fresh"] += r["cnt"]
        elif age <= thresholds["acceptable"]:
            categories["acceptable"] += r["cnt"]
        elif age <= thresholds["stale"]:
            categories["stale"] += r["cnt"]
        else:
            categories["expired"] += r["cnt"]

    return {
        "total_entities": total,
        "distribution": {k: (v / total if total > 0 else 0) for k, v in categories.items()},
        "counts": categories
    }

Automatic Expiry Policy

// Flag expired entities
MATCH (n:Entity)
WHERE n.updated_at < datetime() - duration('P180D')
SET n:ExpiredEntity, n.expired_at = datetime();

// Down-weight the confidence of triples attached to expired entities
MATCH (n:ExpiredEntity)-[r]-()
SET r.confidence = r.confidence * 0.5,
    r.timeliness_penalty = true;

Provenance Management

Source Metadata Model

// Source node
(:Source {
  id: "src_001",
  type: "document",         // document / api / manual / llm_extraction
  uri: "https://example.com/doc.pdf",
  author: "Zhang San",
  publish_date: date("2025-06-01"),
  credibility_score: 0.85   // source credibility
})

// Attach sources to a triple via relationship properties
(s)-[r:RELATED_TO]->(o)
// r carries:
// r.source_ids = ["src_001", "src_002"]
// r.extraction_method = "llm_gpt4o"
// r.extracted_at = datetime()
// r.confidence = 0.92

// Or reify the statement with an explicit source relationship (more flexible)
(:TripleStatement {
  subject: "Huawei",
  predicate: "founder",
  object: "Ren Zhengfei",
  confidence: 0.95
})-[:SOURCED_FROM]->(:Source {id: "src_001"})
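Once each source carries a credibility_score, a triple confirmed by several independent sources can be scored with a noisy-OR combination. This aggregation rule is a common convention, not something the model above prescribes:

```python
def combined_credibility(source_scores: list) -> float:
    """Noisy-OR: the probability that at least one source is correct,
    assuming source errors are independent."""
    p_all_wrong = 1.0
    for score in source_scores:
        p_all_wrong *= (1.0 - score)
    return 1.0 - p_all_wrong
```

Two moderately credible sources (0.85 and 0.7) combine to 0.955, higher than either alone; a triple with no sources scores 0.0, which dovetails with the `source_annotated` admission rule below.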

Provenance Evaluation

def evaluate_provenance(graph_session) -> dict:
    """Evaluate provenance coverage"""

    # Total triple count
    total = graph_session.run(
        "MATCH ()-[r]->() RETURN count(r) AS cnt"
    ).single()["cnt"]

    # Triples carrying source annotations
    with_source = graph_session.run(
        "MATCH ()-[r]->() WHERE r.source_ids IS NOT NULL RETURN count(r) AS cnt"
    ).single()["cnt"]

    # Distribution of source types
    source_dist = graph_session.run(
        """
        MATCH (s:Source)
        RETURN s.type AS source_type, count(s) AS cnt
        ORDER BY cnt DESC
        """
    ).data()

    return {
        "provenance_rate": with_source / total if total > 0 else 0,
        "total_triples": total,
        "triples_with_source": with_source,
        "source_distribution": source_dist
    }

Governance Workflow

Continuous Quality Monitoring Pipeline

┌──────────────────────────────────────────────────────────┐
│              KG Quality Governance Workflow              │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  ┌──────────┐    ┌───────────┐    ┌─────────────┐        │
│  │ Ingestion│──→ │ Admission │──→ │ Incremental │        │
│  │ pipeline │    │ gate      │    │ eval (daily)│        │
│  └──────────┘    └───────────┘    └─────────────┘        │
│                                         │                │
│                                         ▼                │
│                                  ┌────────────┐          │
│                                  │  Quality   │          │
│                                  │  dashboard │          │
│                                  └─────┬──────┘          │
│                                        │                 │
│                         ┌──────────────┼──────────────┐  │
│                         ▼              ▼              ▼  │
│                  ┌───────────┐  ┌───────────┐  ┌───────────┐
│                  │ Auto-fix  │  │  Manual   │  │  Alerts   │
│                  │ (dedupe)  │  │  review   │  │ (anomaly) │
│                  └───────────┘  └───────────┘  └───────────┘
│                         │              │              │  │
│                         └──────────────┼──────────────┘  │
│                                        ▼                 │
│                                ┌──────────────┐          │
│                                │ Graph update │          │
│                                │ + versioning │          │
│                                └──────────────┘          │
└──────────────────────────────────────────────────────────┘
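The three-way branch in the middle of the workflow can be sketched as a simple dispatcher; the issue-type names and the severity field are illustrative assumptions:

```python
def route_issue(issue: dict) -> str:
    """Route a detected quality issue to auto-fix, manual review, or alerting."""
    auto_fixable = {"exact_duplicate", "low_confidence_triple"}
    if issue["type"] in auto_fixable:
        return "auto_fix"
    if issue.get("severity") == "critical":
        return "alert"
    # Anything ambiguous (e.g. a suspected contradiction) goes to a human
    return "manual_review"
```

In a real deployment each branch would call the corresponding repair function from the sections below (deduplication, low-confidence cleanup) or push to a review queue.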

Admission Gate (Pre-Ingestion Validation)

# Illustrative whitelist; in practice this comes from the ontology
VALID_ENTITY_TYPES = {"Person", "Company", "Organization", "Product", "Event"}


class IngestionGatekeeper:
    """Admission gate for data entering the graph"""

    RULES = [
        ("entity_name_not_empty", lambda e: bool(e.get("name", "").strip())),
        ("entity_type_valid", lambda e: e.get("type") in VALID_ENTITY_TYPES),
        ("confidence_above_threshold", lambda t: t.get("confidence", 0) >= 0.6),
        ("source_annotated", lambda t: bool(t.get("source_ids"))),
        ("no_self_reference", lambda t: t["subject"] != t["object"]),
    ]

    def validate(self, data: dict) -> dict:
        """Check whether a candidate record passes the admission gate"""
        results = []
        passed = True

        for rule_name, check_fn in self.RULES:
            try:
                ok = check_fn(data)
            except Exception:
                ok = False

            results.append({"rule": rule_name, "passed": ok})
            if not ok:
                passed = False

        return {"passed": passed, "checks": results}

Quality Dashboard

def generate_quality_dashboard(graph_session) -> dict:
    """Assemble quality-dashboard data"""
    dashboard = {
        "timestamp": datetime.now().isoformat(),
        "overview": {},
        "dimensions": {}
    }

    # Overview
    overview = graph_session.run(
        """
        MATCH (n) WITH count(n) AS nodes
        MATCH ()-[r]->() WITH nodes, count(r) AS edges
        RETURN nodes, edges
        """
    ).single()
    dashboard["overview"] = {
        "total_nodes": overview["nodes"],
        "total_edges": overview["edges"]
    }

    # Per-dimension scores (0-100 scale)
    completeness = evaluate_attribute_completeness(graph_session)
    avg_fill = sum(v["fill_rate"] for v in completeness.values()) / len(completeness)
    dashboard["dimensions"]["completeness"] = round(avg_fill * 100, 1)

    contradictions = detect_contradictions(graph_session)
    total_edges = dashboard["overview"]["total_edges"]
    contradiction_rate = len(contradictions) / total_edges if total_edges > 0 else 0
    dashboard["dimensions"]["consistency"] = round((1 - contradiction_rate) * 100, 1)

    timeliness = evaluate_timeliness(graph_session)
    fresh_rate = timeliness["distribution"].get("fresh", 0) + timeliness["distribution"].get("acceptable", 0)
    dashboard["dimensions"]["timeliness"] = round(fresh_rate * 100, 1)

    provenance = evaluate_provenance(graph_session)
    dashboard["dimensions"]["provenance"] = round(provenance["provenance_rate"] * 100, 1)

    # Overall score
    scores = list(dashboard["dimensions"].values())
    dashboard["overall_score"] = round(sum(scores) / len(scores), 1)

    return dashboard

Automated Repair Strategies

Merging Duplicate Entities

// Merge duplicate entities (keep the node with the most complete properties).
// Plain Cypher cannot set a relationship type dynamically, so this sketch
// collapses migrated edges to RELATES_TO; to preserve the original types,
// use apoc.refactor.mergeNodes([a, b], {mergeRels: true}) instead.
MATCH (a:Entity {name: $name}), (b:Entity {name: $name})
WHERE id(a) < id(b)
// Migrate b's relationships onto a
CALL {
  WITH a, b
  MATCH (b)-[r]->(target)
  MERGE (a)-[newR:RELATES_TO]->(target)
  SET newR = properties(r)
  DELETE r
}
CALL {
  WITH a, b
  MATCH (source)-[r]->(b)
  MERGE (source)-[newR:RELATES_TO]->(a)
  SET newR = properties(r)
  DELETE r
}
// Copy b's properties (except name) onto a; overlapping keys take b's value
SET a += apoc.map.removeKeys(properties(b), ['name'])
DELETE b;

Cleaning Up Low-Confidence Triples

def cleanup_low_confidence(
    graph_session,
    threshold: float = 0.3,
    dry_run: bool = True
) -> dict:
    """Remove (or preview removing) low-confidence triples"""

    # Find low-confidence triples
    low_conf = graph_session.run(
        """
        MATCH (s)-[r]->(o)
        WHERE r.confidence IS NOT NULL AND r.confidence < $threshold
        RETURN s.name, type(r), o.name, r.confidence
        ORDER BY r.confidence ASC
        """,
        threshold=threshold
    ).data()

    if not dry_run:
        graph_session.run(
            """
            MATCH ()-[r]->()
            WHERE r.confidence IS NOT NULL AND r.confidence < $threshold
            DELETE r
            """,
            threshold=threshold
        )

    return {
        "mode": "dry_run" if dry_run else "executed",
        "affected_count": len(low_conf),
        "threshold": threshold,
        "samples": low_conf[:10]
    }

Governance Maturity Model

Level | Name        | Characteristics                                | Practices
L1    | Reactive    | Fix problems only after they surface           | Manual triage, ad-hoc patches
L2    | Monitored   | Quality metrics exist but nothing is automated | Dashboard + manual inspection
L3    | Proactive   | Pre-ingestion validation + periodic evaluation | Admission gate + scheduled jobs
L4    | Automated   | Automatic detection + automatic repair         | Auto dedupe/merge/down-weighting
L5    | Self-tuning | Closed-loop feedback + model iteration         | Active learning + A/B testing
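The ladder is cumulative: each level presupposes the practices of the ones below it. A self-assessment sketch; the practice names are illustrative:

```python
def assess_maturity(practices: set) -> int:
    """Map the governance practices in place to an L1-L5 maturity level.
    Each rung requires every rung below it to be in place."""
    ladder = ["quality_dashboard", "ingestion_gate", "auto_repair", "feedback_loop"]
    level = 1
    for practice in ladder:
        if practice not in practices:
            break
        level += 1
    return level
```

A team with a dashboard and an admission gate but no automated repair lands at L3, regardless of any more advanced practices it has adopted out of order.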

Summary

Core principles of knowledge graph quality governance:

  1. Measure first: you cannot improve what you cannot measure; cover all six quality dimensions
  2. Gate at ingestion: blocking low-quality data before it enters the graph is far cheaper than repairing it afterwards
  3. Monitor continuously: refresh the quality dashboard daily and alert automatically on anomalies
  4. Prefer automated repair: do not rely on humans for governance actions that can be automated, such as duplicate detection and low-confidence cleanup
  5. Keep provenance: every triple must record its source and extraction method so problems can be traced back
  6. Version everything: record graph changes as versions so they can be rolled back

Maurice | maurice_wen@proton.me