企业级 Agent 平台的权限与审计设计

概述

当 AI Agent 从个人工具走向企业平台,权限控制和审计追踪成为刚需。一个能执行代码、调用 API、访问数据库的 Agent,如果缺少权限约束,就是一个高效的安全隐患。

本文从 RBAC 模型、工具权限、数据访问控制、操作审计和合规要求五个维度,设计企业级 Agent 平台的安全架构。

威胁模型

企业 Agent 平台威胁面
    │
    ├── 用户层
    │   ├── 越权操作:普通用户通过 Agent 执行管理员操作
    │   ├── 数据窃取:通过 Agent 访问他人或敏感数据
    │   └── 社工攻击:诱骗 Agent 执行恶意操作
    │
    ├── Agent 层
    │   ├── 提示词注入:覆盖系统指令绕过限制
    │   ├── 工具滥用:过度调用或误用工具
    │   └── 上下文泄露:跨会话/跨用户信息泄露
    │
    └── 系统层
        ├── 代码执行逃逸:从沙箱逃逸到宿主机
        ├── 供应链攻击:恶意模型/插件
        └── 日志篡改:删除审计记录

RBAC 权限模型

角色定义

from enum import Enum
from dataclasses import dataclass, field

class Permission(Enum):
    """Fine-grained permissions that a role can grant.

    Each value is a colon-separated string: ``domain:action`` or
    ``domain:action:scope``.
    """

    # Agent operation permissions
    AGENT_CREATE = "agent:create"
    AGENT_EXECUTE = "agent:execute"
    AGENT_DELETE = "agent:delete"
    AGENT_SHARE = "agent:share"

    # Tool permissions
    TOOL_READ = "tool:read"
    TOOL_WRITE = "tool:write"
    TOOL_EXECUTE = "tool:execute"
    TOOL_ADMIN = "tool:admin"

    # Data permissions
    DATA_READ_OWN = "data:read:own"
    DATA_READ_TEAM = "data:read:team"
    DATA_READ_ALL = "data:read:all"
    DATA_WRITE_OWN = "data:write:own"
    DATA_DELETE = "data:delete"

    # Code execution permissions
    CODE_EXECUTE_SANDBOX = "code:execute:sandbox"
    CODE_EXECUTE_LOCAL = "code:execute:local"

    # Administration permissions
    ADMIN_USERS = "admin:users"
    ADMIN_BILLING = "admin:billing"
    ADMIN_AUDIT = "admin:audit"

@dataclass
class Role:
    """A named bundle of permissions."""

    name: str  # role identifier
    permissions: set[Permission]  # permissions granted to holders of this role
    description: str  # human-readable summary of the role

# Predefined roles, keyed by role name.
ROLES = {
    # Read-mostly role: run agents, read tools, read own data.
    "viewer": Role(
        name="viewer",
        permissions={
            Permission.AGENT_EXECUTE,
            Permission.TOOL_READ,
            Permission.DATA_READ_OWN,
        },
        description="只能运行 Agent 和查看自己的数据",
    ),
    # Builder role: adds agent creation/sharing, tool write/execute,
    # team-level reads, own-data writes and sandboxed code execution.
    "developer": Role(
        name="developer",
        permissions={
            Permission.AGENT_CREATE,
            Permission.AGENT_EXECUTE,
            Permission.AGENT_SHARE,
            Permission.TOOL_READ,
            Permission.TOOL_WRITE,
            Permission.TOOL_EXECUTE,
            Permission.DATA_READ_OWN,
            Permission.DATA_READ_TEAM,
            Permission.DATA_WRITE_OWN,
            Permission.CODE_EXECUTE_SANDBOX,
        },
        description="可以创建和分享 Agent,在沙箱中执行代码",
    ),
    "admin": Role(
        name="admin",
        permissions=set(Permission),  # every member of Permission
        description="平台管理员,拥有所有权限",
    ),
}

权限检查器

class PermissionChecker:
    """Single entry point for permission checks (role-based plus resource-level)."""

    def __init__(self, user_store, role_store):
        """Keep references to the backing stores.

        Args:
            user_store: Object exposing an awaitable ``get(user_id)`` that
                returns a user mapping, or a falsy value for unknown users.
            role_store: Kept on the instance; the checks below resolve roles
                through the module-level ``ROLES`` table and do not read it.
        """
        self.user_store = user_store
        self.role_store = role_store

    async def check(self, user_id: str, permission: Permission,
                    resource: dict = None) -> tuple[bool, str]:
        """Decide whether a user holds a permission, optionally on a resource.

        Args:
            user_id: Identifier passed to ``user_store.get``.
            permission: The permission being requested.
            resource: Attributes of the target resource (``owner_id``,
                ``team_id``, ``creator_id``). ``None`` means "no specific
                resource", in which case only the role check applies.

        Returns:
            ``(allowed, reason)``; ``reason`` is ``"ok"`` when allowed.
        """
        user = await self.user_store.get(user_id)
        if not user:
            return False, "User not found"

        if user.get("suspended"):
            return False, "User account suspended"

        # .get() so a user record without a role is denied, not a KeyError.
        role_name = user.get("role")
        role = ROLES.get(role_name)
        if not role:
            return False, f"Unknown role: {role_name}"

        # Role-level (RBAC) check.
        if permission not in role.permissions:
            return False, f"Role '{role.name}' lacks permission '{permission.value}'"

        # Resource-level (ABAC) check. Compare against None rather than
        # truthiness: an empty resource dict must still be evaluated (and
        # fail the ownership/team rules) instead of skipping them.
        if resource is not None:
            if not self._check_resource_access(user, permission, resource):
                return False, "Resource access denied"

        return True, "ok"

    @staticmethod
    def _ids_match(resource_value, user_value) -> bool:
        """Return True only when both ids are present and equal (fail closed)."""
        return resource_value is not None and resource_value == user_value

    def _check_resource_access(self, user, permission, resource) -> bool:
        """Apply attribute-based rules for permissions scoped to a resource.

        Permissions without a rule below are not restricted at resource level
        and return True.
        """
        # Rule 1: "own" data permissions require the user to be the owner.
        if permission in {Permission.DATA_READ_OWN, Permission.DATA_WRITE_OWN}:
            return self._ids_match(resource.get("owner_id"), user.get("id"))

        # Rule 2: team data requires membership in the resource's team.
        if permission == Permission.DATA_READ_TEAM:
            team_id = resource.get("team_id")
            return team_id is not None and team_id in (user.get("teams") or ())

        # Rule 3: only the creator may share an agent.
        if permission == Permission.AGENT_SHARE:
            return self._ids_match(resource.get("creator_id"), user.get("id"))

        return True

工具权限控制

工具分级

from enum import IntEnum

class ToolRiskLevel(IntEnum):
    """Risk tiers for tools; a larger value means higher risk."""

    SAFE = 0        # no side effects: queries, calculations, search
    LOW = 1         # low risk: reading files, viewing logs
    MEDIUM = 2      # medium risk: writing files, sending notifications
    HIGH = 3        # high risk: executing code, modifying databases
    CRITICAL = 4    # critical risk: deleting data, managing permissions, sending email

@dataclass
class ToolPermissionPolicy:
    """Access policy attached to a single tool."""

    tool_name: str
    risk_level: ToolRiskLevel
    required_role: str                  # minimum role required
    requires_confirmation: bool = False # whether human confirmation is required
    max_calls_per_hour: int = 100
    allowed_args_patterns: dict = None  # argument allowlist: arg name -> regex
    blocked_args_patterns: dict = None  # argument blocklist: arg name -> regex
    audit_level: str = "standard"       # minimal / standard / detailed

# Registry of per-tool policies, keyed by tool name.
TOOL_POLICIES = {
    "search_web": ToolPermissionPolicy(
        tool_name="search_web",
        risk_level=ToolRiskLevel.SAFE,
        required_role="viewer",
    ),
    "read_file": ToolPermissionPolicy(
        tool_name="read_file",
        risk_level=ToolRiskLevel.LOW,
        required_role="developer",
        # Restrict reads to /workspace/ and reject any ".." path segment, so
        # "/workspace/../etc/passwd" no longer satisfies the allowlist.
        # NOTE(review): a regex cannot see symlinks; the tool itself should
        # still resolve the path and re-check the prefix.
        allowed_args_patterns={
            "path": r"(?s)^/workspace/(?!(?:.*/)?\.\.(?:/|$)).*",
        },
    ),
    "execute_sql": ToolPermissionPolicy(
        tool_name="execute_sql",
        risk_level=ToolRiskLevel.HIGH,
        required_role="developer",
        requires_confirmation=True,
        max_calls_per_hour=20,
        blocked_args_patterns={
            # Block destructive SQL keywords. Word boundaries (instead of a
            # required trailing whitespace) also catch "DROP/**/TABLE" and
            # avoid matching inside longer words such as "backdrop".
            "query": r"(?i)\b(DROP|DELETE|TRUNCATE|ALTER)\b",
        },
        audit_level="detailed",
    ),
    "send_email": ToolPermissionPolicy(
        tool_name="send_email",
        risk_level=ToolRiskLevel.CRITICAL,
        required_role="developer",
        requires_confirmation=True,
        max_calls_per_hour=10,
        audit_level="detailed",
    ),
    "execute_code": ToolPermissionPolicy(
        tool_name="execute_code",
        risk_level=ToolRiskLevel.HIGH,
        required_role="developer",
        requires_confirmation=False,  # sandboxed execution needs no confirmation
        audit_level="detailed",
    ),
}

class ToolPermissionGuard:
    """Gate tool calls by role, hourly rate limit and argument patterns."""

    # Roles from least to most privileged; the position is what is compared.
    _ROLE_ORDER = ("viewer", "developer", "admin")
    # Length of one rate-limit window (max_calls_per_hour is per hour).
    _RATE_WINDOW_SECONDS = 3600

    def __init__(self, permission_checker: PermissionChecker):
        """Store the checker (its ``user_store`` is used to load users)."""
        self.checker = permission_checker
        # "<user_id>:<tool_name>" -> {"count": calls so far, "reset_time": epoch secs}
        self.call_counts: dict[str, dict[str, float]] = {}

    async def authorize(self, user_id: str, tool_name: str,
                        args: dict) -> tuple[bool, str, bool]:
        """Decide whether a user may call a tool with the given arguments.

        A successful authorization counts as one call against the user's
        hourly quota for that tool; denied attempts are not counted.

        Args:
            user_id: Identifier passed to the checker's ``user_store.get``.
            tool_name: Key into ``TOOL_POLICIES``.
            args: Tool arguments by name; ``None`` is treated as no arguments.

        Returns:
            ``(allowed, reason, needs_confirmation)``. ``needs_confirmation``
            is only ever True when ``allowed`` is True.
        """
        import time

        policy = TOOL_POLICIES.get(tool_name)
        if not policy:
            return False, f"Tool '{tool_name}' not registered", False

        # 1. Role check. Unknown or suspended users are denied here instead
        # of crashing on a None user record.
        user = await self.checker.user_store.get(user_id)
        if not user:
            return False, "User not found", False
        if user.get("suspended"):
            return False, "User account suspended", False

        role_name = user.get("role", "viewer")
        if role_name not in self._ROLE_ORDER:
            return False, f"Unknown role: {role_name}", False
        if policy.required_role not in self._ROLE_ORDER:
            return False, (
                f"Tool '{tool_name}' has unknown required role "
                f"'{policy.required_role}'"
            ), False

        if (self._ROLE_ORDER.index(role_name)
                < self._ROLE_ORDER.index(policy.required_role)):
            return False, (
                f"Tool '{tool_name}' requires role '{policy.required_role}', "
                f"user has '{role_name}'"
            ), False

        # 2. Rate limit. An expired window is discarded so the quota resets
        # after an hour rather than locking the user out permanently.
        key = f"{user_id}:{tool_name}"
        now = time.time()
        window = self.call_counts.get(key)
        if window is not None and now >= window["reset_time"]:
            window = None
        used = window["count"] if window is not None else 0
        if used >= policy.max_calls_per_hour:
            return False, f"Rate limit exceeded for '{tool_name}'", False

        # 3 + 4. Argument allowlist / blocklist.
        denial = self._check_args(policy, args or {})
        if denial is not None:
            return False, denial, False

        # 5. Record the call.
        if window is None:
            window = {"count": 0, "reset_time": now + self._RATE_WINDOW_SECONDS}
            self.call_counts[key] = window
        window["count"] += 1

        return True, "ok", policy.requires_confirmation

    @staticmethod
    def _check_args(policy, args: dict):
        """Return a denial reason if any argument violates the policy, else None.

        Arguments that have a pattern but are absent from ``args`` are not
        checked. Allowlist patterns are anchored at the start (``re.match``);
        blocklist patterns match anywhere in the value (``re.search``).
        """
        import re

        for arg_name, pattern in (policy.allowed_args_patterns or {}).items():
            if arg_name in args and not re.match(pattern, str(args[arg_name])):
                return f"Argument '{arg_name}' does not match allowed pattern"

        for arg_name, pattern in (policy.blocked_args_patterns or {}).items():
            if arg_name in args and re.search(pattern, str(args[arg_name])):
                return f"Argument '{arg_name}' matches blocked pattern"

        return None

沙箱执行

class SandboxConfig:
    """Sandbox configuration: named resource tiers."""

    # Tier name -> container settings. "timeout" is a number of seconds
    # (presumably wall-clock; confirm against the sandbox runner).
    TIERS = {
        "minimal": {
            "cpu_limit": "0.5",
            "memory_limit": "256m",
            "timeout": 30,
            "network": False,
            "filesystem": "readonly",
        },
        "standard": {
            "cpu_limit": "1",
            "memory_limit": "512m",
            "timeout": 60,
            "network": False,
            "filesystem": "tmpfs",
        },
        "full": {
            "cpu_limit": "2",
            "memory_limit": "2g",
            "timeout": 300,
            "network": True,  # requires admin approval
            "filesystem": "tmpfs",
        },
    }

class DockerSandbox:
    """Run untrusted Python code inside a resource-limited Docker container."""

    async def execute(self, code: str, tier: str = "standard",
                      user_id: str = None) -> dict:
        """Execute ``code`` in a fresh container configured by ``tier``.

        Args:
            code: Python source, passed to ``python -c``.
            tier: Key into ``SandboxConfig.TIERS``.
            user_id: User on whose behalf the code runs; consulted only when
                the tier enables networking.

        Returns:
            One of:
            ``{"stdout": str, "exit_code": 0}`` on success (stdout and stderr
            combined, as before);
            ``{"stderr": str, "exit_code": int}`` when the program exits
            non-zero;
            ``{"error": str}`` when network access is not approved;
            ``{"error": str, "exit_code": -1}`` when the tier timeout expires.

        Raises:
            KeyError: If ``tier`` is not a known tier.
        """
        import asyncio

        import docker

        config = SandboxConfig.TIERS[tier]

        # Network access needs extra authorization.
        # NOTE(review): _check_network_permission is not defined on this
        # class in the visible source; it must be supplied elsewhere.
        if config["network"]:
            if not await self._check_network_permission(user_id):
                return {"error": "Network access requires admin approval"}

        client = docker.from_env()
        container = None
        try:
            # containers.run() has no `timeout` argument, so start the
            # container detached and bound the wait ourselves. The blocking
            # SDK calls run in worker threads to keep the event loop free.
            container = await asyncio.to_thread(
                client.containers.run,
                "python:3.11-slim",
                command=["python", "-c", code],
                mem_limit=config["memory_limit"],
                cpu_period=100000,
                cpu_quota=int(float(config["cpu_limit"]) * 100000),
                network_disabled=not config["network"],
                read_only=config["filesystem"] == "readonly",
                tmpfs={"/tmp": "size=100m"} if config["filesystem"] == "tmpfs" else {},
                detach=True,
            )

            try:
                status = await asyncio.wait_for(
                    asyncio.to_thread(container.wait),
                    timeout=config["timeout"],
                )
            except asyncio.TimeoutError:
                # The container is force-removed in `finally`, which also
                # stops it if it is still running.
                return {
                    "error": f"Execution timed out after {config['timeout']}s",
                    "exit_code": -1,
                }

            exit_code = status.get("StatusCode", -1)
            if exit_code == 0:
                output = await asyncio.to_thread(
                    container.logs, stdout=True, stderr=True
                )
                return {
                    # errors="replace": program output is arbitrary bytes.
                    "stdout": output.decode("utf-8", errors="replace"),
                    "exit_code": 0,
                }

            errors = await asyncio.to_thread(
                container.logs, stdout=False, stderr=True
            )
            return {
                "stderr": errors.decode("utf-8", errors="replace"),
                "exit_code": exit_code,
            }
        finally:
            try:
                if container is not None:
                    # Manual removal: auto-remove would delete the container
                    # before its logs could be read.
                    await asyncio.to_thread(container.remove, force=True)
            finally:
                client.close()

审计日志系统

审计事件模型

from datetime import datetime
from enum import Enum

class AuditEventType(Enum):
    """Kinds of audit events; values are dotted ``category.name`` strings."""

    # User events
    USER_LOGIN = "user.login"
    USER_LOGOUT = "user.logout"

    # Agent events
    AGENT_CREATED = "agent.created"
    AGENT_EXECUTED = "agent.executed"
    AGENT_SHARED = "agent.shared"

    # Tool events
    TOOL_CALLED = "tool.called"
    TOOL_BLOCKED = "tool.blocked"
    TOOL_CONFIRMED = "tool.confirmed"

    # Data events
    DATA_ACCESSED = "data.accessed"
    DATA_MODIFIED = "data.modified"
    DATA_EXPORTED = "data.exported"

    # Security events
    PERMISSION_DENIED = "security.permission_denied"
    INJECTION_DETECTED = "security.injection_detected"
    RATE_LIMIT_HIT = "security.rate_limit"
    ANOMALY_DETECTED = "security.anomaly"

@dataclass
class AuditEvent:
    """One auditable occurrence: who did what, to which resource, with what result."""

    event_id: str
    event_type: AuditEventType
    timestamp: datetime
    user_id: str
    session_id: str
    agent_id: str = ""
    tool_name: str = ""
    action: str = ""
    resource: str = ""
    result: str = ""  # success / denied / error
    details: dict = field(default_factory=dict)  # free-form extra context
    ip_address: str = ""
    user_agent: str = ""
    risk_level: int = 0  # presumably on the ToolRiskLevel scale (0-4) — TODO confirm

class AuditLogger:
    """Append-only audit log writer with a per-record integrity checksum.

    The checksum detects modification of a stored record. Without a secret
    key it is a plain SHA-256 digest, which anyone able to rewrite a record
    can also recompute; pass ``secret_key`` to get an HMAC instead.
    """

    def __init__(self, storage_backend, secret_key: bytes = None):
        """Set up the logger.

        Args:
            storage_backend: Object exposing an awaitable ``append(record)``.
            secret_key: Optional key. When given, checksums are HMAC-SHA256
                over the record; when omitted, plain SHA-256.
        """
        self.storage = storage_backend
        self._secret_key = secret_key

    async def log(self, event: AuditEvent):
        """Append one event to the audit log and alert on high-risk events.

        The record is written first; events with ``risk_level >= 3`` then
        trigger a security alert.
        """
        record = self._build_record(event)
        # Integrity check computed over exactly the fields being stored.
        record["checksum"] = self._digest(record)

        await self.storage.append(record)

        # Real-time alert for high-risk events.
        if event.risk_level >= 3:
            await self._alert(event)

    @staticmethod
    def _build_record(event) -> dict:
        """Flatten an event into the dict that is persisted (without checksum)."""
        return {
            "event_id": event.event_id,
            "event_type": event.event_type.value,
            "timestamp": event.timestamp.isoformat(),
            "user_id": event.user_id,
            "session_id": event.session_id,
            "agent_id": event.agent_id,
            "tool_name": event.tool_name,
            "action": event.action,
            "resource": event.resource,
            "result": event.result,
            "details": event.details,
            "ip_address": event.ip_address,
            "user_agent": event.user_agent,
            "risk_level": event.risk_level,
        }

    def _digest(self, record: dict) -> str:
        """Return the hex digest of a record, ignoring any ``checksum`` key.

        Working from the stored representation (ISO timestamp, enum value)
        means a verifier holding only the record can recompute the value.
        """
        import hashlib
        import hmac
        import json

        body = {k: v for k, v in record.items() if k != "checksum"}
        # sort_keys gives a canonical byte string; default=str keeps logging
        # from failing when `details` holds non-JSON values.
        payload = json.dumps(
            body, sort_keys=True, ensure_ascii=False, default=str
        ).encode("utf-8")

        if self._secret_key:
            return hmac.new(self._secret_key, payload, hashlib.sha256).hexdigest()
        return hashlib.sha256(payload).hexdigest()

    def _compute_checksum(self, event: AuditEvent) -> str:
        """Return the integrity checksum for ``event``.

        Covers every persisted field, so changing e.g. ``result`` or
        ``resource`` is detectable, and returns the full 64-character digest.
        """
        return self._digest(self._build_record(event))

    async def _alert(self, event: AuditEvent):
        """Send a high-risk event summary to the security team."""
        alert_msg = (
            f"[SECURITY ALERT] {event.event_type.value}\n"
            f"User: {event.user_id}\n"
            f"Action: {event.action}\n"
            f"Resource: {event.resource}\n"
            f"Result: {event.result}\n"
            f"Risk Level: {event.risk_level}"
        )
        # Delivery channel (Slack / PagerDuty / email) is up to
        # notify_security_team, which is not defined in the visible source.
        await notify_security_team(alert_msg)

审计查询与分析

class AuditAnalyzer:
    """Reports and anomaly heuristics over stored audit records."""

    def __init__(self, storage_backend=None):
        """Set the backend to query.

        Args:
            storage_backend: Object exposing an awaitable ``query(**filters)``
                that returns a list of record dicts. Optional so existing
                ``AuditAnalyzer()`` call sites keep working; ``storage`` can
                also be assigned afterwards.
        """
        self.storage = storage_backend

    async def user_activity_report(self, user_id: str,
                                    start: datetime, end: datetime) -> dict:
        """Summarize one user's audit events between ``start`` and ``end``.

        Returns:
            Dict with the user id, the period as text, the event count,
            counts per event type, counts per tool, and the raw
            ``security.*`` and ``data.*`` events.
        """
        events = await self.storage.query(
            user_id=user_id,
            start=start,
            end=end,
        )

        return {
            "user_id": user_id,
            "period": f"{start.date()} to {end.date()}",
            "total_events": len(events),
            "by_type": self._group_by_type(events),
            "tools_used": self._tools_summary(events),
            "security_events": [
                e for e in events
                if e["event_type"].startswith("security.")
            ],
            "data_access": [
                e for e in events
                if e["event_type"].startswith("data.")
            ],
        }

    async def detect_anomalies(self, user_id: str) -> list[dict]:
        """Flag suspicious patterns in a user's last 24 hours of events.

        Returns:
            A list of anomaly dicts, each with at least ``type`` and
            ``severity``. Off-hours activity yields one entry per event.
        """
        anomalies = []
        recent = await self.storage.query(
            user_id=user_id,
            hours=24,
        )

        # Pattern 1: many denied operations in a short time.
        denied = [e for e in recent if e.get("result") == "denied"]
        if len(denied) > 10:
            anomalies.append({
                "type": "excessive_permission_denied",
                "count": len(denied),
                "severity": "high",
            })

        # Pattern 2: activity at unusual hours.
        for event in recent:
            if self._is_off_hours(event["timestamp"]):
                anomalies.append({
                    "type": "off_hours_activity",
                    "event": event["event_type"],
                    "time": event["timestamp"],
                    "severity": "medium",
                })

        # Pattern 3: bulk data export.
        exports = [e for e in recent if e["event_type"] == "data.exported"]
        if len(exports) > 5:
            anomalies.append({
                "type": "bulk_data_export",
                "count": len(exports),
                "severity": "critical",
            })

        return anomalies

    @staticmethod
    def _is_off_hours(timestamp: str) -> bool:
        """Return True when an ISO timestamp falls before 06:00 or from 23:00 on.

        The hour is read as written in the timestamp; no timezone conversion
        is applied.
        """
        hour = datetime.fromisoformat(timestamp).hour
        return hour < 6 or hour > 22

    @staticmethod
    def _group_by_type(events) -> dict:
        """Count events per ``event_type``."""
        from collections import Counter

        return dict(Counter(e["event_type"] for e in events))

    @staticmethod
    def _tools_summary(events) -> dict:
        """Count events per tool, skipping events with no tool name."""
        from collections import Counter

        return dict(Counter(
            e["tool_name"] for e in events if e.get("tool_name")
        ))

合规要求映射

| 合规框架 | 要求 | Agent 平台实现 |
| --- | --- | --- |
| GDPR | 数据最小化 | Agent 仅访问必要的用户数据 |
| GDPR | 被遗忘权 | 支持删除用户相关的所有 Agent 数据 |
| SOC 2 | 访问控制 | RBAC + 最小权限原则 |
| SOC 2 | 审计追踪 | 不可篡改的审计日志 |
| SOC 2 | 变更管理 | Agent 配置变更记录 |
| ISO 27001 | 信息分类 | 数据按敏感级别标记和隔离 |
| PCI DSS | 数据保护 | Agent 禁止访问支付卡数据 |

总结

企业级 Agent 平台安全的核心架构:

  1. RBAC + ABAC:角色控制基础权限,属性控制资源级别访问
  2. 工具分级:按风险级别控制工具调用权限,高风险操作需人工确认
  3. 沙箱隔离:代码执行在容器沙箱中,网络访问需额外授权
  4. 审计不可篡改:所有操作留痕,带完整性校验和实时告警
  5. 异常检测:基于行为模式的异常检测,防止内部威胁

Maurice | maurice_wen@proton.me