实体消解技术详解

字符串匹配、嵌入方法、LLM消解与分块策略:大规模实体对齐工程实践

引言

实体消解(Entity Resolution, ER)——也称实体匹配、记录链接或去重——是确定两条记录是否指向同一现实世界实体的过程。在知识图谱中,同一个实体可能以不同名称、不同拼写甚至不同语言出现。如果不做消解,图谱中将充满冗余节点,查询结果将不完整甚至矛盾。本文将系统梳理实体消解的技术方案和工程实践。

问题定义

实体消解的挑战

实体消解挑战示例

同一实体的不同表述:
  "阿里巴巴集团" / "Alibaba Group" / "BABA" / "阿里" / "阿里巴巴"
  "张三丰" / "Zhang Sanfeng" / "张三风"(错别字)
  "北京大学" / "Peking University" / "PKU" / "北大"

歧义(不同实体同名):
  "苹果" → Apple Inc. vs. 水果
  "长城" → 长城汽车 vs. 万里长城
  "Python" → 编程语言 vs. 蟒蛇

需要上下文消歧:
  "他在苹果工作" → Apple Inc.
  "他买了一个苹果" → 水果

技术方案

方法一:字符串匹配

from difflib import SequenceMatcher
import re

class StringMatcher:
    """String-based entity matching methods.

    All methods are static and return a similarity score in [0.0, 1.0].
    """

    @staticmethod
    def exact_match(a: str, b: str) -> float:
        """Return 1.0 if the normalized forms are identical, else 0.0."""
        return 1.0 if StringMatcher.normalize(a) == StringMatcher.normalize(b) else 0.0

    @staticmethod
    def normalize(text: str) -> str:
        """Normalize an entity name: lowercase, strip punctuation, collapse whitespace."""
        text = text.lower().strip()
        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
        text = re.sub(r'\s+', ' ', text)      # Collapse whitespace
        return text

    @staticmethod
    def edit_distance_sim(a: str, b: str) -> float:
        """Levenshtein distance normalized to a similarity in [0, 1].

        Returns 0.0 if either string is empty. Uses a two-row DP:
        O(len(a) * len(b)) time, O(len(b)) space.
        """
        a, b = a.lower(), b.lower()
        if not a or not b:
            return 0.0
        prev = list(range(len(b) + 1))
        for i, ca in enumerate(a, start=1):
            curr = [i] + [0] * len(b)
            for j, cb in enumerate(b, start=1):
                cost = 0 if ca == cb else 1
                curr[j] = min(prev[j] + 1, curr[j - 1] + 1, prev[j - 1] + cost)
            prev = curr
        return 1.0 - prev[len(b)] / max(len(a), len(b))

    @staticmethod
    def jaccard_sim(a: str, b: str, ngram: int = 2) -> float:
        """Jaccard similarity on character n-grams.

        Strings shorter than `ngram` are treated as a single token, so
        jaccard_sim("a", "a") == 1.0. (Previously any string shorter than
        the n-gram size produced an empty n-gram set and scored 0.0 even
        against an identical string.) Empty strings still score 0.0.
        """
        def ngrams(text: str, n: int) -> set[str]:
            if 0 < len(text) < n:
                return {text}  # too short for any n-gram: use whole string
            return {text[i:i + n] for i in range(len(text) - n + 1)}

        set_a = ngrams(a.lower(), ngram)
        set_b = ngrams(b.lower(), ngram)
        if not set_a or not set_b:
            return 0.0
        return len(set_a & set_b) / len(set_a | set_b)

    @staticmethod
    def sequence_match(a: str, b: str) -> float:
        """difflib.SequenceMatcher ratio (good for partial matches)."""
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()


# Comparison: run each similarity measure over sample entity pairs.
pairs = [
    ("阿里巴巴集团", "阿里巴巴"),
    ("Alibaba Group", "alibaba"),
    ("北京大学", "Peking University"),
    ("张三丰", "张三风"),
]

for a, b in pairs:
    print(f"\n'{a}' vs '{b}':")
    for label, score_fn in (
        ("Exact:", StringMatcher.exact_match),
        ("Edit:", StringMatcher.edit_distance_sim),
        ("Jaccard:", StringMatcher.jaccard_sim),
        ("Sequence:", StringMatcher.sequence_match),
    ):
        # Left-pad labels to 10 columns to reproduce the aligned output.
        print(f"  {label:<10}{score_fn(a, b):.2f}")

方法二:嵌入方法

import numpy as np

class EmbeddingMatcher:
    """Embedding-based entity matching.

    `embed_fn` is a callable mapping a list of strings to a list of
    equal-length numeric vectors; `threshold` is the minimum combined
    cosine similarity for two entities to count as a match.
    """

    def __init__(self, embed_fn, threshold: float = 0.85):
        self.embed = embed_fn
        self.threshold = threshold

    def match(self, entity_a: str, entity_b: str,
              context_a: str = "", context_b: str = "") -> dict:
        """Match two entities by embedding similarity.

        When both contexts are supplied, the combined score is a weighted
        mix (0.4 name + 0.6 context); otherwise only the name similarity
        is used. Returns a dict with the individual similarities, the
        combined score, and an `is_match` boolean.
        """
        # Name-level similarity
        name_embs = self.embed([entity_a, entity_b])
        name_sim = self._cosine_sim(name_embs[0], name_embs[1])

        # Context-level similarity (if available)
        if context_a and context_b:
            ctx_embs = self.embed([
                f"{entity_a}: {context_a}",
                f"{entity_b}: {context_b}",
            ])
            ctx_sim = self._cosine_sim(ctx_embs[0], ctx_embs[1])
            combined = 0.4 * name_sim + 0.6 * ctx_sim
        else:
            ctx_sim = None
            combined = name_sim

        return {
            "name_similarity": float(name_sim),
            # BUG FIX: was `if ctx_sim`, which mapped a legitimate
            # similarity of exactly 0.0 to None.
            "context_similarity": float(ctx_sim) if ctx_sim is not None else None,
            "combined_score": float(combined),
            "is_match": combined >= self.threshold,
        }

    def batch_match(self, entities: list[str],
                     threshold: float = None) -> list[tuple[int, int, float]]:
        """Find all matching index pairs in a batch of entities.

        Returns (i, j, similarity) tuples with i < j, sorted by
        descending similarity.
        """
        # BUG FIX: was `threshold or self.threshold`, which silently
        # ignored an explicit threshold of 0.0.
        if threshold is None:
            threshold = self.threshold
        embeddings = self.embed(entities)
        emb_matrix = np.array(embeddings)

        # Compute pairwise cosine similarity (epsilon guards zero vectors)
        norms = np.linalg.norm(emb_matrix, axis=1, keepdims=True)
        normalized = emb_matrix / (norms + 1e-8)
        sim_matrix = normalized @ normalized.T

        # Extract upper-triangle pairs at or above the threshold
        matches = []
        for i in range(len(entities)):
            for j in range(i + 1, len(entities)):
                if sim_matrix[i, j] >= threshold:
                    matches.append((i, j, float(sim_matrix[i, j])))

        return sorted(matches, key=lambda x: -x[2])

    def _cosine_sim(self, a, b) -> float:
        """Cosine similarity of two vectors (epsilon guards zero norms)."""
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))

方法三:LLM消解

class LLMEntityResolver:
    """Use LLM for complex entity resolution.

    `llm` must expose generate(prompt, temperature=...) -> str.
    """

    def __init__(self, llm):
        self.llm = llm

    def resolve(self, entity_a: dict, entity_b: dict) -> dict:
        """Determine if two entity records refer to the same entity.

        Returns the parsed JSON verdict; on any parse failure, a
        conservative non-match with confidence 0.0.
        """
        import json

        prompt = f"""Determine if these two records refer to the same real-world entity.

Record A:
  Name: {entity_a.get('name')}
  Type: {entity_a.get('type')}
  Context: {entity_a.get('context', 'N/A')}
  Properties: {entity_a.get('properties', {})}

Record B:
  Name: {entity_b.get('name')}
  Type: {entity_b.get('type')}
  Context: {entity_b.get('context', 'N/A')}
  Properties: {entity_b.get('properties', {})}

Answer in JSON:
{{"same_entity": true/false, "confidence": 0.0-1.0, "reasoning": "..."}}
"""
        response = self.llm.generate(prompt, temperature=0)
        try:
            # LLMs frequently wrap JSON in prose or code fences; parse
            # only the outermost {...} span of the response.
            start = response.index("{")
            end = response.rindex("}") + 1
            return json.loads(response[start:end])
        # BUG FIX: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit. ValueError covers both a missing
        # brace (index/rindex) and json.JSONDecodeError (its subclass);
        # TypeError covers a non-string response.
        except (ValueError, TypeError):
            return {"same_entity": False, "confidence": 0.0, "reasoning": "Parse error"}

分块策略(Blocking)

为什么需要分块

全量对比的复杂度是O(n^2)。100万条记录意味着5000亿次对比。分块(Blocking)通过将可能匹配的记录分到同一组中,将对比次数降低到可接受的范围。

| 分块策略 | 原理 | 召回率 | 效率 | 适用 |
|---|---|---|---|---|
| 标准分块 | 按某字段值精确分组 | 低 | 高 | 属性值标准化好 |
| 排序邻域 | 按排序键取滑动窗口 | 中高 | 高 | 名称相似 |
| LSH分块 | 局部敏感哈希 | 高 | 中 | 大规模嵌入匹配 |
| Canopy聚类 | 先粗聚类再精匹配 | 高 | 中 | 通用 |
| TF-IDF分块 | 按TF-IDF向量近似 | 中 | 中 | 文本实体 |

class BlockingStrategy:
    """Blocking strategies to reduce comparison space."""

    @staticmethod
    def _key_value(record: dict, key: str) -> str:
        """Lower-cased, stripped blocking-key value; '' if absent or None.

        Centralized so a record holding an explicit None (previously an
        AttributeError on `.lower()`) blocks under the empty key instead.
        """
        return (record.get(key) or "").lower().strip()

    @staticmethod
    def standard_blocking(records: list[dict], key: str) -> dict[str, list]:
        """Group records by exact match on a blocking key."""
        blocks: dict[str, list] = {}
        for record in records:
            blocks.setdefault(BlockingStrategy._key_value(record, key), []).append(record)
        return blocks

    @staticmethod
    def sorted_neighborhood(records: list[dict], key: str,
                             window: int = 5) -> list[tuple]:
        """Sorted Neighborhood Method: sort by key, pair records within a
        sliding window of `window` records (each record is compared with
        the following window-1 records)."""
        sorted_records = sorted(records,
                                key=lambda r: BlockingStrategy._key_value(r, key))
        pairs = []
        for i in range(len(sorted_records)):
            for j in range(i + 1, min(i + window, len(sorted_records))):
                pairs.append((sorted_records[i], sorted_records[j]))
        return pairs

    @staticmethod
    def prefix_blocking(records: list[dict], key: str,
                         prefix_len: int = 3) -> dict[str, list]:
        """Block by name prefix (shorter names block on the full name)."""
        blocks: dict[str, list] = {}
        for record in records:
            name = BlockingStrategy._key_value(record, key)
            # Slicing already handles names shorter than prefix_len.
            blocks.setdefault(name[:prefix_len], []).append(record)
        return blocks

评估指标

消解质量评估

| 指标 | 公式 | 含义 |
|---|---|---|
| Precision | TP / (TP + FP) | 匹配对中真正匹配的比例 |
| Recall | TP / (TP + FN) | 所有真匹配被找到的比例 |
| F1 | 2PR / (P + R) | 精度和召回的调和平均 |
| Pair Completeness | 候选对中真匹配数 / 总真匹配数 | 分块质量评估 |
| Reduction Ratio | 1 - 候选对数 / 全量对数 | 分块效率评估 |

def evaluate_er(predictions: list[tuple], ground_truth: set[tuple]) -> dict:
    """Score predicted match pairs against a gold-standard pair set.

    Returns precision, recall, and F1 (rounded to 4 decimals) plus the
    raw TP/FP/FN counts. All metrics are 0 when their denominator is 0.
    """
    predicted = set(predictions)
    tp = len(predicted & ground_truth)
    # Set sizes give FP/FN directly: everything predicted but not true,
    # and everything true but not predicted.
    fp = len(predicted) - tp
    fn = len(ground_truth) - tp

    precision = tp / (tp + fp) if tp + fp else 0
    recall = tp / (tp + fn) if tp + fn else 0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0

    return {
        "precision": round(precision, 4),
        "recall": round(recall, 4),
        "f1": round(f1, 4),
        "true_positives": tp,
        "false_positives": fp,
        "false_negatives": fn,
    }

结论

实体消解是知识图谱数据质量的基石。在工程实践中,建议采用"分层过滤"策略:首先用高效的分块策略(如前缀分块+排序邻域)将候选对缩小到可计算范围,然后用字符串相似度做快速初筛,再用嵌入相似度做精确匹配,最后对边界case使用LLM做最终判断。这种分层方法在保证召回率的同时,将计算成本控制在可接受的范围内。


Maurice | maurice_wen@proton.me