RabbitMQ消息传输中Protostuff序列化数据异常的深度解析与解决方案

作者:Mr.4567日期:2025/10/18

目录

问题背景

环境配置

使用的依赖

测试对象

初始代码(有问题的版本)

问题分析

1. 初步排查

2. 关键发现

3. RabbitTemplate的默认行为分析

4. SimpleMessageConverter的处理机制

深入理解消息转换

消息转换器的层次结构:

而直接发送 Message:

解决方案

方案1:直接使用Message对象(推荐)

方案2:配置自定义MessageConverter

问题根因总结

经验教训

结论

最后最后附上序列化工具:


问题背景

在日常开发中,我们经常使用RabbitMQ作为消息中间件进行系统间的通信。最近,我在使用Protostuff进行对象序列化,并通过RabbitMQ传输时遇到了一个棘手的问题:反序列化失败!

  • 发送端:序列化后的数据长度为8 bytes
  • 接收端:接收到的数据长度变为14 bytes
  • 错误信息
1Caused by: io.protostuff.ProtobufException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length.
2	at io.protostuff.ProtobufException.truncatedMessage(ProtobufException.java:76)
3Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 69 out of bounds for length 14

环境配置

使用的依赖

1<dependency>
2    <groupId>io.protostuff</groupId>
3    <artifactId>protostuff-core</artifactId>
4    <version>1.8.0</version>
5</dependency>
6<dependency>
7    <groupId>io.protostuff</groupId>
8    <artifactId>protostuff-runtime</artifactId>
9    <version>1.8.0</version>
10</dependency>

测试对象

1@Data
2public class TestVO {
3    private String name;
4    private Integer age;
5    
6    public TestVO() {
7    }
8}

初始代码(有问题的版本)

发送端:

1TestVO test = new TestVO();
2test.setName("test");
3test.setAge(1);
4byte[] data = ProtostuffUtils.serialize(test);
5System.out.println("发送消息大小: " + data.length + " bytes"); // 输出:8 bytes
6rabbitTemplate.convertAndSend(rabbitMQConfigProperties.getExchange().getFanout().getPSet(), "", data);

接收端:

1@RabbitListener(queues = "${my-rabbitmq-config.queue.p-set}")
2@RabbitHandler
3public void pSetListener(Message message) {
4    byte[] body = message.getBody();
5    System.out.println("接收消息大小: " + body.length + " bytes"); // 输出:14 bytes!
6    TestVO result = ProtostuffUtils.deserialize(body, TestVO.class); // 这里报错!
7}

问题分析

1. 初步排查

首先,我排除了Protostuff本身的问题:

  • 在本地直接序列化后立即反序列化:✓ 正常工作
  • 序列化数据在不同JVM间传输:✓ 正常工作

这说明Protostuff序列化机制本身没有问题。

2. 关键发现

通过对比发送和接收的数据长度:

  • 发送:8 bytes
  • 接收:14 bytes

数据在传输过程中被修改了!这指向了RabbitMQ的消息处理机制。

3. RabbitTemplate的默认行为分析

深入研究RabbitTemplate的源码后发现,当使用convertAndSend()方法时,会经过以下流程:

1// RabbitTemplate内部逻辑
2public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
3        this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
4    }
5
6
7protected Message convertMessageIfNecessary(Object object) {
8        if (object instanceof Message msg) {
9            return msg;
10        } else {
11            return this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
12        }
13    }
14
15

convertMessageIfNecessary()方法会使用配置的MessageConverter对消息进行转换。默认使用的是SimpleMessageConverter

4. SimpleMessageConverter的处理机制

对于byte[]类型的数据,SimpleMessageConverter可能会进行以下处理:

1protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
2        if (object instanceof byte[] bytes) {
3            messageProperties.setContentType("application/octet-stream");
4        } else if (object instanceof String) {
5            try {
6                bytes = ((String)object).getBytes(this.defaultCharset);
7            } catch (UnsupportedEncodingException e) {
8                throw new MessageConversionException("failed to convert to Message content", e);
9            }
10
11            messageProperties.setContentType("text/plain");
12            messageProperties.setContentEncoding(this.defaultCharset);
13        } else if (object instanceof Serializable) {
14            try {
15                bytes = SerializationUtils.serialize(object);
16            } catch (IllegalArgumentException e) {
17                throw new MessageConversionException("failed to convert to serialized Message content", e);
18            }
19
20            messageProperties.setContentType("application/x-java-serialized-object");
21        }
22
23        if (bytes != null) {
24            messageProperties.setContentLength((long)bytes.length);
25            return new Message(bytes, messageProperties);
26        } else {
27            String var10002 = this.getClass().getSimpleName();
28            throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());

深入理解消息转换

消息转换器的层次结构:

1RabbitTemplate.convertAndSend()
23MessageConverter.convertToMessage()
45SimpleMessageConverter.toMessage()  // 这里进行了数据包装!
67创建最终的 Message 对象

而直接发送 Message:

1RabbitTemplate.send()
23直接使用提供的 Message 对象  // 跳过转换步骤!
45发送到 RabbitMQ

解决方案

方案1:直接使用Message对象(推荐)

修改后的发送端代码:

1@Component
2public class FixedMessageSender {
3    
4    @Autowired
5    private RabbitTemplate rabbitTemplate;
6    
7    public void sendProtostuffMessage(Object object, String exchange, String routingKey) {
8        // 序列化对象
9        byte[] data = ProtostuffUtils.serialize(object);
10        System.out.println("发送消息大小: " + data.length + " bytes");
11        
12        // 创建Message对象,明确指定内容类型
13        MessageProperties properties = new MessageProperties();
14        properties.setContentType("application/x-protostuff");
15        properties.setContentLength(data.length);
16        Message message = new Message(data, properties);
17        
18        // 直接发送Message对象,跳过自动转换
19        rabbitTemplate.send(exchange, routingKey, message);
20    }
21}

方案2:配置自定义MessageConverter

1@Configuration
2public class RabbitMQConfig {
3    
4    /**
5     * 配置二进制消息转换器,避免对byte[]进行额外处理
6     */
7    @Bean
8    public MessageConverter binaryMessageConverter() {
9        return new MessageConverter() {
10            @Override
11            public Message toMessage(Object object, MessageProperties messageProperties) {
12                if (object instanceof byte[]) {
13                    // 对于byte[],直接包装,不做任何处理
14                    return new Message((byte[]) object, messageProperties);
15                }
16                // 其他类型使用默认处理
17                return new SimpleMessageConverter().toMessage(object, messageProperties);
18            }
19            
20            @Override
21            public Object fromMessage(Message message) {
22                // 直接返回消息体,不进行任何解码
23                return message.getBody();
24            }
25        };
26    }
27}

在接收端也保持一致性:

1@RabbitListener(queues = "queue_name")
2public void handleBinaryMessage(Message message) {
3    byte[] body = message.getBody();  // 直接获取原始数据
4    // 进行反序列化
5    MyObject obj = ProtostuffUtils.deserialize(body, MyObject.class);
6}

问题根因总结

  1. 直接原因:RabbitTemplate的convertAndSend()方法内部的SimpleMessageConverter对byte[]数据进行了额外处理
  2. 根本原因:消息转换器对二进制数据的默认处理策略与Protostuff的原始格式不兼容
  3. 数据变化:8 bytes → 14 bytes 的具体原因可能是:
    • Base64编码或其他编码转换
    • 添加消息头信息
    • 数据包装和格式化

经验教训

  1. 不要假设:不要假设RabbitTemplate会原样传输byte[]数据
  2. 明确指定:对于二进制数据,总是明确指定内容类型
  3. 跳过转换:使用Message对象直接发送,跳过自动转换步骤
  4. 添加调试:在关键位置添加数据长度和内容的调试信息
  5. 版本兼容:确保发送端和接收端使用相同的序列化版本

结论

通过这个问题的解决,我们深刻认识到:在使用消息中间件传输二进制数据时,必须了解其内部的消息转换机制。直接使用Message对象而不是依赖自动转换,可以确保数据的完整性和一致性。

这个经验不仅适用于Protostuff,同样适用于Protocol Buffers、Avro等其他二进制序列化框架在RabbitMQ中的使用。

最后最后附上序列化工具:

1import io.protostuff.LinkedBuffer;
2import io.protostuff.ProtostuffIOUtil;
3import io.protostuff.Schema;
4import io.protostuff.runtime.RuntimeSchema;
5import org.slf4j.Logger;
6import org.slf4j.LoggerFactory;
7
8import java.util.Map;
9import java.util.concurrent.ConcurrentHashMap;
10
11/**
12 * Protostuff序列化工具类
13 * 基于Protostuff实现高效的对象序列化与反序列化
14 *
15 * @author wp
16 * @Description:
17 * @date 2025-10-15 15:53
18 */
19public class ProtostuffSerializer {
20    private static final Logger log = LoggerFactory.getLogger(ProtostuffSerializer.class);
21
22    /**
23     * 缓存Schema,避免重复创建提高性能
24     */
25    private static final Map<Class<?>, Schema<?>> SCHEMA_CACHE = new ConcurrentHashMap<>();
26
27    /**
28     * 默认缓冲区大小
29     */
30    private static final int DEFAULT_BUFFER_SIZE = 512;
31
32    /**
33     * 获取类的Schema,如果缓存中没有则创建并缓存
34     *
35     * @param clazz 类对象
36     * @param <T>   泛型类型
37     * @return 类对应的Schema
38     */
39    @SuppressWarnings("unchecked")
40    private static <T> Schema<T> getSchema(Class<T> clazz) {
41        Schema<T> schema = (Schema<T>) SCHEMA_CACHE.get(clazz);
42        if (schema == null) {
43            schema = RuntimeSchema.getSchema(clazz);
44            if (schema != null) {
45                SCHEMA_CACHE.put(clazz, schema);
46            } else {
47                throw new IllegalArgumentException("无法为类 " + clazz.getName() + " 创建Schema,请检查该类是否有默认无参构造函数");
48            }
49        }
50        return schema;
51    }
52
53    /**
54     * 将对象序列化为字节数组
55     *
56     * @param obj 要序列化的对象
57     * @param <T> 对象类型
58     * @return 序列化后的字节数组,若对象为null则返回null
59     */
60    public static <T> byte[] serialize(T obj) {
61        if (obj == null) {
62            log.warn("序列化对象为null");
63            return null;
64        }
65
66        Class<T> clazz = (Class<T>) obj.getClass();
67        LinkedBuffer buffer = null;
68        byte[] data = null;
69
70        try {
71            Schema<T> schema = getSchema(clazz);
72            buffer = LinkedBuffer.allocate(DEFAULT_BUFFER_SIZE);
73            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
74            log.debug("对象[{}]序列化成功,长度: {}字节", clazz.getName(), data.length);
75        } catch (Exception e) {
76            log.error("对象序列化失败", e);
77            throw new RuntimeException("对象序列化失败: " + e.getMessage(), e);
78        } finally {
79            // 释放缓冲区资源
80            if (buffer != null) {
81                buffer.clear();
82            }
83        }
84
85        return data;
86    }
87
88    /**
89     * 将字节数组反序列化为指定类型的对象
90     *
91     * @param data  序列化后的字节数组
92     * @param clazz 目标对象类型
93     * @param <T>   泛型类型
94     * @return 反序列化后的对象,若字节数组为null或空则返回null
95     */
96    public static <T> T deserialize(byte[] data, Class<T> clazz) {
97        if (data == null || data.length == 0) {
98            log.warn("反序列化字节数组为null或空");
99            return null;
100        }
101
102        T obj = null;
103
104        try {
105            obj = clazz.getDeclaredConstructor().newInstance();
106            Schema<T> schema = getSchema(clazz);
107            ProtostuffIOUtil.mergeFrom(data, obj, schema);
108            log.debug("字节数组反序列化为[{}]成功", clazz.getName());
109        } catch (Exception e) {
110            log.error("字节数组反序列化失败", e);
111            throw new RuntimeException("字节数组反序列化失败: " + e.getMessage(), e);
112        }
113
114        return obj;
115    }
116}

RabbitMQ消息传输中Protostuff序列化数据异常的深度解析与解决方案》 是转载文章,点击查看原文


相关推荐


Apache Doris 与 ClickHouse:运维与开源闭源对比
SelectDB技术团队2025/10/16

引言 在当今数据驱动的商业环境中,OLAP(在线分析处理)数据库的选择对企业的数据分析能力和运维成本有着深远影响。Apache Doris 和 ClickHouse 作为业界领先的高性能 OLAP 数据库,各自在不同场景下展现出独特优势。 Apache Doris 以其优秀的宽表查询能力、多表 JOIN 性能、实时更新、search 以及湖加速特性而著称。ClickHouse 同样在宽表处理方面表现出色,其丰富的分析函数库和高性能单表聚合能力备受青睐。 然而,从运维角度来看,两者在存算分离


统一高效图像生成与编辑!百度&新加坡国立提出Query-Kontext,多项任务“反杀”专用模型
AI生成未来2025/10/15

论文链接:https://arxiv.org/pdf/2509.26641 亮点直击 Query-Kontext,一种经济型集成多模态模型(UMM),能够将视觉语言模型(VLMs)中的多模态生成推理与扩散模型执行的高保真视觉渲染相分离。 提出了一种三阶段渐进式训练策略,该策略逐步将 VLM 与越来越强大的扩散生成器对齐,同时增强它们在生成推理和视觉合成方面的各自优势。 提出了一种精心策划的数据集收集方案,以收集真实、合成和经过仔细筛选的开源数据集,涵盖多样的多模态参考到图像


微美全息(NASDAQ:WIMI)融合区块链+AI+IoT 三大技术,解锁物联网入侵检测新范式
爱看科技2025/10/14

在全面数字化转型的浪潮中,区块链、网络安全、人工智能与机器学习不再是孤立的技术概念,而是相互交织、共同推动行业进步的强大引擎。这些技术的紧密结合,特别是在物联网(IoT)领域的应用,正引领着一场前所未有的安全、效率与智能化变革。   实际,区块链技术以其去中心化、安全性和不可篡改性,为物联网数据存储和共享提供了全新的解决方案。而人工智能与机器学习技术的应用,使得物联网系统具备了自我学习和优化的能力。机器学习算法能够分析海量数据,识别出潜在的安全威胁或性能瓶颈,为系统提供精准的决策支持。


基于旗鱼算法优化卷积神经网络结合长短期记忆网络与注意力机制(CNN-LSTM-Attention)的风电场发电功率预测
智能算法研学社(Jack旭)2025/10/12

基于旗鱼算法优化卷积神经网络结合长短期记忆网络与注意力机制(CNN-LSTM-Attention)的风电场发电功率预测 文章目录 基于旗鱼算法优化卷积神经网络结合长短期记忆网络与注意力机制(CNN-LSTM-Attention)的风电场发电功率预测1.CNN原理2.LSTM原理3.注意力机制4.CNN-LSTM-Attention5.风电功率预测5.1 数据集6.基于旗鱼算法优化的CNN-LSTM-Attention7.实验结果8.Matlab代码 1.CNN原理 卷积神经


C++ const 用法全面总结与深度解析
oioihoii2025/10/10

1. const 基础概念 const 关键字用于定义不可修改的常量,是C++中确保数据只读性和程序安全性的核心机制。它可以应用于变量、指针、函数参数、返回值、成员函数等多种场景,深刻影响代码的正确性和性能。 1.1 本质与编译期处理 const变量在编译时会被编译器严格检查,任何修改尝试都会导致编译错误。与C语言不同,C++中的const变量(尤其是全局const)通常不会分配内存,而是直接嵌入到指令中(类似#define),但在以下情况会分配内存: 取const变量地址时 const变量为


php artisan db:seed执行的时候遇到报错
快支棱起来2025/10/9

INFO Seeding database. Illuminate\Database\QueryException SQLSTATE[42S22]: Column not found: 1054 Unknown column 'email_verified_at' in 'field list' (Connection: mysql, SQL: insert into users (name, email, email_verified_at, password, remember_token,


apache POI 万字总结:满足你对报表一切幻想
大鱼七成饱2025/10/7

背景 国庆期间接了个兼职,处理机构的几张Excel报表。初次沟通,感觉挺简单,接入Easyexcel(FastExcel),然后拼lamda表达式就跑出来了。不过毕竟工作了这些年,感觉没这么简单。后面找业务方详细聊了一次,将需求落到纸面上。逐行研究了下BRD,有点挠头,跑数加各种样式,兼容新老版本,老方案是不行了。综合对比,最终选了老牌的 Apache POI 实现,下面说下为啥选POI,还有POI怎么用,包含样式、公式、动态表头、安全防范、百万级数据导入导出等功能。 一、技术选型 如果实现该


【转载】前验光师如何通过聪明模仿而非蛮干构建月收入3.5万美元的SaaS应用
是魔丸啊2025/10/6

转载 大多数人都认为你需要在科技领域拥有突破性的想法才能成功。 Samuel Rondot的想法与众不同。他的整个行动手册建立在一个简单的规则上:不要重新发明轮子——只要让它变得1%更好。 这种心态帮助他辞去了验光师的工作,从零开始自学编程,并推出了三个现在每月收入3.5万美元的SaaS应用。 以下是他如何做到的。 从验光师到自学程序员 Samuel从未计划成为一名程序员。几年前,他在眼镜行业工作,完全没有编程经验。 什么改变了?他想重建一个自己正在使用的Instagram工具——这一次,完全靠


免费领源码-Spring boot的物流管理系统 |可做计算机毕设Java、Python、PHP、小程序APP、C#、爬虫大数据、单片机、文案
vx_dmxq2112025/10/5

目   录 摘  要 Abstract 1  前言 1.1 设计目的 1.2 设计思路 1.3 国内外研究现状 2  相关技术 2.1  Java语言 2.2 MySQL数据库 2.3 Spring Boot框架 2.4 B/S模式 3  系统分析 3.1  可行性分析 3.2  系统需求分析 3.2.1  功能性分析 3.2.2  非功能性需求分析 3.3  系统用例分析 3.3.1  注


第一章 机器学习基础理论:机器学习概述(一)
FPGA+护理+人工智能2025/10/3

第一章 机器学习基础理论:机器学习概述 文章目录 第一章 机器学习基础理论:机器学习概述具体的专栏内容请参考: 人工智能专栏一、目标二、重点与难点三、内容1. 机器学习概述2. 机器学习在精神病护理领域的应用 前面python的基础内容算是完成了,接下来将要进入机器学习部分了。 具体的专栏内容请参考: 人工智能专栏 一、目标 通过本章学习,能够: 理解机器学习的基本概念和发展历程 了解机器学习在精神病护理领域的应用现状与前

首页编辑器站点地图

Copyright © 2025 聚合阅读

License: CC BY-SA 4.0