A Retrospective on a RabbitMQ Message Deserialization Failure

1. Background

Springboot4RabbitMQ is the message producer.

RabbitProducer is the message consumer (despite its name).

The message queue is order-queue.

2. Problem

After the producer sends a message to RabbitMQ, the consumer fails to deserialize the message when it reads it.

The message class:

import java.io.Serializable;

public class Order implements Serializable
{

    private static final long serialVersionUID = -7128203829971899888L;

    private String id;

    private String name;

    private String messageId;

    public String getId()
    {
        return id;
    }

    public void setId(String id)
    {
        this.id = id == null ? null : id.trim();
    }

    public String getName()
    {
        return name;
    }

    public void setName(String name)
    {
        this.name = name == null ? null : name.trim();
    }

    public String getMessageId()
    {
        return messageId;
    }

    public void setMessageId(String messageId)
    {
        this.messageId = messageId == null ? null : messageId.trim();
    }

    @Override
    public String toString()
    {
        return "Order [id=" + id + ", name=" + name + ", messageId=" + messageId + "]";
    }

}

After the producer sends a message, the state of RabbitMQ is as follows.

The order-queue queue holds one unacknowledged message.

The consumer's exception output:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.rabbit.producer.RabbitProducer.receiver.OrderRecevier.onOrderMessage(com.rabbit.producer.RabbitProducer.entity.Order,com.rabbitmq.client.Channel,java.util.Map<java.lang.String, java.lang.Object>) throws java.lang.Exception]
Bean [com.rabbit.producer.RabbitProducer.receiver.OrderRecevier@600b7b3d]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:185) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.rabbit.Springboot4RabbitMQ.entity.Order] to [com.rabbit.producer.RabbitProducer.entity.Order] for GenericMessage [payload=Order [id=RabbitMQTestId0002, name=HelloWorld, messageId=1538919928275$0836e0e7-4976-457e-92fb-44b937255855], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=order.ABC, amqp_receivedExchange=order-exchange, amqp_deliveryTag=1, amqp_consumerQueue=order-queue, amqp_redelivered=false, id=0ffe4dcd-048f-f274-bca9-5550f9ecebb1, amqp_consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, contentType=application/x-java-serialized-object, timestamp=1538919929083}]
	at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	... 10 common frames omitted

2018-10-07 21:45:29.111  WARN 5264 --- [cTaskExecutor-2] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@75b3a922(byte[206])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order-exchange, receivedRoutingKey=order.ABC, deliveryTag=1, consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, consumerQueue=order-queue])

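The exception shows that the consumer failed while deserializing and converting the message. The Order classes in the two projects are identical in source, but they live in different packages (com.rabbit.Springboot4RabbitMQ.entity.Order in the producer, com.rabbit.producer.RabbitProducer.entity.Order in the consumer), so Java treats them as two unrelated types and the payload cannot be converted to the parameter type of the listener method.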
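The point about class identity can be reproduced without RabbitMQ. The following is a minimal sketch using plain Java serialization; the nested classes ProducerOrder and ConsumerOrder are hypothetical stand-ins for the two Order classes in their different packages, and the class ClassIdentityDemo is not part of either project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClassIdentityDemo {

    /** Stand-in for the producer's Order class (hypothetical name). */
    public static class ProducerOrder implements Serializable {
        private static final long serialVersionUID = -7128203829971899888L;
        public String id;
        public String name;
        public String messageId;
    }

    /** Stand-in for the consumer's Order class: same fields, same serialVersionUID, different name. */
    public static class ConsumerOrder implements Serializable {
        private static final long serialVersionUID = -7128203829971899888L;
        public String id;
        public String name;
        public String messageId;
    }

    /** Serializes a value with standard Java serialization. */
    public static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deserializes a value; the resulting type is whatever class name the stream recorded. */
    public static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ProducerOrder sent = new ProducerOrder();
        sent.id = "RabbitMQTestId0002";
        sent.name = "HelloWorld";

        Object received = deserialize(serialize(sent));

        // The stream carries the sender's class name, so the producer's type comes back.
        System.out.println(received.getClass().getName());
        // Identical fields do not make the two classes interchangeable: prints false.
        System.out.println(received instanceof ConsumerOrder);
    }
}
```

A matching serialVersionUID only helps when the class name on both sides is the same; it does not bridge two differently named classes.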

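3. Solutions

Solution 1: the consumer references the message class from the producer project, i.e. Order.java

        Right-click the consumer project -> [Build Path] -> [Configure Build Path] -> [Projects] -> [Add] and select the producer project. The consumer project can then reference the classes in the producer project. This guarantees that both projects use the very same JavaBean, which resolves the deserialization failure.

Solution 2: before sending, the producer converts the message body to a JSONObject; the consumer receives the message as a JSONObject and then converts it to the corresponding JavaBean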
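Solution 2 works because the payload type is one that both applications already share, instead of a class owned by one project. The sketch below illustrates the idea with plain JDK types: a HashMap stands in for the JSONObject (assuming the JSONObject is a map-like, serializable type, as fastjson's is), and ConsumerOrder, toPayload, fromPayload and roundTrip are hypothetical names, not the helpers used in the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class NeutralPayloadDemo {

    /** Hypothetical consumer-side bean; the producer keeps its own copy in another package. */
    public static class ConsumerOrder {
        public String id;
        public String name;
        public String messageId;
    }

    /** Producer side: copy the fields into a map, a type available to both applications. */
    public static HashMap<String, Object> toPayload(String id, String name, String messageId) {
        HashMap<String, Object> payload = new HashMap<>();
        payload.put("id", id);
        payload.put("name", name);
        payload.put("messageId", messageId);
        return payload;
    }

    /** Consumer side: rebuild the local bean from the received map. */
    public static ConsumerOrder fromPayload(Map<String, Object> payload) {
        ConsumerOrder order = new ConsumerOrder();
        order.id = (String) payload.get("id");
        order.name = (String) payload.get("name");
        order.messageId = (String) payload.get("messageId");
        return order;
    }

    /** Simulates the trip through the broker: serialize on one side, deserialize on the other. */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> roundTrip(HashMap<String, Object> payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(payload);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Map<String, Object>) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> received =
            roundTrip(toPayload("RabbitMQTestId0002", "HelloWorld", "msg-1"));
        ConsumerOrder order = fromPayload(received);
        // prints "RabbitMQTestId0002 HelloWorld"
        System.out.println(order.id + " " + order.name);
    }
}
```

The trade-off is that field names become an implicit contract between the two projects: renaming a field on one side silently yields null on the other.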

4. Test results

     1) The producer converts the message to a JSONObject

public void sendOrder(Order order)
        throws Exception
    {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Unique message ID
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend("order-exchange", "order.ABC", FastJsonConvertUtil.toJsonObject(order), correlationData);
    }

    2) The consumer receives the message as a JSONObject and converts it to the corresponding JavaBean

        

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                durable = "${spring.rabbitmq.listener.order.queue.durable}"),
        exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                type = "${spring.rabbitmq.listener.order.exchange.type}",
                ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationeExceptions}"),
        key = "${spring.rabbitmq.listener.order.key}"))
    public void onOrderMessage(@Payload JSONObject object, Channel channel, @Headers Map<String, Object> headers)
        throws Exception
    {
        System.err.println("----------------------------------");
        Order order = JsonConvertUtils.convertJSONToObject(object);
        System.err.println("Consumer-side Order: " + order.toString());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);

    }
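The listener above casts the delivery tag header to Long and passes it to basicAck, which would fail with a NullPointerException on unboxing if the header were ever absent. A defensive lookup might look like the sketch below; the header key amqp_deliveryTag is taken from the log output earlier in this post, while DeliveryTagDemo and requireDeliveryTag are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class DeliveryTagDemo {

    /** Header key as it appears in the log output; Spring AMQP exposes it as AmqpHeaders.DELIVERY_TAG. */
    public static final String DELIVERY_TAG = "amqp_deliveryTag";

    /** Returns the delivery tag, or fails with a descriptive error instead of an unboxing NPE. */
    public static long requireDeliveryTag(Map<String, Object> headers) {
        Object raw = headers.get(DELIVERY_TAG);
        if (!(raw instanceof Long)) {
            throw new IllegalArgumentException(
                "Missing or non-Long header " + DELIVERY_TAG + ": " + raw);
        }
        return (Long) raw;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(DELIVERY_TAG, 1L);
        // prints 1
        System.out.println(requireDeliveryTag(headers));
    }
}
```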

     3) The message body sent with Postman to trigger the producer:

     

    4) The consumer console successfully prints the message body

    

 

Project source code: https://github.com/KillMonkey/Springboot4RabbitMQ
