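Industry Knowledge Graph Case Study: A Financial Risk-Control Graph

Business Background

Financial risk control is one of the most mature production use cases for knowledge graphs. The core challenge is that fraudsters hide their identities behind complex relationship networks. Traditional rule-based and feature-based anti-fraud systems can only catch known patterns, whereas a graph exposes hidden associations and can surface new fraud patterns.

This article uses a real-world financial risk-control scenario as its blueprint and walks through graph modeling, fraud-pattern detection, and the engineering of a real-time risk engine.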


Ontology Design for the Financial Risk-Control Graph

Core Entity Types

┌──────────────────────────────────────────────────────────┐
│          Financial Risk-Control Graph Ontology           │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Entity types:                                           │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐         │
│  │ Person  │ │ Company │ │ Account │ │ Device  │         │
│  └─────────┘ └─────────┘ └─────────┘ └─────────┘         │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐         │
│  │ Address │ │ Phone   │ │ IP      │ │ Loan    │         │
│  └─────────┘ └─────────┘ └─────────┘ └─────────┘         │
│                                                          │
│  Relationship types:                                     │
│  OWNS_ACCOUNT / WORKS_AT / GUARANTEES / TRANSFERS_TO     │
│  REGISTERED_AT / USES_DEVICE / USES_PHONE / LIVES_AT     │
│  APPLIES_LOAN / CO_BORROWED / SHAREHOLDER_OF             │
│  LEGAL_REP_OF / CONTACTS / SAME_DEVICE                   │
└──────────────────────────────────────────────────────────┘
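
One way to keep ingestion honest against this ontology is to check each edge's endpoint labels before writing it. The sketch below is illustrative only: `ALLOWED_ENDPOINTS` and `is_valid_edge` are hypothetical names, and the endpoint pairs listed are limited to those that appear in this article's own Cypher queries. The remaining relationship types are left out rather than guessed.

```python
# Endpoint labels per relationship type, taken only from the queries in this article.
# The table and the helper name are assumptions made for illustration.
ALLOWED_ENDPOINTS = {
    "GUARANTEES": {("Person", "Person")},
    "USES_DEVICE": {("Person", "Device")},
    "OWNS_ACCOUNT": {("Person", "Account")},
    "TRANSFERS_TO": {("Account", "Account")},
    "LEGAL_REP_OF": {("Person", "Company")},
    "SHAREHOLDER_OF": {("Company", "Company")},
    "APPLIES_LOAN": {("Person", "Loan"), ("Company", "Loan")},
}


def is_valid_edge(rel_type: str, start_label: str, end_label: str) -> bool:
    """Return True if the relationship type may connect the two labels."""
    return (start_label, end_label) in ALLOWED_ENDPOINTS.get(rel_type, set())


print(is_valid_edge("GUARANTEES", "Person", "Person"))     # a person guarantees a person
print(is_valid_edge("TRANSFERS_TO", "Person", "Account"))  # transfers run account to account
```

Unknown relationship types are rejected by default, which is the safer failure mode for a write path.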

Complete Cypher Schema Definition

// Uniqueness constraints on entities (Cypher comments use //, not --)
CREATE CONSTRAINT person_id_unique
FOR (p:Person) REQUIRE p.id_number IS UNIQUE;

CREATE CONSTRAINT company_id_unique
FOR (c:Company) REQUIRE c.unified_credit_code IS UNIQUE;

CREATE CONSTRAINT account_id_unique
FOR (a:Account) REQUIRE a.account_number IS UNIQUE;

CREATE CONSTRAINT device_id_unique
FOR (d:Device) REQUIRE d.device_fingerprint IS UNIQUE;

// Property indexes on entities
CREATE INDEX person_name_idx FOR (p:Person) ON (p.name);
CREATE INDEX person_risk_idx FOR (p:Person) ON (p.risk_level);
CREATE INDEX company_name_idx FOR (c:Company) ON (c.name);
CREATE INDEX loan_status_idx FOR (l:Loan) ON (l.status);
CREATE INDEX loan_amount_idx FOR (l:Loan) ON (l.amount);

// Full-text search index
CREATE FULLTEXT INDEX person_search
FOR (p:Person) ON EACH [p.name, p.id_number, p.phone];

Data Ingestion Example

// Create a natural-person node
CREATE (p:Person {
  id_number: "510101199001010011",
  name: "Zhang San",
  gender: "M",
  birth_date: date("1990-01-01"),
  phone: "13800138000",
  risk_level: "normal",
  created_at: datetime(),
  updated_at: datetime()
});

// Create a company node
CREATE (c:Company {
  unified_credit_code: "91510100MA12345678",
  name: "Chengdu XX Technology Co., Ltd.",
  legal_representative: "Zhang San",
  registered_capital: 1000000,
  industry: "technology",
  status: "active",
  registered_date: date("2020-06-15")
});

// Create a loan node
CREATE (l:Loan {
  loan_id: "LOAN202501001",
  amount: 500000,
  interest_rate: 0.045,
  term_months: 36,
  status: "active",
  apply_date: date("2025-01-15"),
  approval_date: date("2025-01-20"),
  risk_score: 72
});

// Create relationships
MATCH (p:Person {id_number: "510101199001010011"})
MATCH (c:Company {unified_credit_code: "91510100MA12345678"})
CREATE (p)-[:LEGAL_REP_OF {since: date("2020-06-15")}]->(c);

MATCH (p:Person {id_number: "510101199001010011"})
MATCH (l:Loan {loan_id: "LOAN202501001"})
CREATE (p)-[:APPLIES_LOAN {role: "borrower"}]->(l);

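Typical Fraud Patterns and Graph-Based Detection

Pattern 1: Gang Fraud

Several borrowers who look independent actually guarantee one another and share addresses, devices, or IPs, forming a fraud ring.

Detection signals:
- Multiple people apply for loans from the same device or IP
- Mutual guarantees form a closed loop
- Registered addresses are highly concentrated

        Zhang San ──guarantees──→ Li Si
            ↑                       │
            │ guarantees            │ guarantees
            │                       ↓
        Wang Wu ←──guarantees── Zhao Liu
            │                       │
            └── shared device ──→ Device_A
            └── shared IP ──────→ IP_10.0.0.1
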
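// Detect guarantee rings (closed loops of three or more people)
MATCH path = (p1:Person)-[:GUARANTEES*3..6]->(p1)
WHERE ALL(r IN relationships(path) WHERE r.loan_amount > 100000)
// nodes(path) lists p1 at both ends, so drop the final element
WITH p1, path, nodes(path)[0..-1] AS members
// report each ring once, anchored at the member with the smallest id_number
WHERE ALL(m IN members WHERE p1.id_number <= m.id_number)
RETURN
  [m IN members | m.name] AS gang_members,
  length(path) AS chain_length,
  reduce(total = 0, r IN relationships(path) | total + r.loan_amount) AS total_amount;

// Detect groups of borrowers who share a device
MATCH (p1:Person)-[:USES_DEVICE]->(d:Device)<-[:USES_DEVICE]-(p2:Person)
WHERE p1 <> p2
  AND EXISTS((p1)-[:APPLIES_LOAN]->())
  AND EXISTS((p2)-[:APPLIES_LOAN]->())
// the pattern is symmetric, so every sharer already appears as p1;
// concatenating the p2 list as well would count each user twice
WITH d, collect(DISTINCT p1) AS users
WHERE size(users) >= 3
RETURN
  d.device_fingerprint AS device,
  size(users) AS user_count,
  [u IN users | u.name] AS user_names;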
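
For unit-testing the guarantee-ring rule without a graph database, the same idea can be sketched in plain Python. This is a minimal illustration rather than the article's implementation: `find_guarantee_rings` and the edge-list input format are assumptions, and the search is a bounded depth-first enumeration of simple directed cycles.

```python
from collections import defaultdict


def find_guarantee_rings(edges, min_len=3, max_len=6):
    """Enumerate simple directed cycles with min_len..max_len members.

    edges: iterable of (guarantor, guaranteed) pairs (hypothetical input format).
    Each ring is reported once, starting from its smallest member.
    """
    graph = defaultdict(set)
    for src, dst in edges:
        graph[src].add(dst)

    rings = []

    def dfs(start, node, path):
        for nxt in sorted(graph.get(node, ())):
            if nxt == start and len(path) >= min_len:
                rings.append(list(path))  # closed the loop back to the start
            elif nxt > start and nxt not in path and len(path) < max_len:
                # Only visit members larger than the start so that each ring
                # is found from a single canonical starting point.
                path.append(nxt)
                dfs(start, nxt, path)
                path.pop()

    for start in sorted(graph):
        dfs(start, start, [start])
    return rings


edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "D"), ("D", "E")]
rings = find_guarantee_rings(edges)
print(rings)
```

The canonical-start trick mirrors the de-duplication concern in the Cypher version: without it, a ring of n members would be reported n times, once per rotation.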

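Pattern 2: Shell Company Fraud

Fraudsters register large numbers of shell companies and use corporate credit to extract loans.

Detection signals:
- The same legal representative or shareholder registers many companies
- Company registration dates are clustered
- Companies hold shares in one another
- Loan application dates are clustered

     Zhang San (legal representative)
      │
      ├──→ Company A (registered capital 1M) ──→ loan 5M
      ├──→ Company B (registered capital 1M) ──→ loan 3M
      ├──→ Company C (registered capital 1M) ──→ loan 2M
      └──→ Company D (registered capital 1M) ──→ loan 4M
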
// Detect loans taken by several companies under one legal representative
MATCH (p:Person)-[:LEGAL_REP_OF]->(c:Company)-[:APPLIES_LOAN]->(l:Loan)
// DISTINCT matters: a company with several loans would otherwise be counted once per loan
WITH p, collect(DISTINCT c) AS companies, collect(DISTINCT l) AS loans
WHERE size(companies) >= 3
RETURN
  p.name AS legal_rep,
  p.id_number AS id,
  size(companies) AS company_count,
  [c IN companies | c.name] AS company_names,
  reduce(total = 0, l IN loans | total + l.amount) AS total_loan_amount,
  [l IN loans | l.status] AS loan_statuses;

// Detect cross-shareholding networks
MATCH (c1:Company)-[:SHAREHOLDER_OF]->(c2:Company)-[:SHAREHOLDER_OF]->(c3:Company)
WHERE c1 = c3                                  // mutual holding: A -> B -> A
   OR EXISTS((c3)-[:SHAREHOLDER_OF]->(c1))     // three-company ring: A -> B -> C -> A
RETURN c1.name, c2.name, c3.name;
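
The queries above cover the legal-representative and cross-shareholding signals, but not the "registration dates are clustered" signal. A minimal sketch of that check follows, assuming the registration dates of one person's companies have already been fetched. The function name and the 30-day window are illustrative assumptions, not values from the source.

```python
from datetime import date


def max_registrations_in_window(reg_dates, window_days=30):
    """Largest number of registrations that fall inside any span of window_days days."""
    dates = sorted(reg_dates)
    best = 0
    left = 0
    for right, current in enumerate(dates):
        # Advance the left edge until the window spans fewer than window_days days.
        while (current - dates[left]).days >= window_days:
            left += 1
        best = max(best, right - left + 1)
    return best


registrations = [date(2024, 3, 1), date(2024, 3, 5), date(2024, 3, 20), date(2024, 9, 1)]
burst = max_registrations_in_window(registrations, window_days=30)
print(burst)
```

A burst count close to the total number of companies suggests batch registration. The threshold for flagging would need to be calibrated on historical data.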

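Pattern 3: Identity Theft

Someone applies for loans using another person's identity information.

Detection signals:
- The same identity appears on different devices, IPs, or geographic locations at the same time
- Contact details do not match the identity
- Application behavior is abnormal within a short time span

     ID number X ───→ Device A (Beijing)   ───→ loan application 09:00
                 ───→ Device B (Shanghai)  ───→ loan application 09:05
                 ───→ Device C (Guangzhou) ───→ loan application 09:10
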
// Detect one identity applying from several locations on the same day
MATCH (p:Person)-[:APPLIES_LOAN]->(l:Loan)
MATCH (p)-[:USES_DEVICE]->(d:Device)
WHERE l.apply_date = date()  // applications filed today
WITH p, collect(DISTINCT d.location) AS locations,
     // DISTINCT: each loan row is repeated once per device by the second MATCH
     collect(DISTINCT l) AS loans,
     collect(DISTINCT d.device_fingerprint) AS devices
WHERE size(locations) >= 2 AND size(devices) >= 2
RETURN
  p.name,
  p.id_number,
  locations,
  size(loans) AS loan_count,
  size(devices) AS device_count;
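
The query works at day granularity because `apply_date` is a date, while the diagram shows applications only minutes apart. A finer-grained check could be sketched on the event stream as below. The event tuple shape, the function name, and the 30-minute window are assumptions made for illustration.

```python
from datetime import datetime, timedelta


def flag_multi_city_bursts(events, window=timedelta(minutes=30), min_cities=2):
    """Return id_numbers seen in at least min_cities distinct cities within `window`.

    events: iterable of (id_number, city, timestamp) tuples (hypothetical event shape).
    """
    by_person = {}
    for id_number, city, ts in events:
        by_person.setdefault(id_number, []).append((ts, city))

    flagged = set()
    for id_number, items in by_person.items():
        items.sort()  # chronological order
        left = 0
        for right in range(len(items)):
            # Slide the left edge so the window never exceeds the allowed span.
            while items[right][0] - items[left][0] > window:
                left += 1
            cities = {city for _, city in items[left:right + 1]}
            if len(cities) >= min_cities:
                flagged.add(id_number)
                break
    return flagged


events = [
    ("X", "Beijing", datetime(2025, 1, 15, 9, 0)),
    ("X", "Shanghai", datetime(2025, 1, 15, 9, 5)),
    ("X", "Guangzhou", datetime(2025, 1, 15, 9, 10)),
    ("Y", "Chengdu", datetime(2025, 1, 15, 9, 0)),
    ("Y", "Chengdu", datetime(2025, 1, 15, 15, 0)),
    ("Z", "Beijing", datetime(2025, 1, 15, 9, 0)),
    ("Z", "Shanghai", datetime(2025, 1, 15, 18, 0)),
]
flagged = flag_multi_city_bursts(events)
print(flagged)
```

Identity Z is not flagged here because nine hours separate its two cities, which is plausible travel time. Only X shows the physically impossible pattern from the diagram.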

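Pattern 4: Fund Circulation

Loan proceeds are routed through several accounts and then flow back to an account held by someone related to the borrower.

Detection signals:
- Large transfers shortly after the loan is disbursed
- Funds pass through multiple intermediate accounts
- Funds finally return to an account associated with the borrower

  Borrower A ──disbursement──→ Account A ──transfer──→ Account B ──transfer──→ Account C
                                                                                  │
                                                                            transfer back
                                                                                  │
                                                                                  ▼
                                                              Account D (related party of A)
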
// Detect fund-circulation paths (3-5 hops)
MATCH (borrower:Person)-[:APPLIES_LOAN]->(loan:Loan)
MATCH (borrower)-[:OWNS_ACCOUNT]->(start_account:Account)
MATCH path = (start_account)-[:TRANSFERS_TO*3..5]->(end_account:Account)
MATCH (related:Person)-[:OWNS_ACCOUNT]->(end_account)
WHERE loan.status = "active"
  AND EXISTS((borrower)-[:CONTACTS|GUARANTEES|LIVES_AT*1..2]-(related))
  AND borrower <> related
WITH borrower, related, path, loan,
     [r IN relationships(path) | r.amount] AS amounts,
     [r IN relationships(path) | r.transfer_date] AS dates
// keep only large transfers made on or after the approval date
WHERE ALL(a IN amounts WHERE a > 10000)
  AND ALL(d IN dates WHERE d >= loan.approval_date)
RETURN
  borrower.name AS borrower,
  related.name AS beneficiary,
  loan.amount AS loan_amount,
  length(path) AS hop_count,
  amounts AS transfer_amounts,
  dates AS transfer_dates;
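
The path search at the heart of this query can be prototyped in memory, which is handy for building test fixtures. The sketch below is an assumption-laden simplification: `find_return_paths` and the `(src, dst, amount)` tuple format are hypothetical, dates are ignored, and it forbids revisiting an account, whereas Cypher's variable-length match only forbids reusing a relationship.

```python
from collections import defaultdict


def find_return_paths(transfers, start, related_accounts,
                      min_hops=3, max_hops=5, min_amount=10000):
    """Find transfer chains from `start` that end at a related party's account.

    transfers: iterable of (src, dst, amount) tuples (hypothetical format).
    Only transfers above min_amount are followed, mirroring the Cypher filter.
    """
    graph = defaultdict(list)
    for src, dst, amount in transfers:
        if amount > min_amount:
            graph[src].append(dst)

    paths = []

    def walk(node, path):
        hops = len(path) - 1
        if hops >= min_hops and node in related_accounts:
            paths.append(list(path))
        if hops == max_hops:
            return  # depth limit reached
        for nxt in graph.get(node, []):
            if nxt not in path:  # do not revisit an account
                path.append(nxt)
                walk(nxt, path)
                path.pop()

    walk(start, [start])
    return paths


transfers = [
    ("A", "B", 50000),
    ("B", "C", 48000),
    ("C", "D", 47000),
    ("A", "D", 5000),   # below the amount threshold, so it is ignored
]
paths = find_return_paths(transfers, start="A", related_accounts={"D"})
print(paths)
```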

Real-Time Risk-Control Engine

Architecture

┌────────────────────────────────────────────────────────────────┐
│           Real-Time Risk-Control Engine Architecture           │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌───────────┐    ┌──────────┐    ┌────────────────┐           │
│  │ Loan app  │    │ Kafka    │    │ Stream engine  │           │
│  │ events    │──→ │ queue    │──→ │ (Flink/Spark)  │           │
│  └───────────┘    └──────────┘    └───────┬────────┘           │
│                                           │                    │
│                           ┌───────────────┼────────────┐       │
│                           ▼               ▼            ▼       │
│                    ┌─────────────┐ ┌─────────────┐ ┌────────┐  │
│                    │ Statistical │ │ Graph query │ │ Rule   │  │
│                    │ features    │ │ (relations) │ │ engine │  │
│                    └──────┬──────┘ └──────┬──────┘ └───┬────┘  │
│                           │               │            │       │
│                    ┌──────▼───────────────▼────────────▼────┐  │
│                    │        Risk-score fusion layer         │  │
│                    │                                        │  │
│                    │  Statistical + graph + rule risk       │  │
│                    │  ──→ composite risk score              │  │
│                    └──────────────────────┬─────────────────┘  │
│                                           │                    │
│                                  ┌────────▼────────┐           │
│                                  │ Decision engine │           │
│                                  │                 │           │
│                                  │ approve/reject  │           │
│                                  │ /manual review  │           │
│                                  └─────────────────┘           │
└────────────────────────────────────────────────────────────────┘

Graph Feature Computation

class GraphFeatureExtractor:
    """Extract risk-control features from the knowledge graph."""

    def __init__(self, graph_session):
        self.session = graph_session

    def extract_features(self, person_id: str) -> dict:
        """Extract graph-based risk features for one person."""
        features = {}

        # 1. Association risk (distance to the blacklist)
        features["blacklist_distance"] = self._blacklist_distance(person_id)

        # 2. Guarantee-network features
        features.update(self._guarantee_features(person_id))

        # 3. Device-sharing features
        features.update(self._device_sharing_features(person_id))

        # 4. Fund-flow features
        features.update(self._fund_flow_features(person_id))

        # 5. Community-density features
        features.update(self._community_features(person_id))

        return features

    def _blacklist_distance(self, person_id: str) -> int:
        """Graph distance to the nearest blacklisted person (-1 if unreachable within 5 hops)."""
        result = self.session.run(
            """
            MATCH path = shortestPath(
              (p:Person {id_number: $pid})-[*..5]-(b:Person {risk_level: "blacklisted"})
            )
            RETURN length(path) AS distance
            ORDER BY distance ASC
            LIMIT 1
            """,
            pid=person_id
        ).single()
        return result["distance"] if result else -1

    def _guarantee_features(self, person_id: str) -> dict:
        """Guarantee-network features (only two-party mutual guarantees are checked here)."""
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[:GUARANTEES*1..3]-(g:Person)
            WHERE g <> p
            WITH p, collect(DISTINCT g) AS guarantee_network
            OPTIONAL MATCH (p)-[:GUARANTEES]->(q:Person)-[:GUARANTEES]->(p)
            RETURN
              size(guarantee_network) AS network_size,
              count(q) > 0 AS has_circular_guarantee
            """,
            pid=person_id
        ).single()
        # count(q) ignores the null row that OPTIONAL MATCH emits when nothing
        # matches; count(*) would always be >= 1 and report a loop for everyone.
        return {
            "guarantee_network_size": result["network_size"],
            "has_circular_guarantee": result["has_circular_guarantee"]
        }

    def _device_sharing_features(self, person_id: str) -> dict:
        """Device-sharing features."""
        # OPTIONAL MATCH plus aggregation without a grouping key always yields
        # exactly one row, so a person who shares no device gets zeros instead
        # of an empty result (and a None from .single()).
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[:USES_DEVICE]->(d:Device)
                  <-[:USES_DEVICE]-(other:Person)
            WHERE p <> other
            RETURN
              count(DISTINCT other) AS shared_device_users,
              count(DISTINCT d) AS shared_device_count
            """,
            pid=person_id
        ).single()
        return {
            "shared_device_users": result["shared_device_users"],
            "shared_device_count": result["shared_device_count"]
        }

    def _fund_flow_features(self, person_id: str) -> dict:
        """Fund-flow features over the last 30 days."""
        # The account hop sits inside OPTIONAL MATCH so that a person without
        # any account still produces one row of zeros; sum() over no rows is 0.
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[:OWNS_ACCOUNT]->(:Account)
                  -[t:TRANSFERS_TO]->(target:Account)
            WHERE t.transfer_date >= date() - duration('P30D')
            RETURN
              count(t) AS transfers_30d,
              sum(t.amount) AS transfer_amount_30d,
              count(DISTINCT target) AS unique_transfer_targets
            """,
            pid=person_id
        ).single()
        return {
            "transfers_30d": result["transfers_30d"],
            "transfer_amount_30d": result["transfer_amount_30d"],
            "unique_transfer_targets": result["unique_transfer_targets"]
        }

    def _community_features(self, person_id: str) -> dict:
        """Community-density features: share of neighbor pairs that are directly linked."""
        # actual_edges counts linked neighbor pairs once each (id(n1) < id(n2));
        # the denominator is the number of possible pairs, n * (n - 1) / 2.
        # The [null] fallback keeps one row alive when there are no neighbors.
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[*1..2]-(neighbor:Person)
            WHERE neighbor <> p
            WITH collect(DISTINCT neighbor) AS neighbors
            UNWIND (CASE WHEN size(neighbors) = 0 THEN [null] ELSE neighbors END) AS n1
            OPTIONAL MATCH (n1)--(n2:Person)
            WHERE n2 IN neighbors AND id(n1) < id(n2)
            WITH neighbors, n1, count(DISTINCT n2) AS linked
            WITH size(neighbors) AS n_count, sum(linked) AS actual_edges
            RETURN
              n_count AS neighbor_count,
              CASE WHEN n_count > 1
                THEN toFloat(2 * actual_edges) / (n_count * (n_count - 1))
                ELSE 0.0 END AS clustering_coefficient
            """,
            pid=person_id
        ).single()
        return {
            "neighbor_count": result["neighbor_count"],
            "clustering_coefficient": round(result["clustering_coefficient"], 4)
        }

Risk Score Fusion

class RiskScorer:
    """Fuses features into a single risk score."""

    # Feature weights (based on business experience and model training)
    FEATURE_WEIGHTS = {
        "blacklist_distance": -0.3,      # the shorter the distance, the higher the risk
        "guarantee_network_size": 0.15,
        "has_circular_guarantee": 0.25,   # a mutual-guarantee loop is a strong risk signal
        "shared_device_users": 0.2,
        "transfers_30d": 0.05,
        "transfer_amount_30d": 0.1,
        "clustering_coefficient": 0.15,   # dense community = suspected gang
    }

    def score(self, features: dict) -> dict:
        """Compute the composite risk score."""
        # Normalize features
        normalized = self._normalize(features)

        # Weighted sum
        weighted_score = 0
        for feat, weight in self.FEATURE_WEIGHTS.items():
            if feat in normalized:
                weighted_score += normalized[feat] * weight

        # Map to a 0-100 score
        risk_score = min(100, max(0, int(50 + weighted_score * 50)))

        # Risk level and decision
        if risk_score >= 80:
            risk_level = "high"
            decision = "reject"
        elif risk_score >= 60:
            risk_level = "medium"
            decision = "manual_review"
        else:
            risk_level = "low"
            decision = "approve"

        return {
            "risk_score": risk_score,
            "risk_level": risk_level,
            "decision": decision,
            "features": features,
            "top_risk_factors": self._top_factors(normalized)
        }

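    def _normalize(self, features: dict) -> dict:
        """Normalize features to [-1, 1]."""
        normalized = {}
        for key, value in features.items():
            if isinstance(value, bool):
                normalized[key] = 1.0 if value else 0.0
            elif key == "blacklist_distance":
                # -1 means unreachable. Treat it as the maximum distance (5 hops),
                # otherwise the negative weight would turn "unreachable" into
                # added risk instead of reduced risk.
                normalized[key] = 1.0 if value < 0 else min(1.0, value / 5.0)
            elif isinstance(value, (int, float)):
                # Simplified; in practice use quantiles of historical data.
                # Note this crude rule also divides values already in [0, 1] by 10.
                normalized[key] = min(1.0, value / 10.0) if value >= 0 else max(-1.0, value / 5.0)
        return normalized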

    def _top_factors(self, normalized: dict, top_n: int = 3) -> list:
        """Return the factors that add the most risk."""
        contributions = []
        for feat, weight in self.FEATURE_WEIGHTS.items():
            if feat in normalized:
                contributions.append({
                    "feature": feat,
                    # Signed contribution: taking abs() would rank a factor that
                    # lowers the score as if it were a top risk factor.
                    "contribution": normalized[feat] * weight
                })
        contributions.sort(key=lambda x: x["contribution"], reverse=True)
        return contributions[:top_n]
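
The mapping from features to a score can be written out explicitly. Using the document's weights $w_i$ and writing $\tilde{x}_i$ for the normalized features, the first line below restates what `score` computes (with `int` truncating toward zero). The second line is a worked example under the assumption that only two features are present: `has_circular_guarantee = True` and `shared_device_users = 4`, which normalize to 1.0 and 0.4.

```latex
\[
S = \min\Bigl(100,\ \max\Bigl(0,\ \operatorname{int}\bigl(50 + 50 \textstyle\sum_i w_i \tilde{x}_i\bigr)\Bigr)\Bigr)
\]
\[
\sum_i w_i \tilde{x}_i = 0.25 \cdot 1.0 + 0.2 \cdot 0.4 = 0.33,
\qquad
S = \operatorname{int}(50 + 50 \cdot 0.33) = \operatorname{int}(66.5) = 66
\]
```

A score of 66 falls in the range $60 \le S < 80$, so this applicant would be rated `medium` and routed to `manual_review`.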

Graph Updates and Maintenance

Real-Time Data Ingestion

from kafka import KafkaConsumer
import json

class GraphUpdater:
    """Applies real-time updates to the graph."""

    def __init__(self, graph_session, kafka_config: dict):
        self.session = graph_session
        self.consumer = KafkaConsumer(
            "loan_events",
            "transaction_events",
            "device_events",
            bootstrap_servers=kafka_config["brokers"],
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
            group_id="graph_updater"
        )

    def process_events(self):
        """Consume events continuously and update the graph."""
        for message in self.consumer:
            event = message.value
            event_type = event.get("type")

            if event_type == "loan_application":
                self._handle_loan_application(event)
            elif event_type == "transaction":
                self._handle_transaction(event)
            elif event_type == "device_login":
                self._handle_device_login(event)

    def _handle_loan_application(self, event: dict):
        """Handle a loan-application event."""
        # MERGE keeps the write idempotent if the same event is delivered twice.
        self.session.run(
            """
            MERGE (p:Person {id_number: $person_id})
            SET p.name = $name, p.updated_at = datetime()
            MERGE (l:Loan {loan_id: $loan_id})
            SET l.amount = $amount, l.status = "pending",
                l.apply_date = date($apply_date)
            MERGE (p)-[:APPLIES_LOAN {role: "borrower"}]->(l)
            """,
            person_id=event["person_id"],
            name=event["person_name"],
            loan_id=event["loan_id"],
            amount=event["amount"],
            apply_date=event["apply_date"]
        )

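    def _handle_transaction(self, event: dict):
        """Handle a transaction event."""
        # Both accounts must already exist; otherwise MATCH finds nothing and
        # the transfer is silently skipped.
        self.session.run(
            """
            MATCH (src:Account {account_number: $from_account})
            MATCH (dst:Account {account_number: $to_account})
            CREATE (src)-[:TRANSFERS_TO {
              amount: $amount,
              transfer_date: date($date),
              channel: $channel
            }]->(dst)
            """,
            from_account=event["from_account"],
            to_account=event["to_account"],
            amount=event["amount"],
            date=event["date"],
            channel=event.get("channel", "online")
        )

    def _handle_device_login(self, event: dict):
        """Handle a device-login event (process_events calls this handler).

        The field names person_id and device_fingerprint are assumptions;
        the source does not show the device event schema.
        """
        self.session.run(
            """
            MERGE (p:Person {id_number: $person_id})
            MERGE (d:Device {device_fingerprint: $device_fingerprint})
            MERGE (p)-[:USES_DEVICE]->(d)
            """,
            person_id=event["person_id"],
            device_fingerprint=event["device_fingerprint"]
        )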

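Compliance and Privacy

Data Masking Strategy

Data type          Masking method                             Example
ID number          mask the middle 8 digits (of 18)           510101********0011
Mobile number      mask the middle 4 digits                   138****8000
Bank card number   keep the first 6 and last 4 digits         622848********1234
Name               keep the surname only                      张**
Address            reduce precision to district/county        Jinjiang District, Chengdu, Sichuan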
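
The string-based rules in this table are simple enough to sketch directly. The helper names below are hypothetical, the ID and card rule keeps the first 6 and last 4 characters as in the table's examples, and the name rule assumes a single-character surname. Address coarsening is omitted because it needs a real address parser.

```python
def mask_middle(value: str, keep_head: int, keep_tail: int) -> str:
    """Replace everything between the kept head and tail with asterisks."""
    hidden = max(0, len(value) - keep_head - keep_tail)
    return value[:keep_head] + "*" * hidden + value[len(value) - keep_tail:]


def mask_id_number(id_number: str) -> str:
    """Keep the region code (first 6) and the last 4 characters."""
    return mask_middle(id_number, 6, 4)


def mask_phone(phone: str) -> str:
    """Keep the first 3 and last 4 digits of an 11-digit mobile number."""
    return mask_middle(phone, 3, 4)


def mask_bank_card(card: str) -> str:
    """Keep the first 6 and last 4 digits."""
    return mask_middle(card, 6, 4)


def mask_name(name: str) -> str:
    """Keep the first character (assumed to be a single-character surname)."""
    return name[:1] + "*" * (len(name) - 1)


print(mask_id_number("510101199001010011"))
print(mask_phone("13800138000"))
print(mask_bank_card("622848123456781234"))
print(mask_name("张三丰"))
```

Masking at the presentation layer like this leaves the stored graph intact. Whether raw identifiers may be stored at all is a separate compliance decision.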

Audit Log

// Create an audit-log node
CREATE (log:AuditLog {
  query_id: $query_id,
  operator: $operator,
  query_type: "risk_assessment",
  target_person: $person_id,
  timestamp: datetime(),
  result: $result,
  ip_address: $ip
});

// Link the log entry to the operator
MATCH (op:Operator {id: $operator})
MATCH (log:AuditLog {query_id: $query_id})
CREATE (op)-[:PERFORMED]->(log);

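Key Performance Indicators

Metric                               Target     Measured baseline
Single risk-check query latency      < 200ms    P99 ~150ms
Gang-detection query latency         < 2s       P99 ~1.5s
Fund-circulation detection depth     5 hops     3-5 hops
Graph update latency                 < 5s       mean ~2s
Fraud detection rate                 > 85%      ~88%
False-positive rate                  < 5%       ~3.2%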
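
The latency rows are stated as P99 values. As a reminder of what that measures, here is a minimal nearest-rank percentile sketch. The function name and the sample latencies are made up for illustration and are not measurements from the system described here.

```python
def percentile(samples, p: int):
    """Nearest-rank percentile for an integer p in 1..100."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    # ceil(p * n / 100) in integer arithmetic, to avoid float rounding surprises
    rank = max(1, (p * len(ordered) + 99) // 100)
    return ordered[rank - 1]


latencies_ms = [120, 80, 95, 110, 150, 90, 100, 105, 85, 130]  # illustrative values
p99 = percentile(latencies_ms, 99)
p50 = percentile(latencies_ms, 50)
meets_target = p99 < 200  # compare against the 200 ms single-query target
print(p99, p50, meets_target)
```

With only ten samples the P99 is simply the maximum. A meaningful P99 needs a window of at least a few hundred requests.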

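Summary

Key engineering takeaways for a financial risk-control graph:

  1. Ontology design sets the ceiling: entity and relationship types must cover the detection needs of every fraud pattern
  2. Graph queries are risk rules: a Cypher query is itself an interpretable risk rule, which simplifies compliance audits
  3. Real-time performance is the lifeline: end-to-end latency from event to graph update to risk decision must stay within seconds
  4. Feature fusion: graph features (relationship distance, community density) + statistical features + rules; combining all three works best
  5. Compliance first: data masking, access control, and audit logs are hard requirements of financial regulation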

Maurice | maurice_wen@proton.me