RabbitMQ与spring的集成,,基础。

小弟 前段时间使用mq是由于要在Jfianl架构中使用,但Jfinal并不擅长,因此使用的是工具类建立的连接和通道。又写了消费者和生产者的公共方法。java

如今有一个业务。对接银行的时候,因异步回调。致使客户在对一张A表操做 和银行回调对A表的操做产生并发。导致A表出现一个seq_no重复。余额也计算错误。领导要求集成MQ,小弟终于在3天后集成了一个基础的demo。如今记录一下:spring

首先 maven项目确定要引入jar包的apache

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

而后请看spring的配置:json

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
>
    <description>rabbitmq 链接服务配置</description>
    <rabbit:connection-factory id="connectionFactory"
           username="${mq.name}" password="${mq.pwd}" host="${mq.url}" port="${mq.port}"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- spring template声明-->
    <rabbit:template exchange="${mq.user.bill.exchange.name}" id="amqpTemplate"  connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter" />

    <!-- 消息对象json转换类 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- 业务队列 -->
    <rabbit:queue id="user_bill_queue" name="user_bill_queue" durable="true" auto-delete="false" exclusive="false">
       <!-- <rabbit:queue-arguments>
            &lt;!&ndash; 设置死信交换机 &ndash;&gt;
            <entry key="x-dead-letter-exchange">
                <value type="java.lang.String">dead_letter_userbill_exchange</value>
            </entry>
            &lt;!&ndash; 设置死信交换机的路由键 &ndash;&gt;
            <entry key="x-dead-letter-routing-key">
                <value type="java.lang.String">userbill_queue_fail</value>
            </entry>
        </rabbit:queue-arguments>-->
    </rabbit:queue>

    <!-- 死信队列 -->
    <!--<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />-->


    <!-- 死信交换机配置 -->
    <!--<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->

    <!-- 正常交换机配置 -->
    <rabbit:direct-exchange name="${mq.user.bill.exchange.name}" durable="true" auto-delete="false" id="${mq.user.bill.exchange.name}">
        <rabbit:bindings>
            <rabbit:binding queue="user_bill_queue" key="${mq.user.bill.routing.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 配置监听 手动ack  prefetch="1" 表示消费一条-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
        <rabbit:listener queues="user_bill_queue" ref="queueListenter"/>
        <!--<rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>-->
    </rabbit:listener-container>

</beans>

一、强调一下命名空间很少。够用就行。这里只单单配置了mq 与spring其余文件集成可以使用import不要重复引用便可缓存

二、这里粘出来的 有异常msg的处理。也就是死信队列。后面会提到服务器

以上基本都是固定配置。获取连接,建立admin(在消息代理中如何利用协议来配置队列,交换和绑定。实现将自动声明在一个应用上下文的Queues,Exchanges,Bindings。具体功能我也不清楚。一直没搞懂) 建立生产者模板,建立队列。建立指定路由key的交换器 并绑定队列,消息对象转json的bean等等。架构

三、若是想要引入消息失效时间,须要在定义队列的地方添加属性<rabbit:queue-arguments>,并指定并发

<entry key="x-message-ttl">
    <value type="java.lang.Integer">60000</value>
</entry>

表示该队列中的信息失效时间为1min。dom

要引入队列的等级 须要的key=x-max-priority。异步

下面来讲下 死信队列。当有消息再消费端处理失败时。若是要ackNack的话(true),会致使不断消费这个消息,一直产生错误,一个死循环。

这时,使用死信队列就能够处理。

一、定义业务队列的时候绑定一个死信交换机。并绑定一个路由key,注意x-dead-letter-exchange和x-dead-letter-routing-key是固定参数

<rabbit:queue-arguments>
    <!-- 设置死信交换机 -->
    <entry key="x-dead-letter-exchange">
        <value type="java.lang.String">dead_letter_userbill_exchange</value>
    </entry>
    <!-- 设置死信交换机的路由键 -->
    <entry key="x-dead-letter-routing-key">
        <value type="java.lang.String">userbill_queue_fail</value>
    </entry>
</rabbit:queue-arguments>

二、设置一个死信队列,用来接收死信交换机转发来的异常信息(想要队列的其余属性能够自定义配置)

<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />

三、定义一个死信交换机,名称与业务队列中定义的一致,绑定死信队列和路由key(与业务队列中定义的死信交换机的路由key一致)

<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange">
    <rabbit:bindings>
        <rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

四、在监听器中将死信队列归入监听 监听器中的ref bean 都是经过@Component注解注入的。

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
    <rabbit:listener queues="user_bill_queue" ref="queueListenter"/>
    <rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>
</rabbit:listener-container>

这样就完成了失败消息转发到死信队列中。在设计另外一个消费者deadUserBillQueueListenter 进行消息处理便可,可设计,在处理一次失败就将期ackreject

这里要提醒一下,当设计有自定义交换机时,生产者传入的就不是队列名称 ,而是交换机名称和路由key,只有在使用默认交换机时才使用队列名称

生产者代码:

package com.qiantu.core.rabbitmq;

/**
 * @Description: 给队列发送消息接口类
 * @Date: create in 2018-07-30 16:36
 * @Author:Reynold-白
 */
public interface MQProducer {


    /**
     * 发送消息到指定队列
     * @param queueKey
     * @param object
     */
    void sendDataToQueue(String exchangeName, String routingKey, Object object);

}
package com.qiantu.core.rabbitmq;

import com.alibaba.fastjson.JSON;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.GenericXmlApplicationContext;
import org.springframework.stereotype.Service;

/**
 * @Description: 发送消息实现
 * @Date: create in 2018-07-30 16:37
 * @Author:Reynold-白
 */
@Service("mqProducer")
public class MQProducerImpl implements MQProducer{
    private final static Logger log = Logger.getLogger(MQProducerImpl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendDataToQueue(String exchangeName, String routingKey, Object object) {
        try {
            log.info("========向MQ发送消息【开始】========消息:" + object.toString());

            amqpTemplate.convertAndSend(exchangeName, routingKey,object);
            log.info("========向MQ发送消息【完成】========消息:");
        } catch (Exception e) {
            log.error("=======发送消息失败======", e);
            e.printStackTrace();
        }
    }


}

消费者代码:

package com.qiantu.core.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.qiantu.core.constants.UserBillConstants;
import com.qiantu.core.model.RabbitMQConsumerFailData;
import com.qiantu.core.service.UserBillSerivce;
import com.qiantu.core.utils.IdGenerator;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @Description: userBill消息监听消费
 * @Date: create in 2018-07-30 17:08
 * @Author:Reynold-白
 */
@Component
public class QueueListenter implements ChannelAwareMessageListener {

    protected static Logger log = Logger.getLogger(QueueListenter.class);

    @Autowired
    private UserBillSerivce userBillSerivce;

    @Override
    public void onMessage(Message message, Channel channel) {
        String msgStr = "";
        try{
            msgStr = new String(message.getBody(), "UTF-8");
            log.info("=====获取消息" + msgStr);

            Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {});

            boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams);

            if(result){
                //处理成功,响应队列,删除该条信息
                this.basicACK(message, channel);
                log.info("=======消息:" + msgStr + ",处理成功!");
            }else{

                RabbitMQConsumerFailData rmcfd = new RabbitMQConsumerFailData();
                rmcfd.setId(IdGenerator.randomUUID());
                rmcfd.setData(msgStr);
                rmcfd.setType("0");
                rmcfd.setCreateBy("admin");
                rmcfd.setCreateTime(new Date());

                userBillSerivce.insertRabbitMQFailData(rmcfd);

                //处理失败,拒绝数据
                this.basicReject(message, channel);
                log.info("=======消息:" + msgStr + ",处理失败。回退!");
            }
        }catch(Exception e){
            log.error("=======消息业务处理异常=====", e);
            this.basicReject(message, channel);
            e.printStackTrace();
        }
    }

    //正常消费通知
    private void basicACK(Message message,Channel channel){
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch(IOException e){
            log.error("通知服务器移除mq时异常,异常信息:"+e);
        }
    }
    //处理异常,消息回到异常处理队列总再处理
    private void basicReject(Message message,Channel channel){
        try {
            /**
             * 第一个参数:该消息的index
             * 第二个参数:是否批量.true:将一次性拒绝全部小于deliveryTag的消息。
             * 第三个参数:被拒绝的是否从新入队列
             */
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            try {
                log.error(new String(message.getBody(), "utf-8") + "从新进入服务器时出现异常,异常信息:", e);
            } catch (UnsupportedEncodingException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }

    }


}

死信队列消费者:

package com.qiantu.core.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.qiantu.core.service.UserBillSerivce;
import com.rabbitmq.client.Channel;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * @Description: 失败信息再处理
 * @Date: create in 2018-08-02 15:00
 * @Author:Reynold-白
 */
@Component
public class DeadUserBillQueueListenter implements ChannelAwareMessageListener {

    protected static Logger log = Logger.getLogger(QueueListenter.class);

    @Autowired
    private UserBillSerivce userBillSerivce;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msgStr = "";
        try{
            msgStr = new String(message.getBody(), "UTF-8");
            log.info("=====获取消息" + msgStr);

            Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {});

            boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams);

            if(result){
                //处理成功,响应队列,删除该条信息
                this.basicACK(message, channel);
                log.info("=======deadUserBillQueue消息:" + msgStr + ",处理成功!");
            }else{
                //处理失败,抛弃数据
                this.basicNack(message, channel);
                log.info("=======deadUserBillQueue消息:" + msgStr + ",处理失败。回退!");
            }
        }catch(Exception e){
            log.error("=======deadUserBillQueue消息业务处理异常=====", e);
            this.basicNack(message, channel);
            e.printStackTrace();
        }
    }

    //正常消费通知
    private void basicACK(Message message,Channel channel){
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch(IOException e){
            log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e);
        }
    }
    //处理异常,删除信息
    private void basicNack(Message message,Channel channel){
        try {
            /**
             * 第一个参数:该消息的index
             * 第二个参数:是否批量.true:将一次性拒绝全部小于deliveryTag的消息。
             * 第三个参数:被拒绝的是否从新入队列
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e);
            try {
                log.error(new String(message.getBody(), "utf-8") + "从新进入服务器时出现异常,异常信息:", e);
            } catch (UnsupportedEncodingException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }

    }
}

亲测可实现错误消息转发,至于队列和消息的优先级能够根据队列的数据进行配置。与消息失效方式一致。

但要注意,队列和消息优先级须要 spring的版本较高至少要4.1以上(低版本主要是命名空间中的属性标签不支持),RabbitMQ3.5以上才能支持。

2018-08-09日补充:

以上demo在处理消息时还不够全面。首先若是消费端业务过于复杂致使消息 消费失败,这个时候可使用死信队列保存(我的以为),或者入库都可,但却没法保证 排除消息重发的这种现象。一旦消息重发,呗消费端消费,有涉及客户的小金库,那就玩完。。。通宵补数据都是轻的。

经过查阅资料得知,能够向异步接口那样,引用幂等概念进行控制。有两种方案。

一、经过MQ自身的msg-id来进行控制(这个id一直都没有找到在哪里获取);

二、能够在上游(生产端)生成一个惟一标识(相似流水号不重复的这种),在消费端进行验证。入库也好。缓存验证也行。目前采用这中方法。

以上 是我的的一点浅谈。。继续找那个msg-id去

相关文章
相关标签/搜索