📖目录
- 前言
- 1. Nacos AI 模块概述
- 2. 核心组件详解
-
- 2.1 MCP (Model Control Plane)
-
- 2.1.1 核心功能
- 2.1.2 关键类分析
-
- McpServerOperationService
* 索引机制
- McpServerOperationService
- 2.1.3 控制器层
- 2.1.1 核心功能
- 2.2 A2A (Agent to Agent)
-
- 2.2.1 核心功能
- 2.2.2 关键类分析
-
- A2aServerOperationService
* 请求处理器
- A2aServerOperationService
- 2.2.1 核心功能
- 3. 关键源码剖析
-
- 3.1 模型服务注册流程
- 3.2 代理通信处理流程
- 4. 架构设计亮点
-
- 4.1 分层架构设计
- 4.2 缓存机制优化
- 4.3 插件化设计
- 5. 使用场景
-
- 5.1 微服务AI治理
- 5.2 多模型版本管理
- 5.3 跨域代理通信
- 6. 未来发展趋势
-
- 6.1 AI服务网格集成
- 6.2 智能负载均衡
- 6.3 自动扩缩容
- 7. 结语与学习建议
-
-
- 7.1. 动手实践(快速搭建本地环境)
- 7.2. 调试技巧
- 7.3. 延伸阅读
- 7.1. 动手实践(快速搭建本地环境)
-
- 📝 版权声明
前言
📌 文章说明:本文基于 Nacos 3.x 最新版本(截至 2025 年),深入介绍 Nacos AI 模块的设计理念、核心组件以及关键功能,帮助读者了解如何利用 Nacos 管理 AI 模型服务。适合对 AI 服务治理感兴趣的中高级 Java 后端开发者和架构师阅读。
🔖 关键词:#Nacos #AI服务治理 #MCP #A2A #模型管理 #Nacos3.x #源码解析
1. Nacos AI 模块概述
Nacos AI 模块是 Nacos 3.x 中新增的一个重要功能模块,旨在为 AI 模型服务提供统一的服务发现、配置管理和治理能力。该模块主要包括两个子系统:
- MCP (Model Control Plane):模型控制平面,用于管理 AI 模型的注册、发现和配置
- A2A (Agent to Agent):代理间通信系统,支持 AI 代理之间的通信和协调
首先需要纠正一个认知:Nacos 3.x 目前没有独立的 “AI 模块”,而是将 AI 相关能力(如服务健康度 AI 预测、配置异常 AI 诊断等)嵌入到核心模块中(如服务发现模块 nacos-naming、配置模块 nacos-config),通过依赖轻量级 AI 工具包或算法实现,而非单独拆分模块。
这也是为什么你编译后看不到 “AI 字眼” 的核心原因 - 能力已分散在现有模块中,而非集中在独立目录。
2. 核心组件详解
2.1 MCP (Model Control Plane)
MCP 是 AI 模块的核心组件,负责 AI 模型服务的全生命周期管理。
2.1.1 核心功能
- 模型服务注册与发现
- 模型版本管理
- 工具规范定义
- 端点配置管理
- 元数据存储与查询
2.1.2 关键类分析
McpServerOperationService
这是 MCP 模块的核心服务类,负责模型服务的操作管理:
1// McpServerOperationService 提供了模型服务的关键操作实现,包含模型服务的注册、查询、更新和删除等方法 2public class McpServerOperationService { 3 // 创建新的MCP服务器 4 public String createMcpServer(String namespaceId, McpServerBasicInfo serverSpecification, 5 McpToolSpecification toolSpecification, McpEndpointSpec endpointSpecification) throws NacosException { 6 7 // 解析存在的MCP服务器ID 8 String existId = resolveMcpServerId(namespaceId, serverSpecification.getName(), StringUtils.EMPTY); 9 // 如果存在ID不为空 10 if (StringUtils.isNotEmpty(existId)) { 11 // 抛出资源冲突异常 12 throw new NacosApiException(NacosApiException.CONFLICT, ErrorCode.RESOURCE_CONFLICT, 13 String.format("mcp server [`%s`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.s.md) has existed, please update it rather than create.", 14 serverSpecification.getName())); 15 } 16 17 // 获取版本详情 18 ServerVersionDetail versionDetail = serverSpecification.getVersionDetail(); 19 // 如果版本详情为空且版本不为空 20 if (null == versionDetail && StringUtils.isNotBlank(serverSpecification.getVersion())) { 21 // 创建版本详情对象 22 versionDetail = new ServerVersionDetail(); 23 // 设置版本 24 versionDetail.setVersion(serverSpecification.getVersion()); 25 // 设置版本详情 26 serverSpecification.setVersionDetail(versionDetail); 27 } 28 // 如果版本详情为空或版本为空 29 if (Objects.isNull(versionDetail) || StringUtils.isEmpty(versionDetail.getVersion())) { 30 // 抛出参数验证异常 31 throw new NacosApiException(NacosApiException.INVALID_PARAM, ErrorCode.PARAMETER_VALIDATE_ERROR, 32 "Version must be specified in parameter [serverSpecification](file:///Users/zhiyixie/Downloads/work_space/码云/Nacos/ai/src/main/java/com/alibaba/nacos/ai/form/mcp/admin/McpDetailForm.java#L35-L35)"); 33 } 34 // 声明ID 35 String id; 36 // 获取自定义MCP ID 37 String customMcpId = serverSpecification.getId(); 38 39 // 如果自定义MCP ID为空 40 if (StringUtils.isEmpty(customMcpId)) { 41 // 生成随机UUID作为ID 42 id = UUID.randomUUID().toString(); 43 } else { 44 // 如果自定义MCP ID不是UUID字符串 45 if (!StringUtils.isUuidString(customMcpId)) { 46 // 抛出参数验证异常 47 throw new NacosApiException(NacosApiException.INVALID_PARAM, ErrorCode.PARAMETER_VALIDATE_ERROR, 48 "parameter [`serverSpecification.id`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.id.md) is not match uuid pattern, must obey uuid pattern"); 49 } 50 // 如果通过ID获取的MCP服务器不为空 51 if (mcpServerIndex.getMcpServerById(serverSpecification.getId()) != null) { 52 // 抛出参数验证异常 53 throw new NacosApiException(NacosApiException.INVALID_PARAM, ErrorCode.PARAMETER_VALIDATE_ERROR, 54 "parameter [`serverSpecification.id`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.id.md) conflict with exist mcp server id"); 55 } 56 57 // 设置ID为自定义MCP ID 58 id = customMcpId; 59 } 60 61 // 设置服务器规范的ID 62 serverSpecification.setId(id); 63 // 获取当前UTC时间 64 ZonedDateTime currentTime = ZonedDateTime.now(ZoneOffset.UTC); 65 // 创建日期时间格式化器 66 DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Constants.RELEASE_DATE_FORMAT); 67 // 格式化当前时间 68 String formattedCurrentTime = currentTime.format(formatter); 69 // 设置版本详情的发布日期 70 versionDetail.setRelease_date(formattedCurrentTime); 71 72 // 创建新的服务器规范对象 73 McpServerStorageInfo newSpecification = new McpServerStorageInfo(); 74 // 复制属性 75 BeanUtils.copyProperties(serverSpecification, newSpecification); 76 // 注入工具和端点 77 injectToolAndEndpoint(namespaceId, serverSpecification.getId(), newSpecification, toolSpecification, 78 endpointSpecification, Boolean.FALSE); 79 80 // 构建服务器版本信息 81 McpServerVersionInfo versionInfo = buildServerVersionInfo(newSpecification, id, versionDetail); 82 83 // 创建配置请求信息 84 ConfigRequestInfo configRequestInfo = new ConfigRequestInfo(); 85 // 设置不更新已存在的配置 86 configRequestInfo.setUpdateForExist(Boolean.FALSE); 87 88 // 构建MCP服务器版本表单 89 ConfigFormV3 mcpServerVersionForm = buildMcpServerVersionForm(namespaceId, versionInfo); 90 // 发布配置 91 configOperationService.publishConfig(mcpServerVersionForm, configRequestInfo, null); 92 93 // 构建MCP配置表单 94 ConfigForm configForm = buildMcpConfigForm(namespaceId, id, versionDetail.getVersion(), newSpecification); 95 // 记录操作开始时间 96 long startOperationTime = System.currentTimeMillis(); 97 // 发布配置 98 configOperationService.publishConfig(configForm, configRequestInfo, null); 99 // 同步效果服务 100 syncEffectService.toSync(configForm, startOperationTime); 101 102 // 在成功的数据库操作后删除相关缓存 103 invalidateCacheAfterDbOperation(namespaceId, serverSpecification.getName(), id); 104 105 // 返回ID 106 return id; 107 } 108 109 // 更新已存在的MCP服务器 110 public void updateMcpServer(String namespaceId, boolean isPublish, McpServerBasicInfo serverSpecification, 111 McpToolSpecification toolSpecification, McpEndpointSpec endpointSpecification, boolean overrideExisting) throws NacosException { 112 113 // 获取MCP服务器ID 114 String mcpServerId = serverSpecification.getId(); 115 // 解析MCP服务器ID 116 mcpServerId = resolveMcpServerId(namespaceId, serverSpecification.getName(), mcpServerId); 117 // 如果服务器规范的ID为空 118 if (StringUtils.isEmpty(serverSpecification.getId())) { 119 // 设置服务器规范的ID 120 serverSpecification.setId(mcpServerId); 121 } 122 123 // 获取版本详情 124 ServerVersionDetail versionDetail = serverSpecification.getVersionDetail(); 125 // 如果版本详情为空且版本不为空 126 if (null == versionDetail && StringUtils.isNotBlank(serverSpecification.getVersion())) { 127 // 创建版本详情对象 128 versionDetail = new ServerVersionDetail(); 129 // 设置版本 130 versionDetail.setVersion(serverSpecification.getVersion()); 131 // 设置版本详情 132 serverSpecification.setVersionDetail(versionDetail); 133 } 134 // 如果版本详情为空或版本为空 135 if (Objects.isNull(versionDetail) || StringUtils.isEmpty(versionDetail.getVersion())) { 136 // 抛出参数验证异常 137 throw new NacosApiException(NacosApiException.INVALID_PARAM, ErrorCode.PARAMETER_VALIDATE_ERROR, 138 "Version must be specified in parameter [serverSpecification](file:///Users/zhiyixie/Downloads/work_space/码云/Nacos/ai/src/main/java/com/alibaba/nacos/ai/form/mcp/admin/McpDetailForm.java#L35-L35)"); 139 } 140 // 获取MCP服务器版本信息 141 final McpServerVersionInfo mcpServerVersionInfo = getMcpServerVersionInfo(namespaceId, mcpServerId); 142 143 // 获取更新版本 144 String updateVersion = versionDetail.getVersion(); 145 // 创建新的服务器规范对象 146 McpServerStorageInfo newSpecification = new McpServerStorageInfo(); 147 // 复制属性 148 BeanUtils.copyProperties(serverSpecification, newSpecification); 149 // 注入工具和端点 150 injectToolAndEndpoint(namespaceId, mcpServerId, newSpecification, toolSpecification, endpointSpecification, overrideExisting); 151 152 // 构建MCP配置表单 153 ConfigForm configForm = buildMcpConfigForm(namespaceId, mcpServerId, updateVersion, newSpecification); 154 // 发布配置 155 configOperationService.publishConfig(configForm, new ConfigRequestInfo(), null); 156 157 // 获取版本详情列表 158 List<ServerVersionDetail> versionDetails = mcpServerVersionInfo.getVersionDetails(); 159 // 创建版本集合 160 Set<String> versionSet = versionDetails.stream().map(ServerVersionDetail::getVersion) 161 .collect(Collectors.toSet()); 162 // 如果版本集合不包含更新版本 163 if (!versionSet.contains(updateVersion)) { 164 // 创建版本对象 165 ServerVersionDetail version = new ServerVersionDetail(); 166 // 设置版本 167 version.setVersion(updateVersion); 168 // 将版本添加到版本详情列表中 169 versionDetails.add(version); 170 // 设置版本列表 171 mcpServerVersionInfo.setVersions(versionDetails); 172 } 173 174 // 如果是发布操作 175 if (isPublish) { 176 // 设置名称 177 mcpServerVersionInfo.setName(newSpecification.getName()); 178 // 设置描述 179 mcpServerVersionInfo.setDescription(newSpecification.getDescription()); 180 // 设置仓库 181 mcpServerVersionInfo.setRepository(newSpecification.getRepository()); 182 // 设置协议 183 mcpServerVersionInfo.setProtocol(newSpecification.getProtocol()); 184 // 设置前端协议 185 mcpServerVersionInfo.setFrontProtocol(newSpecification.getFrontProtocol()); 186 // 设置能力 187 mcpServerVersionInfo.setCapabilities(newSpecification.getCapabilities()); 188 // 设置最新发布版本 189 mcpServerVersionInfo.setLatestPublishedVersion(updateVersion); 190 191 // 遍历版本详情列表 192 for (ServerVersionDetail detail : versionDetails) { 193 // 如果版本详情的版本等于更新版本 194 if (detail.getVersion().equals(updateVersion)) { 195 // 获取当前UTC时间 196 ZonedDateTime currentTime = ZonedDateTime.now(ZoneOffset.UTC); 197 // 创建日期时间格式化器 198 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); 199 // 格式化当前时间 200 String formattedCurrentTime = currentTime.format(formatter); 201 // 设置发布日期 202 detail.setRelease_date(formattedCurrentTime); 203 // 设置为最新版本 204 detail.setIs_latest(true); 205 // 跳出循环 206 break; 207 } else { 208 // 设置为非最新版本 209 detail.setIs_latest(false); 210 } 211 } 212 // 设置版本列表 213 mcpServerVersionInfo.setVersions(versionDetails); 214 // 设置是否启用 215 mcpServerVersionInfo.setEnabled(newSpecification.isEnabled()); 216 } 217 218 // 构建MCP服务器版本表单 219 ConfigFormV3 mcpServerVersionForm = buildMcpServerVersionForm(namespaceId, mcpServerVersionInfo); 220 // 记录操作开始时间 221 long startOperationTime = System.currentTimeMillis(); 222 // 发布配置 223 configOperationService.publishConfig(mcpServerVersionForm, new ConfigRequestInfo(), null); 224 // 同步效果服务 225 syncEffectService.toSync(mcpServerVersionForm, startOperationTime); 226 227 // 在成功的数据库操作后删除相关缓存 228 invalidateCacheAfterDbUpdateOperation(namespaceId, mcpServerVersionInfo.getName(), 229 serverSpecification.getName(), mcpServerId); 230 } 231 232 // 删除已存在的MCP服务器 233 public void deleteMcpServer(String namespaceId, String mcpName, String mcpServerId, String version) 234 throws NacosException { 235 // 解析MCP服务器ID 236 mcpServerId = resolveMcpServerId(namespaceId, mcpName, mcpServerId); 237 // 获取MCP服务器版本信息 238 McpServerVersionInfo mcpServerVersionInfo = getMcpServerVersionInfo(namespaceId, mcpServerId); 239 // 创建需要删除的版本列表 240 List<String> versionsNeedDelete = new ArrayList<>(); 241 // 如果版本不为空 242 if (StringUtils.isNotEmpty(version)) { 243 // 将版本添加到需要删除的版本列表中 244 versionsNeedDelete.add(version); 245 } else { 246 // 将版本详情列表中的版本收集到需要删除的版本列表中 247 versionsNeedDelete = mcpServerVersionInfo.getVersionDetails().stream().map(ServerVersionDetail::getVersion) 248 .collect(Collectors.toList()); 249 } 250 251 // 遍历需要删除的版本列表 252 for (String versionNeedDelete : versionsNeedDelete) { 253 // 删除MCP工具 254 toolOperationService.deleteMcpTool(namespaceId, mcpServerId, versionNeedDelete); 255 // 删除MCP服务器端点服务 256 endpointOperationService.deleteMcpServerEndpointService(namespaceId, mcpServerVersionInfo.getName() + "::" + versionNeedDelete); 257 // 格式化服务器规范信息数据ID 258 String serverSpecDataId = McpConfigUtils.formatServerSpecInfoDataId(mcpServerId, versionNeedDelete); 259 // 删除配置 260 configOperationService.deleteConfig(serverSpecDataId, Constants.MCP_SERVER_GROUP, namespaceId, null, null, 261 "nacos", null); 262 // 格式化服务器版本信息数据ID 263 String serverVersionDataId = McpConfigUtils.formatServerVersionInfoDataId(mcpServerId); 264 // 删除配置 265 configOperationService.deleteConfig(serverVersionDataId, Constants.MCP_SERVER_VERSIONS_GROUP, namespaceId, 266 null, null, "nacos", null); 267 } 268 269 // 在成功的数据库操作后删除相关缓存 270 invalidateCacheAfterDbOperation(namespaceId, mcpName, mcpServerId); 271 } 272 273 // 获取指定MCP服务器详细信息 274 public McpServerDetailInfo getMcpServerDetail(String namespaceId, String mcpServerId, String mcpServerName, 275 String version) throws NacosException { 276 // 解析MCP服务器ID 277 mcpServerId = resolveMcpServerId(namespaceId, mcpServerName, mcpServerId); 278 279 // 获取MCP服务器版本信息 280 McpServerVersionInfo mcpServerVersionInfo = getMcpServerVersionInfo(namespaceId, mcpServerId); 281 // 如果版本为空 282 if (StringUtils.isEmpty(version)) { 283 // 获取版本详情列表大小 284 int size = mcpServerVersionInfo.getVersionDetails().size(); 285 // 获取最后一个版本详情 286 ServerVersionDetail last = mcpServerVersionInfo.getVersionDetails().get(size - 1); 287 // 设置版本为最后一个版本 288 version = last.getVersion(); 289 } 290 291 // 构建查询MCP服务器请求 292 ConfigQueryChainRequest request = buildQueryMcpServerRequest(namespaceId, mcpServerId, version); 293 // 处理请求获取响应 294 ConfigQueryChainResponse response = configQueryChainService.handle(request); 295 // 如果配置未找到 296 if (McpConfigUtils.isConfigNotFound(response.getStatus())) { 297 // 抛出未找到异常 298 throw new NacosApiException(NacosApiException.NOT_FOUND, ErrorCode.MCP_SEVER_VERSION_NOT_FOUND, 299 String.format("mcp server [`%s`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.s.md) for version [`%s`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.s.md) not found", mcpServerId, version)); 300 } 301 302 // 将响应内容转换为MCP服务器存储信息 303 McpServerStorageInfo serverSpecification = JacksonUtils.toObj(response.getContent(), 304 McpServerStorageInfo.class); 305 306 // 创建结果对象 307 McpServerDetailInfo result = new McpServerDetailInfo(); 308 // 设置ID 309 result.setId(mcpServerId); 310 // 设置命名空间ID 311 result.setNamespaceId(namespaceId); 312 // 复制属性 313 BeanUtils.copyProperties(serverSpecification, result); 314 315 // 获取版本详情列表 316 List<ServerVersionDetail> versionDetails = mcpServerVersionInfo.getVersionDetails(); 317 // 获取最新发布版本 318 String latestVersion = mcpServerVersionInfo.getLatestPublishedVersion(); 319 // 遍历版本详情列表 320 for (ServerVersionDetail versionDetail : versionDetails) { 321 // 设置是否为最新版本 322 versionDetail.setIs_latest(versionDetail.getVersion().equals(latestVersion)); 323 } 324 // 设置所有版本 325 result.setAllVersions(mcpServerVersionInfo.getVersionDetails()); 326 327 // 获取版本详情 328 ServerVersionDetail versionDetail = result.getVersionDetail(); 329 // 设置是否为最新版本 330 versionDetail.setIs_latest(versionDetail.getVersion().equals(latestVersion)); 331 // 设置版本 332 result.setVersion(versionDetail.getVersion()); 333 334 // 如果工具描述引用不为空 335 if (Objects.nonNull(serverSpecification.getToolsDescriptionRef())) { 336 // 获取MCP工具 337 McpToolSpecification toolSpec = toolOperationService.getMcpTool(namespaceId, 338 serverSpecification.getToolsDescriptionRef()); 339 // 设置工具规范 340 result.setToolSpec(toolSpec); 341 } 342 343 // 如果协议不是STDIO 344 if (!AiConstants.Mcp.MCP_PROTOCOL_STDIO.equalsIgnoreCase(serverSpecification.getProtocol())) { 345 // 注入端点 346 injectEndpoint(result); 347 } 348 // 返回结果 349 return result; 350 } 351 352} 353
索引机制
为了提高查询性能,AI 模块实现了多种索引机制:
- PlainMcpServerIndex: 基础索引实现
- CachedMcpServerIndex: 带缓存的索引实现
- MemoryMcpCacheIndex: 内存缓存索引实现
我们这里选择带缓冲的索引实现来查看:
1// CachedMcpServerIndex 结合内存缓存和数据库查询,提升查询性能 2public class CachedMcpServerIndex extends AbstractMcpServerIndex { 3 private final McpCacheIndex cacheIndex; 4 /** 5 * 根据ID获取MCP服务器信息 6 */ 7 @Override 8 // 重写父类方法,根据ID获取MCP服务器信息 9 public McpServerIndexData getMcpServerById(String id) { 10 // 检查缓存是否启用 11 if (!cacheEnabled) { 12 // 缓存未启用,直接从数据库查询 13 LOGGER.debug("缓存已禁用,直接从数据库查询mcpId: {}", id); 14 // 从数据库获取MCP服务器信息 15 return getMcpServerByIdFromDatabase(id); 16 } 17 // 优先查询缓存 18 // 从缓存中获取MCP服务器数据 19 McpServerIndexData cachedData = cacheIndex.getMcpServerById(id); 20 // 检查缓存中是否存在数据 21 if (cachedData != null) { 22 // 缓存命中,返回缓存数据 23 LOGGER.debug("缓存命中 mcpId: {}", id); 24 // 返回缓存中的数据 25 return cachedData; 26 } 27 // 缓存未命中,查询数据库 28 // 缓存未命中,从数据库查询 29 LOGGER.debug("缓存未命中 mcpId: {}, 查询数据库", id); 30 // 从数据库获取MCP服务器信息 31 McpServerIndexData dbData = getMcpServerByIdFromDatabase(id); 32 // 检查数据库中是否存在数据 33 if (dbData != null) { 34 // 数据库中存在数据,更新缓存 35 cacheIndex.updateIndex(dbData.getNamespaceId(), dbData.getId(), dbData.getId()); 36 // 记录缓存更新日志 37 LOGGER.debug("更新缓存 mcpId: {}", id); 38 } 39 // 返回数据库查询结果 40 return dbData; 41 } 42 43 /** 44 * 根据名称获取MCP服务器信息 45 */ 46 @Override 47 // 重写父类方法,根据命名空间ID和名称获取MCP服务器信息 48 public McpServerIndexData getMcpServerByName(String namespaceId, String name) { 49 // 检查参数是否有效 50 if (StringUtils.isEmpty(namespaceId) && StringUtils.isEmpty(name)) { 51 // 参数无效,记录警告日志 52 LOGGER.warn("getMcpServerByName的参数无效: namespaceId={}, name={}", namespaceId, name); 53 // 返回null 54 return null; 55 } 56 57 // 检查命名空间ID是否为空 58 if (StringUtils.isEmpty(namespaceId)) { 59 // 命名空间ID为空,获取第一个匹配名称的MCP服务器 60 return getFirstMcpServerByName(name); 61 } 62 63 // 检查缓存是否启用 64 if (!cacheEnabled) { 65 // 缓存未启用,直接从数据库查询 66 LOGGER.debug("缓存已禁用,直接从数据库查询名称: {}:{}", namespaceId, name); 67 // 从数据库获取MCP服务器信息 68 return getMcpServerByNameFromDatabase(namespaceId, name); 69 } 70 // 优先查询缓存 71 // 从缓存中获取MCP服务器数据 72 McpServerIndexData cachedData = cacheIndex.getMcpServerByName(namespaceId, name); 73 // 检查缓存中是否存在数据 74 if (cachedData != null) { 75 // 缓存命中,返回缓存数据 76 LOGGER.debug("缓存命中名称: {}:{}", namespaceId, name); 77 // 返回缓存中的数据 78 return cachedData; 79 } 80 // 缓存未命中,查询数据库 81 // 缓存未命中,从数据库查询 82 LOGGER.debug("缓存未命中名称: {}:{}, 查询数据库", namespaceId, name); 83 // 从数据库获取MCP服务器信息 84 McpServerIndexData dbData = getMcpServerByNameFromDatabase(namespaceId, name); 85 // 检查数据库中是否存在数据 86 if (dbData != null) { 87 // 数据库中存在数据,更新缓存 88 cacheIndex.updateIndex(namespaceId, name, dbData.getId()); 89 // 记录缓存更新日志 90 LOGGER.debug("更新缓存名称: {}:{}", namespaceId, name); 91 } 92 // 返回数据库查询结果 93 return dbData; 94 } 95 96 @Override 97 // 重写父类方法,在搜索完成后更新缓存 98 protected void afterSearch(List<McpServerIndexData> indexDataList, String name) { 99 // 更新缓存 100 // 检查缓存是否启用 101 if (cacheEnabled) { 102 // 遍历搜索结果列表 103 for (McpServerIndexData indexData : indexDataList) { 104 // 更新缓存索引 105 cacheIndex.updateIndex(indexData.getNamespaceId(), name, indexData.getId()); 106 } 107 // 记录缓存更新日志 108 LOGGER.debug("从搜索结果中更新了{}个缓存条目", indexDataList.size()); 109 } 110 } 111 112 // 私有方法,启动定时同步任务 113 private void startSyncTask() { 114 // 创建定时任务,固定延迟执行 115 syncTask = scheduledExecutor.scheduleWithFixedDelay(() -> { 116 try { 117 // 记录开始缓存同步任务的日志 118 LOGGER.debug("开始缓存同步任务"); 119 // 同步数据库中的缓存 120 syncCacheFromDatabase(); 121 // 记录缓存同步任务完成的日志 122 LOGGER.debug("缓存同步任务完成"); 123 } catch (Exception e) { 124 // 记录缓存同步任务错误的日志 125 LOGGER.error("缓存同步任务期间发生错误", e); 126 } 127 }, syncInterval, syncInterval, TimeUnit.SECONDS); 128 // 记录缓存同步任务启动的日志 129 LOGGER.info("缓存同步任务已启动,间隔: {}秒", syncInterval); 130 } 131 132 // 私有方法,从数据库同步缓存 133 private void syncCacheFromDatabase() { 134 // 记录开始从数据库同步缓存的日志 135 LOGGER.debug("从数据库同步缓存"); 136 // 获取有序的命名空间列表 137 List<String> namespaceList = fetchOrderedNamespaceList(); 138 // 遍历命名空间列表 139 for (String namespaceId : namespaceList) { 140 try { 141 // 按页面搜索MCP服务器名称,使用模糊搜索 142 searchMcpServerByNameWithPage(namespaceId, null, 143 Constants.MCP_LIST_SEARCH_BLUR, 1, 1000); 144 } catch (Exception e) { 145 // 记录命名空间缓存同步错误的日志 146 LOGGER.error("命名空间缓存同步错误: {}", namespaceId, e); 147 } 148 } 149 } 150 151 @Override 152 // 重写接口方法,按页面搜索MCP服务器 153 public Page<McpServerIndexData> searchMcpServerByNameWithPage(String namespaceId, String name, String search, 154 int pageNo, int limit) { 155 // 调用searchMcpServers方法搜索配置信息 156 Page<ConfigInfo> serverInfos = searchMcpServers(namespaceId, name, search, pageNo, limit); 157 // 将ConfigInfo列表映射为McpServerIndexData列表 158 List<McpServerIndexData> indexDataList = serverInfos.getPageItems().stream() 159 .map(this::mapMcpServerVersionConfigToIndexData).toList(); 160 // 创建结果页面对象 161 Page<McpServerIndexData> result = new Page<>(); 162 // 设置页面条目 163 result.setPageItems(indexDataList); 164 // 设置总条目数 165 result.setTotalCount(serverInfos.getTotalCount()); 166 // 计算并设置可用页面数 167 result.setPagesAvailable((int) Math.ceil((double) serverInfos.getTotalCount() / (double) limit)); 168 // 设置当前页码 169 result.setPageNumber(pageNo); 170 // 调用afterSearch回调方法,用于子类处理搜索后的逻辑 171 afterSearch(indexDataList, name); 172 // 返回结果 173 return result; 174 } 175 176} 177
2.1.3 控制器层
McpAdminController是 MCP 模块的管理控制器,提供 RESTful API 接口:
1// 提供模型服务的增删改查接口 2@RestController 3@RequestMapping(Constants.MCP_ADMIN_PATH) 4public class McpAdminController { 5 private final McpServerOperationService mcpServerOperationService; 6 7 /** 8 * 列出MCP服务器列表 9 * 10 * @param mcpListForm 列出MCP服务器的请求表单 11 * @param pageForm 请求的分页信息 12 * @return 包含MCP服务器列表的{@link Result}结果 13 * @throws NacosApiException 如果请求参数无效或处理出错 14 */ 15 // GET请求映射到/list路径 16 @GetMapping(value = "/list") 17 // 安全注解,指定需要读权限,签名类型为AI,API类型为管理API 18 @Secured(action = ActionTypes.READ, signType = SignType.AI, apiType = ApiType.ADMIN_API) 19 // 列出MCP服务器的方法 20 public Result<Page<McpServerBasicInfo>> listMcpServers(McpListForm mcpListForm, PageForm pageForm) 21 throws NacosException { 22 // 验证MCP列表表单参数 23 mcpListForm.validate(); 24 // 验证分页表单参数 25 pageForm.validate(); 26 // 返回成功结果,包含分页的MCP服务器列表 27 return Result.success( 28 mcpServerOperationService.listMcpServerWithPage(mcpListForm.getNamespaceId(), mcpListForm.getMcpName(), mcpListForm.getSearch(), 29 pageForm.getPageNo(), pageForm.getPageSize())); 30 } 31 32 /** 33 * 获取指定MCP服务器的详细信息 34 * 35 * @param mcpForm 获取MCP服务器的请求表单 36 * @return 包含详细信息的{@link McpServerDetailInfo} 37 * @throws NacosException 处理过程中任何异常 38 */ 39 // GET请求映射到根路径 40 @GetMapping 41 // 安全注解,指定需要读权限,签名类型为AI,API类型为管理API 42 @Secured(action = ActionTypes.READ, signType = SignType.AI, apiType = ApiType.ADMIN_API) 43 // 获取MCP服务器的方法 44 public Result<McpServerDetailInfo> getMcpServer(McpForm mcpForm) throws NacosException { 45 // 验证MCP表单参数 46 mcpForm.validate(); 47 // 返回成功结果,包含MCP服务器详细信息 48 return Result.success(mcpServerOperationService.getMcpServerDetail(mcpForm.getNamespaceId(), mcpForm.getMcpId(), 49 mcpForm.getMcpName(), mcpForm.getVersion())); 50 } 51 52 /** 53 * 创建新的MCP服务器 54 * 55 * @param mcpForm 创建MCP服务器的请求表单 56 * @throws NacosException 处理过程中任何异常 57 */ 58 // POST请求映射到根路径 59 @PostMapping 60 // 安全注解,指定需要写权限,签名类型为AI,API类型为管理API 61 @Secured(action = ActionTypes.WRITE, signType = SignType.AI, apiType = ApiType.ADMIN_API) 62 // 创建MCP服务器的方法 63 public Result<String> createMcpServer(McpDetailForm mcpForm) throws NacosException { 64 // 验证MCP详细表单参数 65 mcpForm.validate(); 66 // 解析MCP服务器基本信息 67 McpServerBasicInfo basicInfo = McpRequestUtil.parseMcpServerBasicInfo(mcpForm); 68 // 解析MCP工具规范 69 McpToolSpecification mcpTools = McpRequestUtil.parseMcpTools(mcpForm); 70 // 解析MCP端点规范 71 McpEndpointSpec endpointSpec = McpRequestUtil.parseMcpEndpointSpec(basicInfo, mcpForm); 72 // 调用服务创建MCP服务器,获取MCP服务器ID 73 String mcpId = mcpServerOperationService.createMcpServer(mcpForm.getNamespaceId(), basicInfo, mcpTools, 74 endpointSpec); 75 // 返回成功结果,包含MCP服务器ID 76 return Result.success(mcpId); 77 } 78 79 /** 80 * 更新已存在的MCP服务器 81 * 82 * @param mcpForm 更新MCP服务器的请求表单 83 * @throws NacosException 处理过程中任何异常 84 */ 85 // PUT请求映射到根路径 86 @PutMapping 87 // 安全注解,指定需要写权限,签名类型为AI,API类型为管理API 88 @Secured(action = ActionTypes.WRITE, signType = SignType.AI, apiType = ApiType.ADMIN_API) 89 // 更新MCP服务器的方法 90 public Result<String> updateMcpServer(McpUpdateForm mcpForm) throws NacosException { 91 // 验证MCP更新表单参数 92 mcpForm.validate(); 93 // 解析MCP服务器基本信息 94 McpServerBasicInfo basicInfo = McpRequestUtil.parseMcpServerBasicInfo(mcpForm); 95 // 解析MCP工具规范 96 McpToolSpecification mcpTools = McpRequestUtil.parseMcpTools(mcpForm); 97 // 解析MCP端点规范 98 McpEndpointSpec endpointSpec = McpRequestUtil.parseMcpEndpointSpec(basicInfo, mcpForm); 99 // 调用服务更新MCP服务器 100 mcpServerOperationService.updateMcpServer(mcpForm.getNamespaceId(), mcpForm.getLatest(), basicInfo, mcpTools, 101 endpointSpec, mcpForm.isOverrideExisting()); 102 // 返回成功结果 103 return Result.success("ok"); 104 } 105 106 /** 107 * 删除已存在的MCP服务器 108 * 109 * @param mcpForm 删除MCP服务器的请求表单 110 * @throws NacosException 处理过程中任何异常 111 */ 112 // DELETE请求映射到根路径 113 @DeleteMapping 114 // 安全注解,指定需要写权限,签名类型为AI,API类型为管理API 115 @Secured(action = ActionTypes.WRITE, signType = SignType.AI, apiType = ApiType.ADMIN_API) 116 // 删除MCP服务器的方法 117 public Result<String> deleteMcpServer(McpForm mcpForm) throws NacosException { 118 // 验证MCP表单参数 119 mcpForm.validate(); 120 // 调用服务删除MCP服务器 121 mcpServerOperationService.deleteMcpServer(mcpForm.getNamespaceId(), mcpForm.getMcpName(), mcpForm.getMcpId(), mcpForm.getVersion()); 122 // 返回成功结果 123 return Result.success("ok"); 124 } 125} 126
2.2 A2A (Agent to Agent)
A2A 模块实现了 AI 代理间的通信机制,支持代理的注册、发现和通信。
2.2.1 核心功能
- 代理卡片管理
- 代理身份编码
- 端点请求处理
- 代理间通信
2.2.2 关键类分析
A2aServerOperationService
这是 A2A 模块的核心服务类,负责代理服务的操作管理:
1// 提供代理服务的注册、查询、更新和删除等方法 2@Service 3public class A2aServerOperationService { 4 // 实现注册Agent的方法 5 public void registerAgent(AgentCard agentCard, String namespaceId, String registrationType) throws NacosException { 6 try { 7 // 1. 注册Agent的基本信息 8 // 构建Agent卡片版本信息 9 AgentCardVersionInfo agentCardVersionInfo = AgentCardUtil.buildAgentCardVersionInfo(agentCard, 10 registrationType, true); 11 // 转换为配置表单 12 ConfigForm configForm = transferVersionInfoToConfigForm(agentCardVersionInfo, namespaceId); 13 // 创建配置请求信息,设置不允许更新已存在的配置 14 ConfigRequestInfo versionConfigRequest = new ConfigRequestInfo(); 15 versionConfigRequest.setUpdateForExist(Boolean.FALSE); 16 // 发布配置 17 configOperationService.publishConfig(configForm, versionConfigRequest, null); 18 19 // 2. 注册Agent的版本信息 20 // 构建Agent卡片详细信息 21 AgentCardDetailInfo agentCardDetailInfo = AgentCardUtil.buildAgentCardDetailInfo(agentCard, 22 registrationType); 23 // 转换为版本配置表单 24 ConfigForm configFormVersion = transferAgentInfoToConfigForm(agentCardDetailInfo, namespaceId); 25 // 创建Agent卡片配置请求信息,设置不允许更新已存在的配置 26 ConfigRequestInfo agentCardConfigRequest = new ConfigRequestInfo(); 27 agentCardConfigRequest.setUpdateForExist(Boolean.FALSE); 28 // 记录操作开始时间 29 long startOperationTime = System.currentTimeMillis(); 30 // 发布版本配置 31 configOperationService.publishConfig(configFormVersion, agentCardConfigRequest, null); 32 33 // 同步效果服务 34 syncEffectService.toSync(configFormVersion, startOperationTime); 35 } catch (ConfigAlreadyExistsException e) { 36 // 如果配置已存在,抛出资源冲突异常 37 throw new NacosApiException(NacosException.CONFLICT, ErrorCode.RESOURCE_CONFLICT, 38 String.format("AgentCard name %s already exist", agentCard.getName())); 39 } 40 } 41 42 // 实现删除Agent的方法 43 public void deleteAgent(String namespaceId, String agentName, String version) throws NacosException { 44 // 对Agent名称进行编码 45 String encodedName = agentIdCodecHolder.encode(agentName); 46 47 // 构建配置查询请求 48 ConfigQueryChainRequest request = ConfigQueryChainRequest.buildConfigQueryChainRequest(encodedName, AGENT_GROUP, 49 namespaceId); 50 // 处理查询请求 51 ConfigQueryChainResponse response = configQueryChainService.handle(request); 52 53 // 如果配置未找到,直接返回 54 if (response.getStatus() == ConfigQueryChainResponse.ConfigQueryStatus.CONFIG_NOT_FOUND) { 55 return; 56 } 57 58 // 将响应内容转换为Agent卡片版本信息对象 59 AgentCardVersionInfo agentCardVersionInfo = JacksonUtils.toObj(response.getContent(), 60 AgentCardVersionInfo.class); 61 // 获取所有版本列表 62 List<String> allVersions = agentCardVersionInfo.getVersionDetails().stream().map(AgentVersionDetail::getVersion) 63 .toList(); 64 65 // 1. 如果指定了版本,只删除对应版本的Agent 66 if (StringUtils.isNotEmpty(version)) { 67 // 构造版本数据ID 68 String versionDataId = encodedName + "-" + version; 69 // 删除指定版本的配置 70 configOperationService.deleteConfig(versionDataId, AGENT_VERSION_GROUP, namespaceId, null, null, "nacos", 71 null); 72 73 // 获取版本详情列表 74 List<AgentVersionDetail> versionDetails = agentCardVersionInfo.getVersionDetails(); 75 76 // 判断是否为最新版本 77 boolean isLatestVersion = version.equals(agentCardVersionInfo.getLatestPublishedVersion()); 78 79 // 如果只有一个版本且就是要删除的版本,则删除整个Agent信息 80 if (versionDetails.size() == 1 && versionDetails.get(0).getVersion().equals(version)) { 81 configOperationService.deleteConfig(encodedName, AGENT_GROUP, namespaceId, null, null, "nacos", null); 82 } else { 83 // 从版本详情列表中移除指定版本 84 agentCardVersionInfo.getVersionDetails() 85 .removeIf(versionDetail -> versionDetail.getVersion().equals(version)); 86 87 // 如果删除的是最新版本,则清空最新版本信息 88 if (isLatestVersion) { 89 agentCardVersionInfo.setLatestPublishedVersion(null); 90 agentCardVersionInfo.setVersion(null); 91 } 92 93 // 更新配置表单 94 ConfigForm updateForm = transferVersionInfoToConfigForm(agentCardVersionInfo, namespaceId); 95 // 创建配置请求信息,设置允许更新已存在的配置 96 ConfigRequestInfo configRequestInfo = new ConfigRequestInfo(); 97 configRequestInfo.setUpdateForExist(Boolean.TRUE); 98 // 发布更新后的配置 99 configOperationService.publishConfig(updateForm, configRequestInfo, null); 100 } 101 } else { 102 // 2. 如果未指定版本,删除所有版本和Agent信息 103 // 遍历所有版本并删除 104 for (String each : allVersions) { 105 String versionDataId = encodedName + "-" + each; 106 configOperationService.deleteConfig(versionDataId, AGENT_VERSION_GROUP, namespaceId, null, null, 107 "nacos", null); 108 } 109 110 // 删除Agent基本信息 111 configOperationService.deleteConfig(encodedName, AGENT_GROUP, namespaceId, null, null, "nacos", null); 112 } 113 } 114 115 // 实现更新Agent卡片的方法 116 public void updateAgentCard(AgentCard agentCard, String namespaceId, String registrationType, boolean setAsLatest) 117 throws NacosException { 118 // 查询现有的Agent卡片版本信息 119 final AgentCardVersionInfo existingAgentInfo = queryAgentCardVersionInfo(namespaceId, agentCard.getName()); 120 121 // 检查版本是否存在,如果不存在,则添加新版本到版本信息中 122 boolean versionExisted = existingAgentInfo.getVersionDetails().stream().anyMatch( 123 agentVersionDetail -> StringUtils.equals(agentVersionDetail.getVersion(), agentCard.getVersion())); 124 if (!versionExisted) { 125 existingAgentInfo.getVersionDetails().add(AgentCardUtil.buildAgentVersionDetail(agentCard, setAsLatest)); 126 } 127 128 // 如果输入的新注册类型为空,则使用现有的注册类型 129 if (StringUtils.isEmpty(registrationType)) { 130 registrationType = existingAgentInfo.getRegistrationType(); 131 } 132 // 构建Agent卡片详细信息 133 AgentCardDetailInfo agentCardDetailInfo = AgentCardUtil.buildAgentCardDetailInfo(agentCard, registrationType); 134 // 复制属性,排除versionDetails和latestPublishedVersion字段 135 BeanUtils.copyProperties(agentCardDetailInfo, existingAgentInfo, "versionDetails", "latestPublishedVersion"); 136 137 // 如果设置为最新版本 138 if (setAsLatest) { 139 // 设置最新发布的版本 140 existingAgentInfo.setLatestPublishedVersion(agentCard.getVersion()); 141 142 // 更新版本详情列表 143 List<AgentVersionDetail> updatedVersionDetails = existingAgentInfo.getVersionDetails().stream() 144 .peek(detail -> { 145 if (StringUtils.equals(detail.getVersion(), agentCard.getVersion())) { 146 // 只更新对应版本 147 detail.setLatest(true); 148 AgentCardUtil.updateUpdateTime(detail); 149 } else { 150 detail.setLatest(false); 151 } 152 }).toList(); 153 existingAgentInfo.setVersionDetails(updatedVersionDetails); 154 } 155 156 // 更新Agent版本信息 157 ConfigForm configForm = transferVersionInfoToConfigForm(existingAgentInfo, namespaceId); 158 ConfigRequestInfo configRequestInfo = new ConfigRequestInfo(); 159 configRequestInfo.setUpdateForExist(Boolean.TRUE); 160 configOperationService.publishConfig(configForm, configRequestInfo, null); 161 162 // 更新Agent信息 163 ConfigForm versionConfigForm = transferAgentInfoToConfigForm(agentCardDetailInfo, namespaceId); 164 ConfigRequestInfo versionConfigRequestInfo = new ConfigRequestInfo(); 165 versionConfigRequestInfo.setUpdateForExist(Boolean.TRUE); 166 long startOperationTime = System.currentTimeMillis(); 167 configOperationService.publishConfig(versionConfigForm, versionConfigRequestInfo, null); 168 169 syncEffectService.toSync(versionConfigForm, startOperationTime); 170 } 171 172 // 实现列出Agents的方法 173 public Page<AgentCardVersionInfo> listAgents(String namespaceId, String agentName, String search, int pageNo, 174 int pageSize) throws NacosException { 175 176 String dataId; 177 // 如果Agent名称为空或搜索类型为模糊搜索 178 if (StringUtils.isEmpty(agentName) || Constants.A2A.SEARCH_BLUR.equalsIgnoreCase(search)) { 179 search = Constants.A2A.SEARCH_BLUR; 180 // 构造模糊搜索的数据ID 181 dataId = Constants.ALL_PATTERN + agentIdCodecHolder.encodeForSearch(agentName) + Constants.ALL_PATTERN; 182 } else { 183 search = Constants.A2A.SEARCH_ACCURATE; 184 // 构造精确搜索的数据ID 185 dataId = agentIdCodecHolder.encode(agentName); 186 } 187 188 // 查找配置信息页面 189 Page<ConfigInfo> configInfoPage = configDetailService.findConfigInfoPage(search, pageNo, pageSize, dataId, 190 AGENT_GROUP, namespaceId, null); 191 192 // 将配置信息转换为Agent卡片版本信息列表 193 List<AgentCardVersionInfo> versionInfos = configInfoPage.getPageItems().stream() 194 .map(configInfo -> JacksonUtils.toObj(configInfo.getContent(), AgentCardVersionInfo.class)).toList(); 195 196 // 构造返回结果页面 197 Page<AgentCardVersionInfo> result = new Page<>(); 198 result.setPageItems(versionInfos); 199 result.setTotalCount(configInfoPage.getTotalCount()); 200 result.setPagesAvailable((int) Math.ceil((double) configInfoPage.getTotalCount() / (double) pageSize)); 201 result.setPageNumber(pageNo); 202 203 return result; 204 } 205 206 // 实现列出Agent版本的方法 207 public List<AgentVersionDetail> listAgentVersions(String namespaceId, String name) throws NacosApiException { 208 // 查询Agent卡片版本信息 209 AgentCardVersionInfo agentCardVersionInfo = queryAgentCardVersionInfo(namespaceId, name); 210 // 返回版本详情列表 211 return agentCardVersionInfo.getVersionDetails(); 212 } 213 214} 215
请求处理器
A2A 模块通过一系列请求处理器来处理各种远程调用:
- AgentEndpointRequestHandler: 处理代理端点请求
- QueryAgentCardRequestHandler: 处理代理卡片查询请求
- ReleaseAgentCardRequestHandler: 处理代理卡片释放请求
1// 处理代理端点请求的具体实现 2@Component 3public class AgentEndpointRequestHandler extends RequestHandler<AgentEndpointRequest, AgentEndpointResponse> { 4 @Override 5 // 命名空间验证注解,确保请求的命名空间有效 6 @NamespaceValidation 7 // 参数提取器,用于RPC请求参数的提取和验证 8 @ExtractorManager.Extractor(rpcExtractor = AgentRequestParamExtractor.class) 9 // 安全注解,指定该操作需要写权限,签名类型为AI 10 @Secured(action = ActionTypes.WRITE, signType = SignType.AI) 11 public AgentEndpointResponse handle(AgentEndpointRequest request, RequestMeta meta) throws NacosException { 12 // 创建响应对象 13 AgentEndpointResponse response = new AgentEndpointResponse(); 14 // 设置响应类型与请求类型一致 15 response.setType(request.getType()); 16 // 填充命名空间ID 17 AgentRequestUtil.fillNamespaceId(request); 18 try { 19 // 验证请求参数的有效性 20 validateRequest(request); 21 // 将请求转换为Instance实例 22 Instance instance = transferInstance(request); 23 // 构造服务名称,格式为"编码后的代理名::版本号" 24 String serviceName = 25 agentIdCodecHolder.encode(request.getAgentName()) + "::" + request.getEndpoint().getVersion(); 26 // 创建服务对象,指定命名空间、组名和服名 27 Service service = Service.newService(request.getNamespaceId(), Constants.A2A.AGENT_ENDPOINT_GROUP, 28 serviceName); 29 // 根据请求类型执行相应的操作 30 switch (request.getType()) { 31 // 注册端点 32 case AiRemoteConstants.REGISTER_ENDPOINT: 33 doRegisterEndpoint(service, instance, meta); 34 break; 35 // 注销端点 36 case AiRemoteConstants.DE_REGISTER_ENDPOINT: 37 doDeregisterEndpoint(service, instance, meta); 38 break; 39 // 参数类型错误,抛出异常 40 default: 41 throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.PARAMETER_VALIDATE_ERROR, 42 String.format("parameter [`type`](https://xplanc.org/primers/document/zh/10.Bash/90.%E5%B8%AE%E5%8A%A9%E6%89%8B%E5%86%8C/EX.type.md) should be %s or %s, but was %s", 43 AiRemoteConstants.REGISTER_ENDPOINT, AiRemoteConstants.DE_REGISTER_ENDPOINT, 44 request.getType())); 45 } 46 } catch (NacosApiException e) { 47 // 设置错误信息 48 response.setErrorInfo(e.getErrCode(), e.getErrMsg()); 49 // 记录错误日志 50 LOGGER.error("[{}] Register agent endpoint to agent {} error: {}", meta.getConnectionId(), 51 request.getAgentName(), e.getErrMsg()); 52 } 53 // 返回响应结果 54 return response; 55 } 56 57 // 将AgentEndpointRequest转换为Instance对象 58 private Instance transferInstance(AgentEndpointRequest request) throws NacosApiException { 59 // 创建Instance实例 60 Instance instance = new Instance(); 61 // 获取端点信息 62 AgentEndpoint endpoint = request.getEndpoint(); 63 // 设置IP地址 64 instance.setIp(endpoint.getAddress()); 65 // 设置端口号 66 instance.setPort(endpoint.getPort()); 67 // 处理路径信息,如果为空则设置为空字符串 68 String path = StringUtils.isBlank(endpoint.getPath()) ? StringUtils.EMPTY : endpoint.getPath(); 69 // 构造元数据映射 70 Map<String, String> metadata = Map.of(Constants.A2A.AGENT_ENDPOINT_PATH_KEY, path, 71 Constants.A2A.AGENT_ENDPOINT_TRANSPORT_KEY, endpoint.getTransport(), 72 Constants.A2A.NACOS_AGENT_ENDPOINT_SUPPORT_TLS, String.valueOf(endpoint.isSupportTls())); 73 // 设置元数据 74 instance.setMetadata(metadata); 75 // 验证实例的有效性 76 instance.validate(); 77 // 返回构造好的实例 78 return instance; 79 } 80 81 // 验证请求参数的有效性 82 private void validateRequest(AgentEndpointRequest request) throws NacosApiException { 83 // 检查代理名称是否为空 84 if (StringUtils.isBlank(request.getAgentName())) { 85 throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.PARAMETER_MISSING, 86 "Required parameter [agentName](file:///Users/zhiyixie/Downloads/work_space/码云/Nacos/common/src/main/java/com/alibaba/nacos/common/paramcheck/ParamInfo.java#L50-L50) can't be empty or null"); 87 } 88 // 检查端点信息是否为空 89 if (null == request.getEndpoint()) { 90 throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.PARAMETER_MISSING, 91 "Required parameter [endpoint](file:///Users/zhiyixie/Downloads/work_space/码云/Nacos/api/src/main/java/com/alibaba/nacos/api/annotation/NacosProperties.java#L193-L193) can't be null"); 92 } 93 // 检查端点版本是否为空 94 if (StringUtils.isBlank(request.getEndpoint().getVersion())) { 95 throw new NacosApiException(NacosException.INVALID_PARAM, ErrorCode.PARAMETER_MISSING, 96 "Required parameter `endpoint.version` can't be empty or null"); 97 } 98 } 99 100 // 执行端点注册操作 101 private void doRegisterEndpoint(Service service, Instance instance, RequestMeta meta) throws NacosException { 102 // 调用客户端操作服务注册实例 103 clientOperationService.registerInstance(service, instance, meta.getConnectionId()); 104 105 } 106 107 // 执行端点注销操作 108 private void doDeregisterEndpoint(Service service, Instance instance, RequestMeta meta) { 109 // 调用客户端操作服务注销实例 110 clientOperationService.deregisterInstance(service, instance, meta.getConnectionId()); 111 } 112 113} 114
3. 关键源码剖析
3.1 模型服务注册流程
模型服务注册的核心流程如下:
- 客户端通过
McpAdminController发起注册请求 - 请求被转发到
McpServerOperationService进行处理 - 服务信息被持久化到配置中心
- 更新索引信息以便后续查询
1// McpServerOperationService 中的注册方法示例 2public McpServerDetailInfo createMcpServer(McpForm mcpForm) throws NacosException { 3 // 验证表单数据 4 // 构造服务信息对象 5 // 保存到配置中心 6 // 更新索引 7 // 返回详细信息 8} 9
3.2 代理通信处理流程
代理间通信的处理流程如下:
- 代理发起通信请求到
AgentEndpointRequestHandler - 处理器解析请求参数
- 查询目标代理信息
- 建立通信连接并返回响应
1// AgentEndpointRequestHandler 中的处理方法示例 2@Override 3public AgentEndpointResponse handle(agentEndpointRequest request, RequestMeta meta) throws NacosException { 4 // 解析请求参数 5 // 查询代理信息 6 // 构造响应数据 7 // 返回响应 8} 9
4. 架构设计亮点
4.1 分层架构设计
Nacos AI 模块采用清晰的分层架构:
1Controller Layer (REST API) 2 ↓ 3Service Layer (业务逻辑) 4 ↓ 5Index Layer (索引管理) 6 ↓ 7Persistence Layer (数据持久化) 8
这种设计使得各层职责分明,便于维护和扩展。
4.2 缓存机制优化
通过多级缓存机制提升查询性能:
- 内存缓存:快速响应高频查询
- 数据库查询:保证数据一致性
- 缓存更新策略:平衡性能与一致性
4.3 插件化设计
AI 模块继承了 Nacos 的插件化设计理念,支持:
- 自定义索引实现
- 扩展认证机制
- 灵活的配置管理
5. 使用场景
5.1 微服务AI治理
Nacos AI 模块能够与 Nacos 微服务架构无缝集成,提供 AI 服务的注册发现、配置管理等功能。
5.2 多模型版本管理
支持同一 AI 模型的多个版本管理,方便进行灰度发布和 A/B 测试。
5.3 跨域代理通信
通过 A2A 模块,不同域的 AI 代理可以安全高效地进行通信协作。
6. 未来发展趋势
6.1 AI服务网格集成
未来 Nacos AI 模块将进一步与服务网格技术集成,提供更完善的 AI 服务治理能力。
6.2 智能负载均衡
基于 AI 模型的性能指标实现智能负载均衡策略。
6.3 自动扩缩容
根据模型服务的负载情况自动调整实例数量。
7. 结语与学习建议
Nacos AI 模块为 AI 服务提供了完整的生命周期管理解决方案,从模型注册、版本管理到服务发现和配置管理,极大地简化了 AI 服务的运维复杂度。
7.1. 动手实践(快速搭建本地环境)
1# 1. 克隆 Nacos 源码 2git clone https://github.com/alibaba/nacos.git 3cd nacos 4 5# 2. 编译所有模块,包含AI 6mvn -Prelease-nacos clean install -DskipTests 7 8# 3. 启动单机模式 9cd distribution/target/nacos-server-3.x.x/nacos/bin 10sh startup.sh -m standalone 11
7.2. 调试技巧
- 关注
nacos-ai模块中的McpServerOperationService和A2aServerOperationService类,通过断点调试理解核心流程 - 查看日志输出,重点关注索引更新和配置变更相关的日志信息
7.3. 延伸阅读
- Nacos 3.x 官方文档(最权威的版本更新与功能说明)
- 《深入理解 Nacos 源码》(机械工业出版社,2025)(从源码角度解析 Nacos 设计思想)
🌟 核心学习原则:理解 AI 模块的设计理念和架构思想,而不仅仅是使用其功能接口。例如:
- 为何要用多级索引机制?(提升查询性能)
- 为何要分离 MCP 和 A2A?(解耦模型管理和代理通信)
- 缓存机制如何平衡性能与一致性?(多级缓存策略)
📝 版权声明
本文为原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文链接及本声明。
✅ 本文持续更新,欢迎收藏 + 关注专栏【后端高阶面经:实战篇】
《【微服务】【Nacos 3】 ② 深度解析:AI模块介绍》 是转载文章,点击查看原文。