企业级 Agent 平台的权限与审计设计
原创
灵阙教研团队
S 精选 进阶 |
约 9 分钟阅读
更新于 2026-02-28 AI 导读
企业级 Agent 平台的权限与审计设计 概述 当 AI Agent 从个人工具走向企业平台,权限控制和审计追踪成为刚需。一个能执行代码、调用 API、访问数据库的 Agent,如果缺少权限约束,就是一个高效的安全隐患。 本文从 RBAC 模型、工具权限、数据访问控制、操作审计和合规要求五个维度,设计企业级 Agent 平台的安全架构。 威胁模型 企业 Agent 平台威胁面 | ├── 用户层...
企业级 Agent 平台的权限与审计设计
概述
当 AI Agent 从个人工具走向企业平台,权限控制和审计追踪成为刚需。一个能执行代码、调用 API、访问数据库的 Agent,如果缺少权限约束,就是一个高效的安全隐患。
本文从 RBAC 模型、工具权限、数据访问控制、操作审计和合规要求五个维度,设计企业级 Agent 平台的安全架构。
威胁模型
企业 Agent 平台威胁面
|
├── 用户层
│ ├── 越权操作:普通用户通过 Agent 执行管理员操作
│ ├── 数据窃取:通过 Agent 访问他人或敏感数据
│ └── 社工攻击:诱骗 Agent 执行恶意操作
|
├── Agent 层
│ ├── 提示词注入:覆盖系统指令绕过限制
│ ├── 工具滥用:过度调用或误用工具
│ └── 上下文泄露:跨会话/跨用户信息泄露
|
└── 系统层
├── 代码执行逃逸:从沙箱逃逸到宿主机
├── 供应链攻击:恶意模型/插件
└── 日志篡改:删除审计记录
RBAC 权限模型
角色定义
from enum import Enum
from dataclasses import dataclass, field
class Permission(Enum):
# Agent 操作权限
AGENT_CREATE = "agent:create"
AGENT_EXECUTE = "agent:execute"
AGENT_DELETE = "agent:delete"
AGENT_SHARE = "agent:share"
# 工具权限
TOOL_READ = "tool:read"
TOOL_WRITE = "tool:write"
TOOL_EXECUTE = "tool:execute"
TOOL_ADMIN = "tool:admin"
# 数据权限
DATA_READ_OWN = "data:read:own"
DATA_READ_TEAM = "data:read:team"
DATA_READ_ALL = "data:read:all"
DATA_WRITE_OWN = "data:write:own"
DATA_DELETE = "data:delete"
# 代码执行权限
CODE_EXECUTE_SANDBOX = "code:execute:sandbox"
CODE_EXECUTE_LOCAL = "code:execute:local"
# 管理权限
ADMIN_USERS = "admin:users"
ADMIN_BILLING = "admin:billing"
ADMIN_AUDIT = "admin:audit"
@dataclass
class Role:
name: str
permissions: set[Permission]
description: str
# 预定义角色
ROLES = {
"viewer": Role(
name="viewer",
permissions={
Permission.AGENT_EXECUTE,
Permission.TOOL_READ,
Permission.DATA_READ_OWN,
},
description="只能运行 Agent 和查看自己的数据",
),
"developer": Role(
name="developer",
permissions={
Permission.AGENT_CREATE,
Permission.AGENT_EXECUTE,
Permission.AGENT_SHARE,
Permission.TOOL_READ,
Permission.TOOL_WRITE,
Permission.TOOL_EXECUTE,
Permission.DATA_READ_OWN,
Permission.DATA_READ_TEAM,
Permission.DATA_WRITE_OWN,
Permission.CODE_EXECUTE_SANDBOX,
},
description="可以创建和分享 Agent,在沙箱中执行代码",
),
"admin": Role(
name="admin",
permissions=set(Permission), # 所有权限
description="平台管理员,拥有所有权限",
),
}
权限检查器
class PermissionChecker:
"""统一权限检查入口"""
def __init__(self, user_store, role_store):
self.user_store = user_store
self.role_store = role_store
async def check(self, user_id: str, permission: Permission,
resource: dict = None) -> tuple[bool, str]:
"""检查用户是否有指定权限"""
user = await self.user_store.get(user_id)
if not user:
return False, "User not found"
if user.get("suspended"):
return False, "User account suspended"
role = ROLES.get(user["role"])
if not role:
return False, f"Unknown role: {user['role']}"
# 基础权限检查
if permission not in role.permissions:
return False, f"Role '{role.name}' lacks permission '{permission.value}'"
# 资源级别权限检查(ABAC)
if resource:
allowed = self._check_resource_access(user, permission, resource)
if not allowed:
return False, "Resource access denied"
return True, "ok"
def _check_resource_access(self, user, permission, resource) -> bool:
"""基于属性的访问控制(ABAC)"""
# 规则一:只能访问自己的数据
if permission in {Permission.DATA_READ_OWN, Permission.DATA_WRITE_OWN}:
return resource.get("owner_id") == user["id"]
# 规则二:团队数据需要同一团队
if permission == Permission.DATA_READ_TEAM:
return resource.get("team_id") in user.get("teams", [])
# 规则三:Agent 分享需要是创建者
if permission == Permission.AGENT_SHARE:
return resource.get("creator_id") == user["id"]
return True
工具权限控制
工具分级
from enum import IntEnum
class ToolRiskLevel(IntEnum):
SAFE = 0 # 无副作用:查询、计算、搜索
LOW = 1 # 低风险:读取文件、查看日志
MEDIUM = 2 # 中风险:写入文件、发送通知
HIGH = 3 # 高风险:执行代码、修改数据库
CRITICAL = 4 # 极高风险:删除数据、管理权限、发送邮件
@dataclass
class ToolPermissionPolicy:
tool_name: str
risk_level: ToolRiskLevel
required_role: str # 最低角色要求
requires_confirmation: bool = False # 是否需要人工确认
max_calls_per_hour: int = 100
allowed_args_patterns: dict = None # 参数白名单
blocked_args_patterns: dict = None # 参数黑名单
audit_level: str = "standard" # minimal / standard / detailed
TOOL_POLICIES = {
"search_web": ToolPermissionPolicy(
tool_name="search_web",
risk_level=ToolRiskLevel.SAFE,
required_role="viewer",
),
"read_file": ToolPermissionPolicy(
tool_name="read_file",
risk_level=ToolRiskLevel.LOW,
required_role="developer",
allowed_args_patterns={"path": r"^/workspace/.*"}, # 限制读取路径
),
"execute_sql": ToolPermissionPolicy(
tool_name="execute_sql",
risk_level=ToolRiskLevel.HIGH,
required_role="developer",
requires_confirmation=True,
max_calls_per_hour=20,
blocked_args_patterns={
"query": r"(?i)(DROP|DELETE|TRUNCATE|ALTER)\s", # 禁止破坏性 SQL
},
audit_level="detailed",
),
"send_email": ToolPermissionPolicy(
tool_name="send_email",
risk_level=ToolRiskLevel.CRITICAL,
required_role="developer",
requires_confirmation=True,
max_calls_per_hour=10,
audit_level="detailed",
),
"execute_code": ToolPermissionPolicy(
tool_name="execute_code",
risk_level=ToolRiskLevel.HIGH,
required_role="developer",
requires_confirmation=False, # 沙箱执行不需确认
audit_level="detailed",
),
}
class ToolPermissionGuard:
"""工具权限守卫"""
def __init__(self, permission_checker: PermissionChecker):
self.checker = permission_checker
self.call_counts: dict[str, dict[str, int]] = {}
async def authorize(self, user_id: str, tool_name: str,
args: dict) -> tuple[bool, str, bool]:
"""
返回: (allowed, reason, needs_confirmation)
"""
policy = TOOL_POLICIES.get(tool_name)
if not policy:
return False, f"Tool '{tool_name}' not registered", False
# 1. 角色检查
user = await self.checker.user_store.get(user_id)
role_order = ["viewer", "developer", "admin"]
user_role_idx = role_order.index(user.get("role", "viewer"))
required_idx = role_order.index(policy.required_role)
if user_role_idx < required_idx:
return False, (
f"Tool '{tool_name}' requires role '{policy.required_role}', "
f"user has '{user['role']}'"
), False
# 2. 频率检查
key = f"{user_id}:{tool_name}"
count = self.call_counts.get(key, {}).get("count", 0)
if count >= policy.max_calls_per_hour:
return False, f"Rate limit exceeded for '{tool_name}'", False
# 3. 参数白名单检查
if policy.allowed_args_patterns:
for arg_name, pattern in policy.allowed_args_patterns.items():
if arg_name in args:
import re
if not re.match(pattern, str(args[arg_name])):
return False, (
f"Argument '{arg_name}' does not match allowed pattern"
), False
# 4. 参数黑名单检查
if policy.blocked_args_patterns:
for arg_name, pattern in policy.blocked_args_patterns.items():
if arg_name in args:
import re
if re.search(pattern, str(args[arg_name])):
return False, (
f"Argument '{arg_name}' matches blocked pattern"
), False
# 5. 记录调用
if key not in self.call_counts:
self.call_counts[key] = {"count": 0, "reset_time": time.time() + 3600}
self.call_counts[key]["count"] += 1
return True, "ok", policy.requires_confirmation
沙箱执行
class SandboxConfig:
"""沙箱配置"""
TIERS = {
"minimal": {
"cpu_limit": "0.5",
"memory_limit": "256m",
"timeout": 30,
"network": False,
"filesystem": "readonly",
},
"standard": {
"cpu_limit": "1",
"memory_limit": "512m",
"timeout": 60,
"network": False,
"filesystem": "tmpfs",
},
"full": {
"cpu_limit": "2",
"memory_limit": "2g",
"timeout": 300,
"network": True, # 需要 admin 批准
"filesystem": "tmpfs",
},
}
class DockerSandbox:
"""Docker 容器沙箱"""
async def execute(self, code: str, tier: str = "standard",
user_id: str = None) -> dict:
config = SandboxConfig.TIERS[tier]
# 网络访问需要额外授权
if config["network"]:
if not await self._check_network_permission(user_id):
return {"error": "Network access requires admin approval"}
import docker
client = docker.from_env()
try:
container = client.containers.run(
"python:3.11-slim",
command=["python", "-c", code],
mem_limit=config["memory_limit"],
cpu_period=100000,
cpu_quota=int(float(config["cpu_limit"]) * 100000),
network_disabled=not config["network"],
read_only=config["filesystem"] == "readonly",
tmpfs={"/tmp": "size=100m"} if config["filesystem"] == "tmpfs" else {},
remove=True,
detach=False,
stdout=True,
stderr=True,
timeout=config["timeout"],
)
return {
"stdout": container.decode("utf-8"),
"exit_code": 0,
}
except docker.errors.ContainerError as e:
return {
"stderr": e.stderr.decode("utf-8") if e.stderr else str(e),
"exit_code": e.exit_status,
}
审计日志系统
审计事件模型
from datetime import datetime
from enum import Enum
class AuditEventType(Enum):
# 用户事件
USER_LOGIN = "user.login"
USER_LOGOUT = "user.logout"
# Agent 事件
AGENT_CREATED = "agent.created"
AGENT_EXECUTED = "agent.executed"
AGENT_SHARED = "agent.shared"
# 工具事件
TOOL_CALLED = "tool.called"
TOOL_BLOCKED = "tool.blocked"
TOOL_CONFIRMED = "tool.confirmed"
# 数据事件
DATA_ACCESSED = "data.accessed"
DATA_MODIFIED = "data.modified"
DATA_EXPORTED = "data.exported"
# 安全事件
PERMISSION_DENIED = "security.permission_denied"
INJECTION_DETECTED = "security.injection_detected"
RATE_LIMIT_HIT = "security.rate_limit"
ANOMALY_DETECTED = "security.anomaly"
@dataclass
class AuditEvent:
event_id: str
event_type: AuditEventType
timestamp: datetime
user_id: str
session_id: str
agent_id: str = ""
tool_name: str = ""
action: str = ""
resource: str = ""
result: str = "" # success / denied / error
details: dict = field(default_factory=dict)
ip_address: str = ""
user_agent: str = ""
risk_level: int = 0
class AuditLogger:
"""不可篡改的审计日志"""
def __init__(self, storage_backend):
self.storage = storage_backend
async def log(self, event: AuditEvent):
"""写入审计日志(仅追加,不可修改)"""
record = {
"event_id": event.event_id,
"event_type": event.event_type.value,
"timestamp": event.timestamp.isoformat(),
"user_id": event.user_id,
"session_id": event.session_id,
"agent_id": event.agent_id,
"tool_name": event.tool_name,
"action": event.action,
"resource": event.resource,
"result": event.result,
"details": event.details,
"ip_address": event.ip_address,
"risk_level": event.risk_level,
# 完整性校验
"checksum": self._compute_checksum(event),
}
await self.storage.append(record)
# 高风险事件实时告警
if event.risk_level >= 3:
await self._alert(event)
def _compute_checksum(self, event: AuditEvent) -> str:
import hashlib
data = f"{event.event_id}:{event.timestamp}:{event.user_id}:{event.action}"
return hashlib.sha256(data.encode()).hexdigest()[:16]
async def _alert(self, event: AuditEvent):
"""高风险事件告警"""
alert_msg = (
f"[SECURITY ALERT] {event.event_type.value}\n"
f"User: {event.user_id}\n"
f"Action: {event.action}\n"
f"Resource: {event.resource}\n"
f"Result: {event.result}\n"
f"Risk Level: {event.risk_level}"
)
# 发送到 Slack / PagerDuty / 邮件
await notify_security_team(alert_msg)
审计查询与分析
class AuditAnalyzer:
"""审计日志分析"""
async def user_activity_report(self, user_id: str,
start: datetime, end: datetime) -> dict:
events = await self.storage.query(
user_id=user_id,
start=start,
end=end,
)
return {
"user_id": user_id,
"period": f"{start.date()} to {end.date()}",
"total_events": len(events),
"by_type": self._group_by_type(events),
"tools_used": self._tools_summary(events),
"security_events": [
e for e in events
if e["event_type"].startswith("security.")
],
"data_access": [
e for e in events
if e["event_type"].startswith("data.")
],
}
async def detect_anomalies(self, user_id: str) -> list[dict]:
"""检测异常行为模式"""
anomalies = []
recent = await self.storage.query(
user_id=user_id,
hours=24,
)
# 模式一:短时间内大量权限被拒绝
denied = [e for e in recent if e["result"] == "denied"]
if len(denied) > 10:
anomalies.append({
"type": "excessive_permission_denied",
"count": len(denied),
"severity": "high",
})
# 模式二:异常时间操作
for event in recent:
hour = datetime.fromisoformat(event["timestamp"]).hour
if hour < 6 or hour > 22:
anomalies.append({
"type": "off_hours_activity",
"event": event["event_type"],
"time": event["timestamp"],
"severity": "medium",
})
# 模式三:大量数据导出
exports = [e for e in recent if e["event_type"] == "data.exported"]
if len(exports) > 5:
anomalies.append({
"type": "bulk_data_export",
"count": len(exports),
"severity": "critical",
})
return anomalies
合规要求映射
| 合规框架 | 要求 | Agent 平台实现 |
|---|---|---|
| GDPR | 数据最小化 | Agent 仅访问必要的用户数据 |
| GDPR | 被遗忘权 | 支持删除用户相关的所有 Agent 数据 |
| SOC 2 | 访问控制 | RBAC + 最小权限原则 |
| SOC 2 | 审计追踪 | 不可篡改的审计日志 |
| SOC 2 | 变更管理 | Agent 配置变更记录 |
| ISO 27001 | 信息分类 | 数据按敏感级别标记和隔离 |
| PCI DSS | 数据保护 | Agent 禁止访问支付卡数据 |
总结
企业级 Agent 平台安全的核心架构:
- RBAC + ABAC:角色控制基础权限,属性控制资源级别访问
- 工具分级:按风险级别控制工具调用权限,高风险操作需人工确认
- 沙箱隔离:代码执行在容器沙箱中,网络访问需额外授权
- 审计不可篡改:所有操作留痕,带完整性校验和实时告警
- 异常检测:基于行为模式的异常检测,防止内部威胁
Maurice | maurice_wen@proton.me