灵阙智能体平台 - 插件系统实施指南

项目概览

目标: 建立统一的扩展框架,全面集成MCP工具、插件系统和Skill系统 周期: 7-8周(约2个月) 架构: 完整生产级架构,考虑版本兼容、错误处理、性能优化


已完成工作(Phase 1.1 部分)

1. 核心类型定义 ✅

文件: /lib/plugin-system/types.ts (700+行)

内容:

  • 插件元数据和版本管理
  • 权限系统(PluginPermission枚举)
  • 节点贡献接口(NodeContribution)
  • 执行器接口(NodeExecutor)
  • UI组件贡献(ComponentContribution)
  • 钩子系统(HookContribution)
  • 插件上下文API(PluginContext)
  • 插件生命周期状态(PluginStatus)

2. 插件上下文实现 ✅

文件: /lib/plugin-system/plugin-context.ts (400+行)

功能:

  • 节点注册/执行器注册/组件注册/钩子注册
  • 配置管理(get/set Config)
  • 状态管理(get/set State)
  • 日志记录器(debug/info/warn/error)
  • HTTP客户端(axios封装)
  • 权限检查机制

3. 插件管理器 ✅

文件: /lib/plugin-system/plugin-manager.ts (600+行)

功能:

  • 插件加载/启用/禁用/卸载生命周期管理
  • 贡献点统一注册和查询
  • 依赖图管理
  • 事件系统(on/off/emit)
  • 全局单例管理(getPluginManager/initializePluginManager)

4. 示例插件 ✅

文件: /plugins/example-custom-node/index.ts (200+行)

演示:

  • 如何定义插件元数据
  • 如何贡献自定义节点
  • 如何实现节点执行器
  • 如何进行配置验证

总计: 4个文件,约2200行代码


后续实施路线图

Phase 1.2-1.4: 完成插件系统基础(3周)

Week 1: 节点贡献系统集成

任务1.1: 修改现有工作流类型定义

// types/workflow.ts - 添加动态节点注册支持

/**
 * 扩展NodeType以支持插件节点
 */
export type NodeType =
  | 'llm' | 'prompt' | 'input' | 'output'
  | 'retrieval' | 'embedding' | 'transform'
  | 'conditional' | 'loop' | 'merge'
  | 'api_call' | 'database_query' | 'web_search'
  | 'agent' | 'memory' | 'code_execution'
  | 'ocr' | 'vision' | 'audio_stt' | 'audio_tts' | 'reranker'
  | 'mcp_tool' | 'skill'  // 新增
  | string;  // 允许插件自定义类型

/**
 * 节点注册表(全局)
 */
export class NodeRegistry {
  private static nodes: Map<NodeType, NodeContribution> = new Map();

  static register(contribution: NodeContribution): void {
    if (this.nodes.has(contribution.type)) {
      console.warn(`Node type "${contribution.type}" already registered`);
      return;
    }
    this.nodes.set(contribution.type, contribution);
  }

  static get(nodeType: NodeType): NodeContribution | undefined {
    return this.nodes.get(nodeType);
  }

  static getAll(): NodeContribution[] {
    return Array.from(this.nodes.values());
  }
}

任务1.2: 重构节点库组件

// components/workflow/node-palette.tsx

import { getPluginManager } from '@/lib/plugin-system/plugin-manager';

export function NodePalette() {
  const [nodes, setNodes] = useState<NodeContribution[]>([]);

  useEffect(() => {
    // 加载内置节点
    const builtinNodes = getBuiltinNodes();

    // 加载插件贡献的节点
    const pluginManager = getPluginManager();
    const pluginNodes = pluginManager.getAllNodeContributions();

    // 合并
    setNodes([...builtinNodes, ...pluginNodes]);

    // 监听插件节点注册事件
    const handleNodeRegistered = ({ nodeType, contribution }) => {
      setNodes(prev => [...prev, contribution]);
    };

    pluginManager.on('node:registered', handleNodeRegistered);

    return () => {
      pluginManager.off('node:registered', handleNodeRegistered);
    };
  }, []);

  // ... 渲染节点库
}

任务1.3: 集成执行器到工作流引擎

// lib/workflow/WorkflowExecutor.ts - 修改

import { getPluginManager } from '@/lib/plugin-system/plugin-manager';

class WorkflowExecutor {
  async executeNode(node: WorkflowNode, input: any): Promise<any> {
    // 1. 尝试从插件管理器获取执行器
    const pluginManager = getPluginManager();
    let executor = pluginManager.getExecutor(node.data.type);

    // 2. 如果没有,从内置执行器注册表获取
    if (!executor) {
      executor = NodeExecutorRegistry.getExecutor(node.data.type);
    }

    if (!executor) {
      throw new Error(`No executor found for node type: ${node.data.type}`);
    }

    // 3. 执行节点
    const result = await executor.execute(node, input, this.context);

    return result;
  }
}

Week 2: 钩子系统与UI组件扩展

任务2.1: 实现钩子执行引擎

// lib/plugin-system/hooks/hook-executor.ts

import { getPluginManager } from '../plugin-manager';
import type { HookContext, HookResult, HookTiming } from '../types';

export class HookExecutor {
  /**
   * 执行钩子
   */
  static async executeHooks(
    timing: HookTiming,
    context: HookContext
  ): Promise<HookResult> {
    const pluginManager = getPluginManager();
    const hooks = pluginManager.getHookContributions(timing, context.node.data.type);

    let currentContext = context;
    let result: HookResult = { continue: true };

    // 按优先级顺序执行钩子
    for (const hook of hooks) {
      try {
        result = await hook.handler(currentContext);

        if (!result.continue) {
          console.info(`[HookExecutor] Hook stopped execution: ${timing}`);
          break;
        }

        // 更新上下文(如果钩子修改了数据)
        if (result.modifiedInputs) {
          currentContext.inputs = result.modifiedInputs;
        }
        if (result.modifiedOutputs) {
          currentContext.outputs = result.modifiedOutputs;
        }
      } catch (error: any) {
        console.error(`[HookExecutor] Hook execution failed:`, error);
        // 决定是否继续执行后续钩子(可配置)
      }
    }

    return result;
  }
}

任务2.2: 集成钩子到工作流执行器

// lib/workflow/WorkflowExecutor.ts - 在executeNode中集成钩子

async executeNode(node: WorkflowNode, input: any): Promise<any> {
  // 1. PRE_EXECUTE钩子
  const preHookResult = await HookExecutor.executeHooks(HookTiming.PRE_EXECUTE, {
    ...this.context,
    node,
    inputs: input,
  });

  if (!preHookResult.continue) {
    return { skipped: true };
  }

  // 使用钩子修改后的输入
  const finalInput = preHookResult.modifiedInputs || input;

  // 2. 执行节点
  let result;
  try {
    const executor = this.getExecutor(node.data.type);
    result = await executor.execute(node, finalInput, this.context);

    // 3. POST_EXECUTE钩子
    const postHookResult = await HookExecutor.executeHooks(HookTiming.POST_EXECUTE, {
      ...this.context,
      node,
      inputs: finalInput,
      outputs: result.output,
    });

    // 使用钩子修改后的输出
    if (postHookResult.modifiedOutputs) {
      result.output = postHookResult.modifiedOutputs;
    }
  } catch (error: any) {
    // 4. ON_ERROR钩子
    const errorHookResult = await HookExecutor.executeHooks(HookTiming.ON_ERROR, {
      ...this.context,
      node,
      inputs: finalInput,
      error,
    });

    if (!errorHookResult.errorHandled) {
      throw error;
    }
  }

  return result;
}

任务2.3: 实现UI组件注册

// components/workflow/plugin-node-wrapper.tsx

import { getPluginManager } from '@/lib/plugin-system/plugin-manager';

export function PluginNodeWrapper({ node }) {
  const pluginManager = getPluginManager();

  // 尝试获取插件贡献的自定义组件
  const customComponent = pluginManager.getComponentContribution(
    `${node.data.type}_node`
  );

  if (customComponent && customComponent.type === 'node') {
    const CustomNode = customComponent.component;
    return <CustomNode node={node} />;
  }

  // 回退到默认节点组件
  return <DefaultNode node={node} />;
}

Week 3: 插件加载器与沙箱

任务3.1: 实现插件加载器

// lib/plugin-system/plugin-loader.ts

import dynamic from 'next/dynamic';

export class PluginLoader {
  /**
   * 从URL加载插件
   */
  static async loadFromUrl(url: string): Promise<Plugin> {
    try {
      const module = await import(/* webpackIgnore: true */ url);
      return module.default;
    } catch (error: any) {
      throw new Error(`Failed to load plugin from ${url}: ${error.message}`);
    }
  }

  /**
   * 从本地路径加载插件
   */
  static async loadFromPath(path: string): Promise<Plugin> {
    try {
      const module = dynamic(() => import(path));
      return (await module) as any;
    } catch (error: any) {
      throw new Error(`Failed to load plugin from ${path}: ${error.message}`);
    }
  }

  /**
   * 验证插件manifest
   */
  static validateManifest(manifest: PluginManifest): { valid: boolean; errors: string[] } {
    const errors: string[] = [];

    if (!manifest.id) errors.push('Missing plugin id');
    if (!manifest.name) errors.push('Missing plugin name');
    if (!manifest.version) errors.push('Missing plugin version');
    if (!manifest.main) errors.push('Missing plugin main entry');

    return {
      valid: errors.length === 0,
      errors,
    };
  }
}

任务3.2: 实现沙箱隔离(可选高级特性)

// lib/plugin-system/sandbox.ts

/**
 * 插件沙箱(使用Worker线程隔离)
 */
export class PluginSandbox {
  private worker: Worker | null = null;

  async execute(code: string, context: any): Promise<any> {
    // 创建Worker
    const workerCode = `
      self.addEventListener('message', async (e) => {
        try {
          const { code, context } = e.data;
          const fn = new Function('context', code);
          const result = await fn(context);
          self.postMessage({ success: true, result });
        } catch (error) {
          self.postMessage({ success: false, error: error.message });
        }
      });
    `;

    const blob = new Blob([workerCode], { type: 'application/javascript' });
    this.worker = new Worker(URL.createObjectURL(blob));

    return new Promise((resolve, reject) => {
      this.worker!.addEventListener('message', (e) => {
        if (e.data.success) {
          resolve(e.data.result);
        } else {
          reject(new Error(e.data.error));
        }
        this.dispose();
      });

      this.worker!.postMessage({ code, context });
    });
  }

  dispose() {
    if (this.worker) {
      this.worker.terminate();
      this.worker = null;
    }
  }
}

Phase 2: MCP工具集成(3周)

Week 4: 通用MCP工具节点

文件创建清单:

lib/workflow/node-executors/MCPToolNodeExecutor.ts
components/workflow/nodes/MCPToolNode.tsx
components/workflow/mcp-tool-selector.tsx
types/mcp.ts (扩展MCP类型定义)

MCPToolNodeExecutor实现示例:

// lib/workflow/node-executors/MCPToolNodeExecutor.ts

import { mcpAPI } from '@/lib/api/mcp';

export class MCPToolNodeExecutor implements NodeExecutor {
  async execute(node, inputs, context) {
    const { serverId, toolName, arguments: args } = node.data.config;

    // 验证配置
    if (!serverId || !toolName) {
      throw new Error('MCP服务器ID和工具名称是必需的');
    }

    context.logger.info(`Calling MCP tool: ${toolName} on server ${serverId}`);

    try {
      // 调用MCP工具
      const result = await mcpAPI.callTool(serverId, toolName, {
        ...args,
        ...inputs,  // 合并工作流输入
      });

      return {
        output: result,
        status: 'success',
        metadata: {
          serverId,
          toolName,
          callTime: new Date().toISOString(),
        },
      };
    } catch (error: any) {
      context.logger.error(`MCP tool call failed: ${error.message}`);

      return {
        output: {},
        status: 'error',
        error: `MCP调用失败: ${error.message}`,
      };
    }
  }
}

Week 5: 常用MCP工具专用节点生成

自动生成逻辑:

// lib/mcp/node-generator.ts

export class MCPNodeGenerator {
  /**
   * 根据MCP工具定义生成节点贡献
   */
  static async generateNodeFromTool(
    server: MCPServer,
    tool: MCPTool
  ): Promise<NodeContribution> {
    return {
      type: `mcp_${server.name}_${tool.name}` as NodeType,
      category: 'mcp',
      label: tool.name,
      description: tool.description || `MCP工具: ${tool.name}`,
      icon: this.inferIconFromToolName(tool.name),

      // 从JSON Schema生成输入定义
      inputs: this.schemaToInputs(tool.input_schema),

      // 输出(通常是通用object)
      outputs: [
        { name: 'result', type: 'object', description: '工具执行结果' },
      ],

      // 配置
      configSchema: {
        type: 'object',
        properties: {
          timeout: { type: 'number', default: 30000 },
          retryCount: { type: 'number', default: 3 },
        },
      },

      defaultConfig: {
        serverId: server.id,
        toolName: tool.name,
        timeout: 30000,
        retryCount: 3,
      },
    };
  }

  /**
   * 根据工具名推断图标
   */
  private static inferIconFromToolName(toolName: string): string {
    if (toolName.includes('file')) return 'File';
    if (toolName.includes('db') || toolName.includes('database')) return 'Database';
    if (toolName.includes('search')) return 'Search';
    if (toolName.includes('http') || toolName.includes('api')) return 'Globe';
    return 'Tool';
  }

  /**
   * JSON Schema转输入定义
   */
  private static schemaToInputs(schema: any): NodeInput[] {
    const inputs: NodeInput[] = [];

    if (schema.type === 'object' && schema.properties) {
      for (const [key, propSchema] of Object.entries(schema.properties as any)) {
        inputs.push({
          name: key,
          type: propSchema.type as any,
          required: schema.required?.includes(key),
          description: propSchema.description,
        });
      }
    }

    return inputs;
  }
}

Week 6: MCP工具发现与同步

后台同步服务:

// lib/mcp/tool-discovery.ts

export class MCPToolDiscoveryService {
  private syncInterval: NodeJS.Timeout | null = null;

  /**
   * 启动自动同步
   */
  startAutoSync(intervalMs: number = 5 * 60 * 1000) {
    this.syncInterval = setInterval(() => {
      this.syncAllServers().catch(console.error);
    }, intervalMs);

    console.info('[MCPToolDiscovery] Auto-sync started');
  }

  /**
   * 同步所有MCP服务器的工具
   */
  async syncAllServers(): Promise<void> {
    const servers = await mcpAPI.listServers();

    for (const server of servers) {
      if (!server.is_active) continue;

      try {
        await this.syncServerTools(server.id);
      } catch (error) {
        console.error(`[MCPToolDiscovery] Failed to sync server ${server.id}:`, error);
      }
    }
  }

  /**
   * 同步单个服务器的工具
   */
  async syncServerTools(serverId: string): Promise<void> {
    // 1. 获取服务器工具列表
    const tools = await mcpAPI.listServerTools(serverId);

    // 2. 生成节点
    const pluginManager = getPluginManager();
    const server = await mcpAPI.getServer(serverId);

    for (const tool of tools) {
      const nodeContribution = await MCPNodeGenerator.generateNodeFromTool(server, tool);

      // 3. 注册到插件系统
      try {
        pluginManager.registerNode(nodeContribution);
        console.info(`[MCPToolDiscovery] Registered node for tool: ${tool.name}`);
      } catch (error) {
        // 节点可能已存在,忽略
      }
    }
  }

  stopAutoSync() {
    if (this.syncInterval) {
      clearInterval(this.syncInterval);
      this.syncInterval = null;
    }
  }
}

Phase 3: Skill系统实现(3周)

Week 7: Skill抽象与Anthropic集成

文件创建清单:

lib/skills/skill-interface.ts
lib/skills/skill-registry.ts
lib/skills/skill-executor.ts
lib/skills/providers/anthropic-skills.ts
lib/workflow/node-executors/SkillNodeExecutor.ts
components/workflow/nodes/SkillNode.tsx

Skill接口定义:

// lib/skills/skill-interface.ts

export interface Skill {
  id: string;
  name: string;
  description: string;
  provider: 'anthropic' | 'openai' | 'custom';
  version: string;

  // 参数定义
  parameters: Record<string, ParameterSchema>;

  // 执行方法
  execute(params: any, context: ExecutionContext): Promise<any>;
}

export class AnthropicSkill implements Skill {
  constructor(
    public id: string,
    public name: string,
    public description: string,
    public version: string,
    public parameters: Record<string, ParameterSchema>,
    private skillId: string  // Anthropic Skills API中的ID
  ) {}

  async execute(params: any, context: ExecutionContext): Promise<any> {
    // 调用Anthropic Skills API
    const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

    const result = await client.beta.skills.execute({
      skill_id: this.skillId,
      parameters: params,
    });

    return result;
  }
}

数据库Schema扩展

Prisma Schema更新

// prisma/schema.prisma

model Plugin {
  id              String   @id @default(cuid())
  name            String
  version         String
  description     String?
  author          String?
  repository      String?
  isEnabled       Boolean  @default(true)
  isSystem        Boolean  @default(false)
  manifest        Json     @db.JsonB
  configuration   Json?    @db.JsonB
  installPath     String?
  installedAt     DateTime @default(now())
  updatedAt       DateTime @updatedAt
  userId          String?

  nodeTypes       PluginNodeType[]

  @@unique([name, version])
  @@index([isEnabled])
  @@index([userId])
}

model PluginNodeType {
  id              String   @id @default(cuid())
  pluginId        String
  nodeType        String   @unique
  definition      Json     @db.JsonB
  createdAt       DateTime @default(now())

  plugin          Plugin   @relation(fields: [pluginId], references: [id], onDelete: Cascade)

  @@index([pluginId])
  @@index([nodeType])
}

model Skill {
  id              String   @id @default(cuid())
  name            String
  description     String?
  provider        String
  providerId      String?
  version         String
  parameters      Json     @db.JsonB
  implementation  String?  @db.Text
  isPublic        Boolean  @default(false)
  usageCount      Int      @default(0)
  createdAt       DateTime @default(now())
  updatedAt       DateTime @updatedAt
  userId          String?

  @@unique([provider, providerId])
  @@index([provider])
  @@index([userId])
}

执行迁移

npx prisma migrate dev --name add_plugin_and_skill_system
npx prisma generate

API端点开发

插件管理API

// app/api/plugins/route.ts

export async function GET(request: NextRequest) {
  const pluginManager = getPluginManager();
  const plugins = pluginManager.getAllPlugins();

  return NextResponse.json({
    success: true,
    data: {
      plugins: plugins.map(p => ({
        id: p.plugin.metadata.id,
        name: p.plugin.metadata.name,
        version: p.plugin.metadata.version,
        status: p.status,
        contributedNodesCount: p.contributedNodes.length,
        // ... 更多信息
      })),
    },
  });
}

export async function POST(request: NextRequest) {
  const { pluginPath, config } = await request.json();

  // 加载插件
  const plugin = await PluginLoader.loadFromPath(pluginPath);

  // 注册到管理器
  const pluginManager = getPluginManager();
  await pluginManager.loadPlugin(plugin, config);

  return NextResponse.json({
    success: true,
    data: { pluginId: plugin.metadata.id },
  });
}

前端UI开发

插件管理页面

// app/workspace/plugins/page.tsx

export default function PluginsPage() {
  const [plugins, setPlugins] = useState<PluginInstance[]>([]);

  useEffect(() => {
    loadPlugins();
  }, []);

  async function loadPlugins() {
    const response = await fetch('/api/plugins');
    const data = await response.json();
    setPlugins(data.data.plugins);
  }

  return (
    <div className="container mx-auto py-8">
      <div className="flex justify-between items-center mb-6">
        <h1 className="text-2xl font-bold">插件管理</h1>
        <Button onClick={() => setInstallDialogOpen(true)}>
          安装插件
        </Button>
      </div>

      <div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
        {plugins.map(plugin => (
          <PluginCard key={plugin.id} plugin={plugin} />
        ))}
      </div>

      <InstallPluginDialog
        open={installDialogOpen}
        onOpenChange={setInstallDialogOpen}
      />
    </div>
  );
}

测试策略

单元测试示例

// __tests__/plugin-system/plugin-manager.test.ts

describe('PluginManager', () => {
  let manager: PluginManager;

  beforeEach(() => {
    manager = new PluginManager({ platformVersion: '1.0.0' });
  });

  it('should load plugin successfully', async () => {
    const mockPlugin: Plugin = {
      metadata: {
        id: 'test-plugin',
        name: 'Test Plugin',
        version: '1.0.0',
      },
      onLoad: jest.fn(),
      onUnload: jest.fn(),
    };

    await manager.loadPlugin(mockPlugin);

    expect(mockPlugin.onLoad).toHaveBeenCalled();
    expect(manager.getPlugin('test-plugin')).toBeDefined();
  });

  it('should register node contribution', async () => {
    // ... 测试节点注册
  });
});

性能优化建议

  1. 懒加载插件: 仅在需要时加载插件代码
  2. 节点执行缓存: 对纯函数节点结果进行缓存
  3. Worker线程隔离: 将重计算节点放入Worker线程
  4. 批量注册: 插件启动时批量注册节点,减少事件触发
  5. 虚拟滚动: 节点库使用虚拟滚动,提升大量节点时的性能

安全最佳实践

  1. 权限最小化原则: 插件默认不授予敏感权限
  2. 代码签名: 系统插件必须通过数字签名验证
  3. 输入验证: 所有插件输入必须通过JSON Schema验证
  4. 资源限制: 限制插件CPU/内存/网络使用
  5. 沙箱隔离: 第三方插件在Worker线程中执行

后续迭代计划

Phase 4-6: 数据库+API+UI(6周)

  • Week 8-9: 数据库Schema实现和API开发
  • Week 10-11: 插件管理UI和Skill管理UI
  • Week 12-13: 性能优化和安全加固

Phase 7: 内置插件开发(2周)

  • Week 14: MCP工具插件、Anthropic Skills插件
  • Week 15: 代码执行插件、自定义函数插件

Phase 8-9: 测试与文档(2周)

  • Week 16: 完整单元测试和集成测试
  • Week 17: 开发者文档、用户手册、API参考

开发者快速上手

1. 创建插件的5个步骤

// Step 1: 创建插件类
class MyPlugin implements Plugin {
  metadata = {
    id: 'com.mycompany.myplugin',
    name: 'My Plugin',
    version: '1.0.0',
  };

  // Step 2: 实现onLoad钩子
  async onLoad(context: PluginContext) {
    // Step 3: 注册节点
    context.registerNode({
      type: 'my_custom_node',
      label: '我的节点',
      // ... 节点定义
    });

    // Step 4: 注册执行器
    context.registerExecutor('my_custom_node', new MyExecutor());
  }

  async onUnload() {
    // Step 5: 清理资源
  }
}

export default MyPlugin;

2. 使用插件

import { getPluginManager } from '@/lib/plugin-system/plugin-manager';
import MyPlugin from './my-plugin';

const manager = getPluginManager();
await manager.loadPlugin(new MyPlugin());
await manager.enablePlugin('com.mycompany.myplugin');

常见问题解答

Q1: 如何调试插件? A: 使用context.logger记录日志,在浏览器开发者工具中查看。

Q2: 插件如何访问数据库? A: 声明DATABASE权限,然后使用context.db.query()

Q3: 如何让插件节点持久化到工作流? A: 节点类型必须全局唯一,卸载插件后重新加载时保持相同的nodeType。

Q4: 插件版本升级如何处理兼容性? A: 使用Semantic Versioning,遵循向后兼容原则,或提供迁移脚本。

Q5: 如何发布插件到市场? A: 暂未实现插件市场,计划在Phase 10实现。


联系与支持


猪哥云(四川)网络科技有限公司 | 合规网 www.hegui.com 猪哥云-数据产品部-Maurice | maurice_wen@proton.me 2025 猪哥云-灵阙企业级智能体平台