Industry Knowledge Graph Case Study: Financial Risk-Control Graph
Lingque Teaching and Research Team (灵阙教研团队)
Updated 2026-02-28
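Business Background
Financial risk control is one of the most mature production use cases for knowledge graphs. The core challenge is that fraudsters hide their identities behind complex relationship networks. Traditional rule-based and feature-based anti-fraud systems can only catch known patterns, whereas a graph can expose hidden associations and surface new fraud patterns.
This article is modeled on a real financial risk-control scenario and walks through the engineering practice of graph modeling, fraud-pattern detection, and a real-time risk-control engine.
Ontology Design for the Financial Risk-Control Graph
Core Entity Types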
┌──────────────────────────────────────────────────────────┐
│          Financial risk-control graph ontology           │
├──────────────────────────────────────────────────────────┤
│                                                          │
│  Entity types:                                           │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐     │
│  │ Person   │ │ Company  │ │ Account  │ │ Device   │     │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘     │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐     │
│  │ Address  │ │ Phone    │ │ IP       │ │ Loan     │     │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘     │
│                                                          │
│  Relationship types:                                     │
│  OWNS_ACCOUNT / WORKS_AT / GUARANTEES / TRANSFERS_TO     │
│  REGISTERED_AT / USES_DEVICE / USES_PHONE / LIVES_AT     │
│  APPLIES_LOAN / CO_BORROWED / SHAREHOLDER_OF             │
│  LEGAL_REP_OF / CONTACTS / SAME_DEVICE                   │
└──────────────────────────────────────────────────────────┘
Complete Cypher Schema Definition
// Uniqueness constraints on entities
CREATE CONSTRAINT person_id_unique
  FOR (p:Person) REQUIRE p.id_number IS UNIQUE;
CREATE CONSTRAINT company_id_unique
  FOR (c:Company) REQUIRE c.unified_credit_code IS UNIQUE;
CREATE CONSTRAINT account_id_unique
  FOR (a:Account) REQUIRE a.account_number IS UNIQUE;
CREATE CONSTRAINT device_id_unique
  FOR (d:Device) REQUIRE d.device_fingerprint IS UNIQUE;
// Loans are merged by loan_id during ingestion, so they need a unique key as well
CREATE CONSTRAINT loan_id_unique
  FOR (l:Loan) REQUIRE l.loan_id IS UNIQUE;
// Indexes on entity properties
CREATE INDEX person_name_idx FOR (p:Person) ON (p.name);
CREATE INDEX person_risk_idx FOR (p:Person) ON (p.risk_level);
CREATE INDEX company_name_idx FOR (c:Company) ON (c.name);
CREATE INDEX loan_status_idx FOR (l:Loan) ON (l.status);
CREATE INDEX loan_amount_idx FOR (l:Loan) ON (l.amount);
// Full-text search index
CREATE FULLTEXT INDEX person_search
  FOR (p:Person) ON EACH [p.name, p.id_number, p.phone];
Data Ingestion Example
// Create a Person node
CREATE (p:Person {
  id_number: "510101199001010011",
  name: "Zhang San",
  gender: "M",
  birth_date: date("1990-01-01"),
  phone: "13800138000",
  risk_level: "normal",
  created_at: datetime(),
  updated_at: datetime()
});
// Create a Company node
CREATE (c:Company {
  unified_credit_code: "91510100MA12345678",
  name: "Chengdu Example Technology Co., Ltd.",
  legal_representative: "Zhang San",
  registered_capital: 1000000,
  industry: "technology",
  status: "active",
  registered_date: date("2020-06-15")
});
// Create a Loan node
CREATE (l:Loan {
  loan_id: "LOAN202501001",
  amount: 500000,
  interest_rate: 0.045,
  term_months: 36,
  status: "active",
  apply_date: date("2025-01-15"),
  approval_date: date("2025-01-20"),
  risk_score: 72
});
// Create relationships
MATCH (p:Person {id_number: "510101199001010011"})
MATCH (c:Company {unified_credit_code: "91510100MA12345678"})
CREATE (p)-[:LEGAL_REP_OF {since: date("2020-06-15")}]->(c);
MATCH (p:Person {id_number: "510101199001010011"})
MATCH (l:Loan {loan_id: "LOAN202501001"})
CREATE (p)-[:APPLIES_LOAN {role: "borrower"}]->(l);
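Typical Fraud Patterns and Graph-Based Detection
Pattern 1: Gang Fraud
Several borrowers who look independent actually guarantee one another and share addresses, devices, or IPs, forming a fraud ring.
Detection signals:
- Multiple people apply for loans from the same device or IP
- Mutual guarantees form a closed loop
- Registered addresses are highly concentrated
Zhang San ──guarantees──→ Li Si
    ↑                       │
    │ guarantees            │ guarantees
    │                       ↓
Wang Wu ←──guarantees─── Zhao Liu
    │                       │
    └── shared device ──→ Device_A
    └── shared IP ──────→ IP_10.0.0.1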
// Detect guarantee rings (mutual-guarantee cycles of 3 or more people)
MATCH path = (p1:Person)-[:GUARANTEES*3..6]->(p1)
WHERE ALL(r IN relationships(path) WHERE r.loan_amount > 100000)
// nodes(path) repeats p1 as its last element, so drop it
WITH p1, path, nodes(path)[..-1] AS members
// Report each ring once: keep only the rotation that starts at the lowest node id
WHERE ALL(m IN members WHERE id(p1) <= id(m))
RETURN
  [m IN members | m.name] AS gang_members,
  length(path) AS chain_length,
  reduce(total = 0, r IN relationships(path) | total + r.loan_amount) AS total_amount;
// Detect groups of borrowers who share one device.
// Collect each borrower once per device; collecting both ends of a symmetric
// pattern would count every person twice.
MATCH (p:Person)-[:USES_DEVICE]->(d:Device)
WHERE EXISTS((p)-[:APPLIES_LOAN]->())
WITH d, collect(DISTINCT p) AS users
WHERE size(users) >= 3
RETURN
  d.device_fingerprint AS device,
  size(users) AS user_count,
  [u IN users | u.name] AS user_names;
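The ring check can also be reproduced outside the database, for example in a unit test for the rule. The following is a minimal sketch, assuming guarantees arrive as plain (guarantor, guaranteed) pairs; `find_guarantee_rings` and its parameters are hypothetical names, not part of the system described here. Each search starts from the smallest member of a ring so that every ring is reported once.

```python
from typing import Dict, List, Set, Tuple


def find_guarantee_rings(
    edges: List[Tuple[str, str]], min_len: int = 3, max_len: int = 6
) -> List[List[str]]:
    """Return directed guarantee cycles with min_len..max_len members.

    Hypothetical helper: `edges` holds (guarantor, guaranteed) pairs.
    """
    graph: Dict[str, Set[str]] = {}
    for src, dst in edges:
        graph.setdefault(src, set()).add(dst)
        graph.setdefault(dst, set())

    rings: List[List[str]] = []

    def dfs(start: str, node: str, path: List[str]) -> None:
        for nxt in sorted(graph[node]):
            if nxt == start and len(path) >= min_len:
                rings.append(list(path))  # closed the loop back to the start
            elif nxt > start and nxt not in path and len(path) < max_len:
                # Only visit members "greater" than the start node, so a ring
                # is found exactly once, from its smallest member.
                dfs(start, nxt, path + [nxt])

    for start in sorted(graph):
        dfs(start, start, [start])
    return rings


ring_edges = [("zhang", "li"), ("li", "zhao"), ("zhao", "wang"), ("wang", "zhang")]
print(find_guarantee_rings(ring_edges))
```

A two-person mutual guarantee is deliberately ignored by the default `min_len=3`, matching the lower bound of the `*3..6` pattern in the Cypher query.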
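Pattern 2: Shell Company Fraud
Fraudsters register large numbers of shell companies and use corporate credit to obtain loans.
Detection signals:
- The same legal representative or shareholder registers multiple companies
- Company registration dates are clustered
- Companies hold shares in one another
- Loan application dates are clustered
Zhang San (legal representative)
  │
  ├──→ Company A (registered capital 1M) ──→ loan 5M
  ├──→ Company B (registered capital 1M) ──→ loan 3M
  ├──→ Company C (registered capital 1M) ──→ loan 2M
  └──→ Company D (registered capital 1M) ──→ loan 4M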
// Detect loans taken out by multiple companies under one legal representative.
// DISTINCT keeps a company with several loans from being counted more than once.
MATCH (p:Person)-[:LEGAL_REP_OF]->(c:Company)-[:APPLIES_LOAN]->(l:Loan)
WITH p, collect(DISTINCT c) AS companies, collect(DISTINCT l) AS loans
WHERE size(companies) >= 3
RETURN
  p.name AS legal_rep,
  p.id_number AS id,
  size(companies) AS company_count,
  [c IN companies | c.name] AS company_names,
  reduce(total = 0, l IN loans | total + l.amount) AS total_loan_amount,
  [l IN loans | l.status] AS loan_statuses;
// Detect cross-shareholding loops (c1 -> c2 -> c3 -> c1)
MATCH (c1:Company)-[:SHAREHOLDER_OF]->(c2:Company)-[:SHAREHOLDER_OF]->(c3:Company)
WHERE c1 <> c3
  AND EXISTS((c3)-[:SHAREHOLDER_OF]->(c1))
RETURN c1.name, c2.name, c3.name;
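The queries above cover the "same legal representative" and cross-shareholding signals but not the "registration dates are clustered" signal. The following is a minimal sketch of how that check might be layered on top of exported query results. The record fields, the function name `flag_shell_clusters`, and the 90-day span are assumptions made for illustration; only the three-company threshold comes from the query above.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import date
from typing import Dict, List


@dataclass(frozen=True)
class CompanyLoan:
    """One loan row; field names are hypothetical."""
    legal_rep_id: str
    company_code: str
    registered_date: date
    loan_amount: int


def flag_shell_clusters(
    records: List[CompanyLoan], min_companies: int = 3, max_span_days: int = 90
) -> Dict[str, dict]:
    """Flag legal reps whose companies were all registered within a short span."""
    by_rep: Dict[str, List[CompanyLoan]] = defaultdict(list)
    for rec in records:
        by_rep[rec.legal_rep_id].append(rec)

    flagged: Dict[str, dict] = {}
    for rep, rows in by_rep.items():
        # One registration date per distinct company, however many loans it has.
        reg_dates = {r.company_code: r.registered_date for r in rows}
        if len(reg_dates) < min_companies:
            continue
        span = (max(reg_dates.values()) - min(reg_dates.values())).days
        if span <= max_span_days:
            flagged[rep] = {
                "company_count": len(reg_dates),
                "registration_span_days": span,
                "total_loan_amount": sum(r.loan_amount for r in rows),
            }
    return flagged


sample = [
    CompanyLoan("rep-1", "A", date(2024, 3, 1), 5_000_000),
    CompanyLoan("rep-1", "B", date(2024, 3, 10), 3_000_000),
    CompanyLoan("rep-1", "C", date(2024, 3, 20), 2_000_000),
    CompanyLoan("rep-1", "D", date(2024, 4, 2), 4_000_000),
    CompanyLoan("rep-2", "E", date(2019, 1, 1), 1_000_000),
]
print(flag_shell_clusters(sample))
```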
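Pattern 3: Identity Theft
Someone applies for loans using another person's identity.
Detection signals:
- The same identity appears on different devices, IPs, or locations at the same time
- Contact details do not match the identity
- Abnormal application behavior within a short period
ID number X ───→ Device A (Beijing)   ───→ loan application 09:00
            ───→ Device B (Shanghai)  ───→ loan application 09:05
            ───→ Device C (Guangzhou) ───→ loan application 09:10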
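// Detect one identity applying from several locations on the same day.
// apply_date is a date, so this is a same-day check; a minute-level window
// would need a timestamp property on the application.
MATCH (p:Person)-[:APPLIES_LOAN]->(l:Loan)
MATCH (p)-[:USES_DEVICE]->(d:Device)
WHERE l.apply_date = date()  // applied today
// Loans and devices are matched independently, so deduplicate both
WITH p, collect(DISTINCT d.location) AS locations,
  collect(DISTINCT l) AS loans,
  collect(DISTINCT d.device_fingerprint) AS devices
WHERE size(locations) >= 2 AND size(devices) >= 2
RETURN
  p.name,
  p.id_number,
  locations,
  size(loans) AS loan_count,
  size(devices) AS device_count;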
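The example timeline (three cities within ten minutes) implies a time window finer than a calendar day. The following is a minimal sketch of a sliding-window check over application events, assuming each event carries an ID number, a city, and a timestamp. The function name `multi_city_bursts`, the event shape, and the 30-minute window are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

Application = Tuple[str, str, datetime]  # (id_number, city, applied_at)


def multi_city_bursts(
    applications: List[Application],
    window: timedelta = timedelta(minutes=30),
    min_cities: int = 2,
) -> Dict[str, List[str]]:
    """Map each suspicious ID to the largest set of cities seen in one window."""
    by_id: Dict[str, List[Tuple[datetime, str]]] = {}
    for id_number, city, applied_at in applications:
        by_id.setdefault(id_number, []).append((applied_at, city))

    flagged: Dict[str, List[str]] = {}
    for id_number, events in by_id.items():
        events.sort()
        left = 0
        for right in range(len(events)):
            # Shrink the window until it spans at most `window`.
            while events[right][0] - events[left][0] > window:
                left += 1
            cities = {city for _, city in events[left:right + 1]}
            if len(cities) >= min_cities and len(cities) > len(flagged.get(id_number, [])):
                flagged[id_number] = sorted(cities)
    return flagged


events = [
    ("X", "Beijing", datetime(2025, 1, 15, 9, 0)),
    ("X", "Shanghai", datetime(2025, 1, 15, 9, 5)),
    ("X", "Guangzhou", datetime(2025, 1, 15, 9, 10)),
    ("Y", "Beijing", datetime(2025, 1, 15, 9, 0)),
    ("Y", "Chengdu", datetime(2025, 1, 16, 9, 0)),
]
print(multi_city_bursts(events))
```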
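Pattern 4: Fund Circulation
Loan funds pass through several accounts and then flow back to an account held by someone associated with the borrower.
Detection signals:
- Large transfers shortly after the loan is disbursed
- Funds pass through several intermediate accounts
- Funds finally return to an account associated with the borrower
Borrower A ──disbursement──→ Account A ──transfer──→ Account B ──transfer──→ Account C
                                                                               │
                                                                        return transfer
                                                                               │
                                                                               ▼
                                                            Account D (A's associate)
// Detect fund circulation paths (3 to 5 hops)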
MATCH (borrower:Person)-[:APPLIES_LOAN]->(loan:Loan)
MATCH (borrower)-[:OWNS_ACCOUNT]->(start_account:Account)
MATCH path = (start_account)-[:TRANSFERS_TO*3..5]->(end_account:Account)
MATCH (related:Person)-[:OWNS_ACCOUNT]->(end_account)
WHERE loan.status = "active"
AND EXISTS((borrower)-[:CONTACTS|GUARANTEES|LIVES_AT*1..2]-(related))
AND borrower <> related
WITH borrower, related, path, loan,
[r IN relationships(path) | r.amount] AS amounts,
[r IN relationships(path) | r.transfer_date] AS dates
WHERE ALL(a IN amounts WHERE a > 10000)
AND ALL(d IN dates WHERE d >= loan.approval_date)
RETURN
borrower.name AS borrower,
related.name AS beneficiary,
loan.amount AS loan_amount,
length(path) AS hop_count,
amounts AS transfer_amounts,
dates AS transfer_dates;
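To make the hop-bounded search concrete, the following is a minimal in-memory sketch of the same idea: walk outgoing transfers from the borrower's account and keep paths of 3 to 5 hops that end at an account owned by an associate. The function name `find_return_paths` and the plain (source, target) edge format are hypothetical. Unlike Cypher, which only forbids reusing a relationship, this sketch forbids revisiting an account.

```python
from typing import Dict, List, Set, Tuple


def find_return_paths(
    transfers: List[Tuple[str, str]],
    start_account: str,
    related_accounts: Set[str],
    min_hops: int = 3,
    max_hops: int = 5,
) -> List[List[str]]:
    """Return account paths of min_hops..max_hops ending at a related account."""
    graph: Dict[str, List[str]] = {}
    for src, dst in transfers:
        graph.setdefault(src, []).append(dst)

    paths: List[List[str]] = []

    def dfs(node: str, path: List[str]) -> None:
        hops = len(path) - 1
        if hops >= min_hops and node in related_accounts:
            paths.append(list(path))
        if hops == max_hops:
            return  # do not search deeper than the hop limit
        for nxt in graph.get(node, []):
            if nxt not in path:  # simple paths only
                dfs(nxt, path + [nxt])

    dfs(start_account, [start_account])
    return paths


transfers = [("A", "B"), ("B", "C"), ("C", "D"), ("A", "D")]
print(find_return_paths(transfers, "A", {"D"}))
```

The direct A to D transfer is not reported because it is only one hop; whether such short returns should also be flagged is a policy choice the hop bounds encode.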
Real-Time Risk-Control Engine
Architecture
┌───────────────────────────────────────────────────────────┐
│        Real-time risk-control engine architecture         │
├───────────────────────────────────────────────────────────┤
│                                                           │
│  ┌─────────────┐     ┌─────────┐     ┌───────────────┐    │
│  │ Loan        │     │ Kafka   │     │ Stream engine │    │
│  │ application │ ──→ │ message │ ──→ │ (Flink/Spark) │    │
│  │ event stream│     │ queue   │     │               │    │
│  └─────────────┘     └─────────┘     └───────┬───────┘    │
│                                              │            │
│          ┌──────────────────┬────────────────┴─┐          │
│          ▼                  ▼                  ▼          │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐  │
│  │ Feature calc  │  │ Graph queries │  │ Rule engine   │  │
│  │ (statistical) │  │ (relational)  │  │               │  │
│  └───────┬───────┘  └───────┬───────┘  └───────┬───────┘  │
│          │                  │                  │          │
│  ┌───────▼──────────────────▼──────────────────▼───────┐  │
│  │               Risk score fusion layer               │  │
│  │                                                     │  │
│  │      Statistical risk + graph risk + rule risk      │  │
│  │              ──→ composite risk score               │  │
│  └──────────────────────────┬──────────────────────────┘  │
│                             │                             │
│                    ┌────────▼────────┐                    │
│                    │ Decision engine │                    │
│                    │                 │                    │
│                    │ approve/reject/ │                    │
│                    │ manual review   │                    │
│                    └─────────────────┘                    │
└───────────────────────────────────────────────────────────┘
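The last two stages of the diagram, fusion and decision, can be sketched in a few lines. This is a minimal illustration that assumes each of the three sources already produces a score on a 0 to 100 scale. The function names and the 2:2:1 weighting are hypothetical; the 80 and 60 cut-offs mirror the thresholds used by the scoring code later in this article.

```python
from typing import Sequence


def fuse_risk(
    stat_risk: float,
    graph_risk: float,
    rule_risk: float,
    weights: Sequence[float] = (2, 2, 1),  # hypothetical weighting
) -> float:
    """Weighted average of the three risk sources, each on a 0-100 scale."""
    scores = (stat_risk, graph_risk, rule_risk)
    if any(not 0 <= s <= 100 for s in scores):
        raise ValueError("each risk score must be within [0, 100]")
    if len(weights) != 3 or sum(weights) <= 0:
        raise ValueError("weights must be three values with a positive sum")
    return sum(s * w for s, w in zip(scores, weights)) / sum(weights)


def decide(score: float) -> str:
    """Map a composite score to approve / manual_review / reject."""
    if score >= 80:
        return "reject"
    if score >= 60:
        return "manual_review"
    return "approve"


composite = fuse_risk(stat_risk=50, graph_risk=100, rule_risk=0)
print(composite, decide(composite))
```

A weighted average is only one possible fusion rule; a production engine might instead let a hard rule hit override the score entirely.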
Graph Feature Computation
class GraphFeatureExtractor:
    """Extract risk-control features from the knowledge graph."""

    def __init__(self, graph_session):
        # graph_session: an open Neo4j session (any object exposing .run())
        self.session = graph_session

    def extract_features(self, person_id: str) -> dict:
        """Extract graph-based risk features for one person."""
        features = {}
        # 1. Association risk: distance to the nearest blacklisted person
        features["blacklist_distance"] = self._blacklist_distance(person_id)
        # 2. Guarantee-network features
        features.update(self._guarantee_features(person_id))
        # 3. Device-sharing features
        features.update(self._device_sharing_features(person_id))
        # 4. Fund-flow features
        features.update(self._fund_flow_features(person_id))
        # 5. Community-density features
        features.update(self._community_features(person_id))
        return features

    def _blacklist_distance(self, person_id: str) -> int:
        """Graph distance to the nearest blacklisted person (-1 if unreachable)."""
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid}),
                  (b:Person {risk_level: "blacklisted"})
            // shortestPath fails when start and end are the same node
            WHERE p <> b
            MATCH path = shortestPath((p)-[*..5]-(b))
            RETURN length(path) AS distance
            ORDER BY distance ASC
            LIMIT 1
            """,
            pid=person_id
        ).single()
        return result["distance"] if result else -1

    def _guarantee_features(self, person_id: str) -> dict:
        """Guarantee-network features."""
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[:GUARANTEES*1..3]-(g:Person)
            WHERE g <> p
            WITH p, collect(DISTINCT g) AS guarantee_network
            OPTIONAL MATCH (p)-[:GUARANTEES]->(m:Person)-[:GUARANTEES]->(p)
            RETURN
                size(guarantee_network) AS network_size,
                // count(m) ignores the null row that OPTIONAL MATCH emits when
                // nothing matches; count(*) would always be at least 1
                count(m) > 0 AS has_circular_guarantee
            """,
            pid=person_id
        ).single()
        if result is None:  # person not present in the graph
            return {"guarantee_network_size": 0, "has_circular_guarantee": False}
        return {
            "guarantee_network_size": result["network_size"],
            "has_circular_guarantee": result["has_circular_guarantee"]
        }

    def _device_sharing_features(self, person_id: str) -> dict:
        """Device-sharing features."""
        # OPTIONAL MATCH plus count() always yields one row, even when the
        # person shares no device with anyone.
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[:USES_DEVICE]->(d:Device)
                           <-[:USES_DEVICE]-(other:Person)
            WHERE other <> p
            RETURN
                count(DISTINCT other) AS shared_device_users,
                count(DISTINCT d) AS shared_device_count
            """,
            pid=person_id
        ).single()
        return {
            "shared_device_users": result["shared_device_users"] or 0,
            "shared_device_count": result["shared_device_count"] or 0
        }

    def _fund_flow_features(self, person_id: str) -> dict:
        """Fund-flow features over the last 30 days."""
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[:OWNS_ACCOUNT]->(:Account)
                           -[t:TRANSFERS_TO]->(target:Account)
            WHERE t.transfer_date >= date() - duration('P30D')
            RETURN
                count(t) AS transfers_30d,
                coalesce(sum(t.amount), 0) AS transfer_amount_30d,
                count(DISTINCT target) AS unique_transfer_targets
            """,
            pid=person_id
        ).single()
        return {
            "transfers_30d": result["transfers_30d"],
            "transfer_amount_30d": result["transfer_amount_30d"],
            "unique_transfer_targets": result["unique_transfer_targets"]
        }

    def _community_features(self, person_id: str) -> dict:
        """Community-density features: edge density among 2-hop Person neighbors."""
        result = self.session.run(
            """
            MATCH (p:Person {id_number: $pid})
            OPTIONAL MATCH (p)-[*1..2]-(neighbor:Person)
            WHERE neighbor <> p
            WITH collect(DISTINCT neighbor) AS neighbors
            WITH neighbors, size(neighbors) AS n_count
            // Keep one row even when there are no neighbors
            UNWIND (CASE WHEN n_count = 0 THEN [null] ELSE neighbors END) AS n1
            OPTIONAL MATCH (n1)--(n2:Person)
            WHERE n2 IN neighbors AND id(n1) < id(n2)
            // Count each connected pair once, however many relationships link it
            WITH n_count, n1, count(DISTINCT n2) AS linked
            WITH n_count, sum(linked) AS actual_edges
            RETURN
                n_count AS neighbor_count,
                CASE WHEN n_count > 1
                     THEN toFloat(actual_edges) / (n_count * (n_count - 1) / 2.0)
                     ELSE 0.0 END AS clustering_coefficient
            """,
            pid=person_id
        ).single()
        return {
            "neighbor_count": result["neighbor_count"],
            "clustering_coefficient": round(result["clustering_coefficient"], 4)
        }
Risk Score Fusion
class RiskScorer:
    """Fuse graph features into a single risk score."""

    # Feature weights (based on business experience and model training)
    FEATURE_WEIGHTS = {
        "blacklist_distance": -0.3,      # the closer, the riskier
        "guarantee_network_size": 0.15,
        "has_circular_guarantee": 0.25,  # guarantee rings are a strong risk signal
        "shared_device_users": 0.2,
        "transfers_30d": 0.05,
        "transfer_amount_30d": 0.1,
        "clustering_coefficient": 0.15,  # dense community = possible gang
    }
    # Longest path searched by the blacklist-distance query
    MAX_BLACKLIST_DISTANCE = 5

    def score(self, features: dict) -> dict:
        """Compute the composite risk score."""
        # Normalize features
        normalized = self._normalize(features)
        # Weighted sum
        weighted_score = 0
        for feat, weight in self.FEATURE_WEIGHTS.items():
            if feat in normalized:
                weighted_score += normalized[feat] * weight
        # Map to a 0-100 score
        risk_score = min(100, max(0, int(50 + weighted_score * 50)))
        # Risk level
        if risk_score >= 80:
            risk_level = "high"
            decision = "reject"
        elif risk_score >= 60:
            risk_level = "medium"
            decision = "manual_review"
        else:
            risk_level = "low"
            decision = "approve"
        return {
            "risk_score": risk_score,
            "risk_level": risk_level,
            "decision": decision,
            "features": features,
            "top_risk_factors": self._top_factors(normalized)
        }

    def _normalize(self, features: dict) -> dict:
        """Scale features to [-1, 1]."""
        normalized = {}
        for key, value in features.items():
            if isinstance(value, bool):
                normalized[key] = 1.0 if value else 0.0
            elif key == "blacklist_distance":
                # -1 means "unreachable". Treat it as the farthest (lowest-risk)
                # case; scaling the sentinel directly would flip the sign and
                # make an unreachable blacklist look risky.
                normalized[key] = (
                    1.0 if value < 0
                    else min(1.0, value / self.MAX_BLACKLIST_DISTANCE)
                )
            elif isinstance(value, (int, float)):
                # Simplified; production systems should use quantiles of historical data
                normalized[key] = min(1.0, value / 10.0) if value >= 0 else max(-1.0, value / 5.0)
        return normalized

    def _top_factors(self, normalized: dict, top_n: int = 3) -> list:
        """Return the factors that add the most risk."""
        contributions = []
        for feat, weight in self.FEATURE_WEIGHTS.items():
            if feat in normalized:
                contribution = normalized[feat] * weight
                # Keep only factors that raise the score; a large negative
                # contribution lowers risk and is not a risk factor
                if contribution > 0:
                    contributions.append({
                        "feature": feat,
                        "contribution": contribution
                    })
        contributions.sort(key=lambda x: x["contribution"], reverse=True)
        return contributions[:top_n]
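The comment in `_normalize` notes that a real deployment should normalize against quantiles of historical data rather than dividing by a constant. The following is a minimal sketch of that idea using the empirical distribution of past values. `make_quantile_normalizer` is a hypothetical helper, and the history list is illustrative rather than real data.

```python
from bisect import bisect_right
from typing import Callable, Sequence


def make_quantile_normalizer(history: Sequence[float]) -> Callable[[float], float]:
    """Build a function mapping a raw value to its empirical quantile in [0, 1]."""
    ordered = sorted(history)
    if not ordered:
        raise ValueError("history must contain at least one observation")

    def normalize(value: float) -> float:
        # Fraction of historical observations that are <= value.
        return bisect_right(ordered, value) / len(ordered)

    return normalize


# Illustrative history of "shared_device_users" values, not real data.
normalize_sharers = make_quantile_normalizer([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
print(normalize_sharers(4))
```

Because the output is a rank, features with very different raw scales (a transfer count versus a transfer amount) end up comparable before weighting.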
Graph Updates and Maintenance
Real-Time Data Ingestion
import json

from kafka import KafkaConsumer


class GraphUpdater:
    """Real-time graph updater."""

    def __init__(self, graph_session, kafka_config: dict):
        self.session = graph_session
        self.consumer = KafkaConsumer(
            "loan_events",
            "transaction_events",
            "device_events",
            bootstrap_servers=kafka_config["brokers"],
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),
            group_id="graph_updater"
        )

    def process_events(self):
        """Consume events continuously and update the graph."""
        for message in self.consumer:
            event = message.value
            event_type = event.get("type")
            if event_type == "loan_application":
                self._handle_loan_application(event)
            elif event_type == "transaction":
                self._handle_transaction(event)
            elif event_type == "device_login":
                self._handle_device_login(event)

    def _handle_loan_application(self, event: dict):
        """Handle a loan application event."""
        self.session.run(
            """
            MERGE (p:Person {id_number: $person_id})
            SET p.name = $name, p.updated_at = datetime()
            MERGE (l:Loan {loan_id: $loan_id})
            SET l.amount = $amount, l.status = "pending",
                l.apply_date = date($apply_date)
            MERGE (p)-[:APPLIES_LOAN {role: "borrower"}]->(l)
            """,
            person_id=event["person_id"],
            name=event["person_name"],
            loan_id=event["loan_id"],
            amount=event["amount"],
            apply_date=event["apply_date"]
        )

    def _handle_transaction(self, event: dict):
        """Handle a transaction event (both accounts must already exist)."""
        self.session.run(
            """
            MATCH (src:Account {account_number: $from_account})
            MATCH (dst:Account {account_number: $to_account})
            CREATE (src)-[:TRANSFERS_TO {
                amount: $amount,
                transfer_date: date($date),
                channel: $channel
            }]->(dst)
            """,
            from_account=event["from_account"],
            to_account=event["to_account"],
            amount=event["amount"],
            date=event["date"],
            channel=event.get("channel", "online")
        )

    def _handle_device_login(self, event: dict):
        """Handle a device login event.

        Minimal handler so the dispatch in process_events does not fail; the
        event field names (person_id, device_fingerprint) are assumptions.
        """
        self.session.run(
            """
            MERGE (p:Person {id_number: $person_id})
            MERGE (d:Device {device_fingerprint: $fingerprint})
            MERGE (p)-[:USES_DEVICE]->(d)
            """,
            person_id=event["person_id"],
            fingerprint=event["device_fingerprint"]
        )
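Compliance and Privacy
Data Masking Strategy
| Data type | Masking method | Example |
|---|---|---|
| ID number | Mask the middle 8 digits (keep first 6 and last 4) | 510101********0011 |
| Mobile number | Mask the middle 4 digits | 138****8000 |
| Bank card number | Keep first 6 and last 4 digits | 622848********1234 |
| Name | Keep the surname only | 张** |
| Address | Reduce precision to district/county level | 四川省成都市锦江区 |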
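The masking rules in the table can be expressed as one small helper plus thin wrappers. This is a minimal sketch; the function names are hypothetical, and the name rule assumes a single-character surname, so compound surnames would need a lookup table. For an 18-digit ID number, keeping the first 6 and last 4 digits leaves 8 digits masked.

```python
def mask_middle(value: str, keep_head: int, keep_tail: int) -> str:
    """Replace everything between the kept head and tail with asterisks."""
    if len(value) <= keep_head + keep_tail:
        return "*" * len(value)  # too short to keep anything safely
    masked_len = len(value) - keep_head - keep_tail
    tail = value[len(value) - keep_tail:]  # safe even when keep_tail == 0
    return value[:keep_head] + "*" * masked_len + tail


def mask_id_number(id_number: str) -> str:
    return mask_middle(id_number, keep_head=6, keep_tail=4)


def mask_phone(phone: str) -> str:
    return mask_middle(phone, keep_head=3, keep_tail=4)


def mask_bank_card(card_number: str) -> str:
    return mask_middle(card_number, keep_head=6, keep_tail=4)


def mask_name(name: str) -> str:
    # Assumes a single-character surname.
    return mask_middle(name, keep_head=1, keep_tail=0)


print(mask_id_number("510101199001010011"))
print(mask_phone("13800138000"))
print(mask_name("张三丰"))
```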
Audit Log
// Create an audit log node
CREATE (log:AuditLog {
  query_id: $query_id,
  operator: $operator,
  query_type: "risk_assessment",
  target_person: $person_id,
  timestamp: datetime(),
  result: $result,
  ip_address: $ip
});
// Link the log entry to the operator
MATCH (op:Operator {id: $operator})
MATCH (log:AuditLog {query_id: $query_id})
CREATE (op)-[:PERFORMED]->(log);
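Key Performance Indicators
| Metric | Target | Measured baseline |
|---|---|---|
| Single risk-check query latency | < 200ms | P99 ~150ms |
| Gang-detection query latency | < 2s | P99 ~1.5s |
| Fund-circulation detection depth | 5 hops | 3-5 hops |
| Graph update latency | < 5s | mean ~2s |
| Fraud detection rate | > 85% | ~88% |
| False positive rate | < 5% | ~3.2% |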
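The table does not say how the last two rows are computed. One common reading, assumed here, is detection rate = TP / (TP + FN) and false positive rate = FP / (FP + TN), both taken from a labeled evaluation set. The sketch below uses made-up counts purely to show the arithmetic; `fraud_metrics` is a hypothetical helper.

```python
def fraud_metrics(tp: int, fp: int, tn: int, fn: int) -> dict:
    """Compute detection rate (recall) and false positive rate from counts."""
    actual_fraud = tp + fn
    actual_legit = fp + tn
    if actual_fraud == 0 or actual_legit == 0:
        raise ValueError("need at least one fraud case and one legitimate case")
    return {
        "detection_rate": tp / actual_fraud,       # share of fraud that was caught
        "false_positive_rate": fp / actual_legit,  # share of legitimate cases flagged
    }


# Illustrative counts only, not measurements from the system described here.
metrics = fraud_metrics(tp=88, fp=32, tn=968, fn=12)
print(metrics)
```

If the team instead defines the false positive rate as FP / (TP + FP), the share of alerts that turn out to be wrong, the same counts give a very different number, so the definition should be stated alongside the target.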
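Summary
Key engineering takeaways for a financial risk-control graph:
- Ontology design sets the ceiling: entity and relationship types must cover the detection needs of every fraud pattern
- Graph queries are risk rules: a Cypher query is itself an explainable risk rule, which simplifies compliance audits
- Real-time performance is critical: end-to-end latency, from the event through the graph update to the risk decision, must stay within seconds
- Feature fusion: graph features (relationship distance, community density), statistical features, and rules work best in combination
- Compliance first: data masking, access control, and audit logs are hard requirements of financial regulation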
Maurice | maurice_wen@proton.me