目录
问题背景
环境配置
使用的依赖
测试对象
初始代码(有问题的版本)
问题分析
1. 初步排查
2. 关键发现
3. RabbitTemplate的默认行为分析
4. SimpleMessageConverter的处理机制
深入理解消息转换
消息转换器的层次结构:
而直接发送 Message:
解决方案
方案1:直接使用Message对象(推荐)
方案2:配置自定义MessageConverter
问题根因总结
经验教训
结论
最后最后附上序列化工具:
问题背景
在日常开发中,我们经常使用RabbitMQ作为消息中间件进行系统间的通信。最近,我在使用Protostuff进行对象序列化,并通过RabbitMQ传输时遇到了一个棘手的问题:反序列化失败!
- 发送端:序列化后的数据长度为8 bytes
- 接收端:接收到的数据长度变为14 bytes
- 错误信息:
1Caused by: io.protostuff.ProtobufException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either than the input has been truncated or that an embedded message misreported its own length. 2 at io.protostuff.ProtobufException.truncatedMessage(ProtobufException.java:76) 3Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 69 out of bounds for length 14
环境配置
使用的依赖
1<dependency> 2 <groupId>io.protostuff</groupId> 3 <artifactId>protostuff-core</artifactId> 4 <version>1.8.0</version> 5</dependency> 6<dependency> 7 <groupId>io.protostuff</groupId> 8 <artifactId>protostuff-runtime</artifactId> 9 <version>1.8.0</version> 10</dependency>
测试对象
1@Data 2public class TestVO { 3 private String name; 4 private Integer age; 5 6 public TestVO() { 7 } 8}
初始代码(有问题的版本)
发送端:
1TestVO test = new TestVO(); 2test.setName("test"); 3test.setAge(1); 4byte[] data = ProtostuffUtils.serialize(test); 5System.out.println("发送消息大小: " + data.length + " bytes"); // 输出:8 bytes 6rabbitTemplate.convertAndSend(rabbitMQConfigProperties.getExchange().getFanout().getPSet(), "", data);
接收端:
1@RabbitListener(queues = "${my-rabbitmq-config.queue.p-set}") 2@RabbitHandler 3public void pSetListener(Message message) { 4 byte[] body = message.getBody(); 5 System.out.println("接收消息大小: " + body.length + " bytes"); // 输出:14 bytes! 6 TestVO result = ProtostuffUtils.deserialize(body, TestVO.class); // 这里报错! 7}
问题分析
1. 初步排查
首先,我排除了Protostuff本身的问题:
- 在本地直接序列化后立即反序列化:✓ 正常工作
- 序列化数据在不同JVM间传输:✓ 正常工作
这说明Protostuff序列化机制本身没有问题。
2. 关键发现
通过对比发送和接收的数据长度:
- 发送:8 bytes
- 接收:14 bytes
数据在传输过程中被修改了!这指向了RabbitMQ的消息处理机制。
3. RabbitTemplate的默认行为分析
深入研究RabbitTemplate的源码后发现,当使用convertAndSend()方法时,会经过以下流程:
1// RabbitTemplate内部逻辑 2public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException { 3 this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData); 4 } 5 6 7protected Message convertMessageIfNecessary(Object object) { 8 if (object instanceof Message msg) { 9 return msg; 10 } else { 11 return this.getRequiredMessageConverter().toMessage(object, new MessageProperties()); 12 } 13 } 14 15
convertMessageIfNecessary()方法会使用配置的MessageConverter对消息进行转换。默认使用的是SimpleMessageConverter。
4. SimpleMessageConverter的处理机制
对于byte[]类型的数据,SimpleMessageConverter可能会进行以下处理:
1protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { 2 if (object instanceof byte[] bytes) { 3 messageProperties.setContentType("application/octet-stream"); 4 } else if (object instanceof String) { 5 try { 6 bytes = ((String)object).getBytes(this.defaultCharset); 7 } catch (UnsupportedEncodingException e) { 8 throw new MessageConversionException("failed to convert to Message content", e); 9 } 10 11 messageProperties.setContentType("text/plain"); 12 messageProperties.setContentEncoding(this.defaultCharset); 13 } else if (object instanceof Serializable) { 14 try { 15 bytes = SerializationUtils.serialize(object); 16 } catch (IllegalArgumentException e) { 17 throw new MessageConversionException("failed to convert to serialized Message content", e); 18 } 19 20 messageProperties.setContentType("application/x-java-serialized-object"); 21 } 22 23 if (bytes != null) { 24 messageProperties.setContentLength((long)bytes.length); 25 return new Message(bytes, messageProperties); 26 } else { 27 String var10002 = this.getClass().getSimpleName(); 28 throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
深入理解消息转换
消息转换器的层次结构:
1RabbitTemplate.convertAndSend() 2 ↓ 3MessageConverter.convertToMessage() 4 ↓ 5SimpleMessageConverter.toMessage() // 这里进行了数据包装! 6 ↓ 7创建最终的 Message 对象
而直接发送 Message:
1RabbitTemplate.send() 2 ↓ 3直接使用提供的 Message 对象 // 跳过转换步骤! 4 ↓ 5发送到 RabbitMQ
解决方案
方案1:直接使用Message对象(推荐)
修改后的发送端代码:
1@Component 2public class FixedMessageSender { 3 4 @Autowired 5 private RabbitTemplate rabbitTemplate; 6 7 public void sendProtostuffMessage(Object object, String exchange, String routingKey) { 8 // 序列化对象 9 byte[] data = ProtostuffUtils.serialize(object); 10 System.out.println("发送消息大小: " + data.length + " bytes"); 11 12 // 创建Message对象,明确指定内容类型 13 MessageProperties properties = new MessageProperties(); 14 properties.setContentType("application/x-protostuff"); 15 properties.setContentLength(data.length); 16 Message message = new Message(data, properties); 17 18 // 直接发送Message对象,跳过自动转换 19 rabbitTemplate.send(exchange, routingKey, message); 20 } 21}
方案2:配置自定义MessageConverter
1@Configuration 2public class RabbitMQConfig { 3 4 /** 5 * 配置二进制消息转换器,避免对byte[]进行额外处理 6 */ 7 @Bean 8 public MessageConverter binaryMessageConverter() { 9 return new MessageConverter() { 10 @Override 11 public Message toMessage(Object object, MessageProperties messageProperties) { 12 if (object instanceof byte[]) { 13 // 对于byte[],直接包装,不做任何处理 14 return new Message((byte[]) object, messageProperties); 15 } 16 // 其他类型使用默认处理 17 return new SimpleMessageConverter().toMessage(object, messageProperties); 18 } 19 20 @Override 21 public Object fromMessage(Message message) { 22 // 直接返回消息体,不进行任何解码 23 return message.getBody(); 24 } 25 }; 26 } 27}
在接收端也保持一致性:
1@RabbitListener(queues = "queue_name") 2public void handleBinaryMessage(Message message) { 3 byte[] body = message.getBody(); // 直接获取原始数据 4 // 进行反序列化 5 MyObject obj = ProtostuffUtils.deserialize(body, MyObject.class); 6}
问题根因总结
- 直接原因:RabbitTemplate的
convertAndSend()方法内部的SimpleMessageConverter对byte[]数据进行了额外处理 - 根本原因:消息转换器对二进制数据的默认处理策略与Protostuff的原始格式不兼容
- 数据变化:8 bytes → 14 bytes 的具体原因可能是:
- Base64编码或其他编码转换
- 添加消息头信息
- 数据包装和格式化
经验教训
- 不要假设:不要假设RabbitTemplate会原样传输byte[]数据
- 明确指定:对于二进制数据,总是明确指定内容类型
- 跳过转换:使用Message对象直接发送,跳过自动转换步骤
- 添加调试:在关键位置添加数据长度和内容的调试信息
- 版本兼容:确保发送端和接收端使用相同的序列化版本
结论
通过这个问题的解决,我们深刻认识到:在使用消息中间件传输二进制数据时,必须了解其内部的消息转换机制。直接使用Message对象而不是依赖自动转换,可以确保数据的完整性和一致性。
这个经验不仅适用于Protostuff,同样适用于Protocol Buffers、Avro等其他二进制序列化框架在RabbitMQ中的使用。
最后最后附上序列化工具:
1import io.protostuff.LinkedBuffer; 2import io.protostuff.ProtostuffIOUtil; 3import io.protostuff.Schema; 4import io.protostuff.runtime.RuntimeSchema; 5import org.slf4j.Logger; 6import org.slf4j.LoggerFactory; 7 8import java.util.Map; 9import java.util.concurrent.ConcurrentHashMap; 10 11/** 12 * Protostuff序列化工具类 13 * 基于Protostuff实现高效的对象序列化与反序列化 14 * 15 * @author wp 16 * @Description: 17 * @date 2025-10-15 15:53 18 */ 19public class ProtostuffSerializer { 20 private static final Logger log = LoggerFactory.getLogger(ProtostuffSerializer.class); 21 22 /** 23 * 缓存Schema,避免重复创建提高性能 24 */ 25 private static final Map<Class<?>, Schema<?>> SCHEMA_CACHE = new ConcurrentHashMap<>(); 26 27 /** 28 * 默认缓冲区大小 29 */ 30 private static final int DEFAULT_BUFFER_SIZE = 512; 31 32 /** 33 * 获取类的Schema,如果缓存中没有则创建并缓存 34 * 35 * @param clazz 类对象 36 * @param <T> 泛型类型 37 * @return 类对应的Schema 38 */ 39 @SuppressWarnings("unchecked") 40 private static <T> Schema<T> getSchema(Class<T> clazz) { 41 Schema<T> schema = (Schema<T>) SCHEMA_CACHE.get(clazz); 42 if (schema == null) { 43 schema = RuntimeSchema.getSchema(clazz); 44 if (schema != null) { 45 SCHEMA_CACHE.put(clazz, schema); 46 } else { 47 throw new IllegalArgumentException("无法为类 " + clazz.getName() + " 创建Schema,请检查该类是否有默认无参构造函数"); 48 } 49 } 50 return schema; 51 } 52 53 /** 54 * 将对象序列化为字节数组 55 * 56 * @param obj 要序列化的对象 57 * @param <T> 对象类型 58 * @return 序列化后的字节数组,若对象为null则返回null 59 */ 60 public static <T> byte[] serialize(T obj) { 61 if (obj == null) { 62 log.warn("序列化对象为null"); 63 return null; 64 } 65 66 Class<T> clazz = (Class<T>) obj.getClass(); 67 LinkedBuffer buffer = null; 68 byte[] data = null; 69 70 try { 71 Schema<T> schema = getSchema(clazz); 72 buffer = LinkedBuffer.allocate(DEFAULT_BUFFER_SIZE); 73 data = ProtostuffIOUtil.toByteArray(obj, schema, buffer); 74 log.debug("对象[{}]序列化成功,长度: {}字节", clazz.getName(), data.length); 75 } catch (Exception e) { 76 log.error("对象序列化失败", e); 77 throw new RuntimeException("对象序列化失败: " + e.getMessage(), e); 78 } finally { 79 // 释放缓冲区资源 80 if (buffer != null) { 81 buffer.clear(); 82 } 83 } 84 85 return data; 86 } 87 88 /** 89 * 将字节数组反序列化为指定类型的对象 90 * 91 * @param data 序列化后的字节数组 92 * @param clazz 目标对象类型 93 * @param <T> 泛型类型 94 * @return 反序列化后的对象,若字节数组为null或空则返回null 95 */ 96 public static <T> T deserialize(byte[] data, Class<T> clazz) { 97 if (data == null || data.length == 0) { 98 log.warn("反序列化字节数组为null或空"); 99 return null; 100 } 101 102 T obj = null; 103 104 try { 105 obj = clazz.getDeclaredConstructor().newInstance(); 106 Schema<T> schema = getSchema(clazz); 107 ProtostuffIOUtil.mergeFrom(data, obj, schema); 108 log.debug("字节数组反序列化为[{}]成功", clazz.getName()); 109 } catch (Exception e) { 110 log.error("字节数组反序列化失败", e); 111 throw new RuntimeException("字节数组反序列化失败: " + e.getMessage(), e); 112 } 113 114 return obj; 115 } 116}