小弟 前段时间使用mq是由于要在Jfianl架构中使用,但Jfinal并不擅长,因此使用的是工具类建立的连接和通道。又写了消费者和生产者的公共方法。java
如今有一个业务。对接银行的时候,因异步回调。致使客户在对一张A表操做 和银行回调对A表的操做产生并发。导致A表出现一个seq_no重复。余额也计算错误。领导要求集成MQ,小弟终于在3天后集成了一个基础的demo。如今记录一下:spring
首先 maven项目确定要引入jar包的apache
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
而后请看spring的配置:json
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 链接服务配置</description> <rabbit:connection-factory id="connectionFactory" username="${mq.name}" password="${mq.pwd}" host="${mq.url}" port="${mq.port}"/> <rabbit:admin connection-factory="connectionFactory"/> <!-- spring template声明--> <rabbit:template exchange="${mq.user.bill.exchange.name}" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 业务队列 --> <rabbit:queue id="user_bill_queue" name="user_bill_queue" durable="true" auto-delete="false" exclusive="false"> <!-- <rabbit:queue-arguments> <!– 设置死信交换机 –> <entry key="x-dead-letter-exchange"> <value type="java.lang.String">dead_letter_userbill_exchange</value> </entry> <!– 设置死信交换机的路由键 –> <entry key="x-dead-letter-routing-key"> <value type="java.lang.String">userbill_queue_fail</value> </entry> </rabbit:queue-arguments>--> </rabbit:queue> <!-- 死信队列 --> <!--<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />--> <!-- 死信交换机配置 --> <!--<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"> <rabbit:bindings> <rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/> </rabbit:bindings> </rabbit:direct-exchange>--> <!-- 正常交换机配置 --> <rabbit:direct-exchange name="${mq.user.bill.exchange.name}" durable="true" auto-delete="false" id="${mq.user.bill.exchange.name}"> <rabbit:bindings> <rabbit:binding queue="user_bill_queue" key="${mq.user.bill.routing.key}"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- 配置监听 手动ack prefetch="1" 表示消费一条--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > <rabbit:listener queues="user_bill_queue" ref="queueListenter"/> <!--<rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/>--> </rabbit:listener-container> </beans>
一、强调一下命名空间很少。够用就行。这里只单单配置了mq 与spring其余文件集成可以使用import不要重复引用便可缓存
二、这里粘出来的 有异常msg的处理。也就是死信队列。后面会提到服务器
以上基本都是固定配置。获取连接,建立admin(在消息代理中如何利用协议来配置队列,交换和绑定。实现将自动声明在一个应用上下文的Queues,Exchanges,Bindings。具体功能我也不清楚。一直没搞懂) 建立生产者模板,建立队列。建立指定路由key的交换器 并绑定队列,消息对象转json的bean等等。架构
三、若是想要引入消息失效时间,须要在定义队列的地方添加属性<rabbit:queue-arguments>,并指定并发
<entry key="x-message-ttl"> <value type="java.lang.Integer">60000</value> </entry>
表示该队列中的信息失效时间为1min。dom
要引入队列的等级 须要的key=x-max-priority。异步
下面来讲下 死信队列。当有消息再消费端处理失败时。若是要ackNack的话(true),会致使不断消费这个消息,一直产生错误,一个死循环。
这时,使用死信队列就能够处理。
一、定义业务队列的时候绑定一个死信交换机。并绑定一个路由key,注意x-dead-letter-exchange和x-dead-letter-routing-key是固定参数
<rabbit:queue-arguments> <!-- 设置死信交换机 --> <entry key="x-dead-letter-exchange"> <value type="java.lang.String">dead_letter_userbill_exchange</value> </entry> <!-- 设置死信交换机的路由键 --> <entry key="x-dead-letter-routing-key"> <value type="java.lang.String">userbill_queue_fail</value> </entry> </rabbit:queue-arguments>
二、设置一个死信队列,用来接收死信交换机转发来的异常信息(想要队列的其余属性能够自定义配置)
<rabbit:queue id="user_bill_dead_queue" name="user_bill_dead_queue" durable="true" auto-delete="false" exclusive="false" />
三、定义一个死信交换机,名称与业务队列中定义的一致,绑定死信队列和路由key(与业务队列中定义的死信交换机的路由key一致)
<rabbit:direct-exchange name="dead_letter_userbill_exchange" durable="true" auto-delete="false" id="dead_letter_exchange"> <rabbit:bindings> <rabbit:binding queue="user_bill_dead_queue" key="userbill_queue_fail"/> </rabbit:bindings> </rabbit:direct-exchange>
四、在监听器中将死信队列归入监听 监听器中的ref bean 都是经过@Component注解注入的。
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > <rabbit:listener queues="user_bill_queue" ref="queueListenter"/> <rabbit:listener queues="user_bill_dead_queue" ref="deadUserBillQueueListenter"/> </rabbit:listener-container>
这样就完成了失败消息转发到死信队列中。在设计另外一个消费者deadUserBillQueueListenter 进行消息处理便可,可设计,在处理一次失败就将期ackreject
这里要提醒一下,当设计有自定义交换机时,生产者传入的就不是队列名称 ,而是交换机名称和路由key,只有在使用默认交换机时才使用队列名称
生产者代码:
package com.qiantu.core.rabbitmq; /** * @Description: 给队列发送消息接口类 * @Date: create in 2018-07-30 16:36 * @Author:Reynold-白 */ public interface MQProducer { /** * 发送消息到指定队列 * @param queueKey * @param object */ void sendDataToQueue(String exchangeName, String routingKey, Object object); }
package com.qiantu.core.rabbitmq; import com.alibaba.fastjson.JSON; import org.apache.log4j.Logger; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.support.GenericXmlApplicationContext; import org.springframework.stereotype.Service; /** * @Description: 发送消息实现 * @Date: create in 2018-07-30 16:37 * @Author:Reynold-白 */ @Service("mqProducer") public class MQProducerImpl implements MQProducer{ private final static Logger log = Logger.getLogger(MQProducerImpl.class); @Autowired private AmqpTemplate amqpTemplate; @Override public void sendDataToQueue(String exchangeName, String routingKey, Object object) { try { log.info("========向MQ发送消息【开始】========消息:" + object.toString()); amqpTemplate.convertAndSend(exchangeName, routingKey,object); log.info("========向MQ发送消息【完成】========消息:"); } catch (Exception e) { log.error("=======发送消息失败======", e); e.printStackTrace(); } } }
消费者代码:
package com.qiantu.core.rabbitmq; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.qiantu.core.constants.UserBillConstants; import com.qiantu.core.model.RabbitMQConsumerFailData; import com.qiantu.core.service.UserBillSerivce; import com.qiantu.core.utils.IdGenerator; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * @Description: userBill消息监听消费 * @Date: create in 2018-07-30 17:08 * @Author:Reynold-白 */ @Component public class QueueListenter implements ChannelAwareMessageListener { protected static Logger log = Logger.getLogger(QueueListenter.class); @Autowired private UserBillSerivce userBillSerivce; @Override public void onMessage(Message message, Channel channel) { String msgStr = ""; try{ msgStr = new String(message.getBody(), "UTF-8"); log.info("=====获取消息" + msgStr); Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {}); boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams); if(result){ //处理成功,响应队列,删除该条信息 this.basicACK(message, channel); log.info("=======消息:" + msgStr + ",处理成功!"); }else{ RabbitMQConsumerFailData rmcfd = new RabbitMQConsumerFailData(); rmcfd.setId(IdGenerator.randomUUID()); rmcfd.setData(msgStr); rmcfd.setType("0"); rmcfd.setCreateBy("admin"); rmcfd.setCreateTime(new Date()); userBillSerivce.insertRabbitMQFailData(rmcfd); //处理失败,拒绝数据 this.basicReject(message, channel); log.info("=======消息:" + msgStr + ",处理失败。回退!"); } }catch(Exception e){ log.error("=======消息业务处理异常=====", e); this.basicReject(message, channel); e.printStackTrace(); } } //正常消费通知 private void basicACK(Message message,Channel channel){ try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch(IOException e){ log.error("通知服务器移除mq时异常,异常信息:"+e); } } //处理异常,消息回到异常处理队列总再处理 private void basicReject(Message message,Channel channel){ try { /** * 第一个参数:该消息的index * 第二个参数:是否批量.true:将一次性拒绝全部小于deliveryTag的消息。 * 第三个参数:被拒绝的是否从新入队列 */ // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { try { log.error(new String(message.getBody(), "utf-8") + "从新进入服务器时出现异常,异常信息:", e); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }
死信队列消费者:
package com.qiantu.core.rabbitmq; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.qiantu.core.service.UserBillSerivce; import com.rabbitmq.client.Channel; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Map; /** * @Description: 失败信息再处理 * @Date: create in 2018-08-02 15:00 * @Author:Reynold-白 */ @Component public class DeadUserBillQueueListenter implements ChannelAwareMessageListener { protected static Logger log = Logger.getLogger(QueueListenter.class); @Autowired private UserBillSerivce userBillSerivce; @Override public void onMessage(Message message, Channel channel) throws Exception { String msgStr = ""; try{ msgStr = new String(message.getBody(), "UTF-8"); log.info("=====获取消息" + msgStr); Map<String, String> userBillParams = JSONObject.parseObject(msgStr, new TypeReference<Map<String, String>>() {}); boolean result = userBillSerivce.queueMsgCreateUserBill(userBillParams); if(result){ //处理成功,响应队列,删除该条信息 this.basicACK(message, channel); log.info("=======deadUserBillQueue消息:" + msgStr + ",处理成功!"); }else{ //处理失败,抛弃数据 this.basicNack(message, channel); log.info("=======deadUserBillQueue消息:" + msgStr + ",处理失败。回退!"); } }catch(Exception e){ log.error("=======deadUserBillQueue消息业务处理异常=====", e); this.basicNack(message, channel); e.printStackTrace(); } } //正常消费通知 private void basicACK(Message message,Channel channel){ try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch(IOException e){ log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e); } } //处理异常,删除信息 private void basicNack(Message message,Channel channel){ try { /** * 第一个参数:该消息的index * 第二个参数:是否批量.true:将一次性拒绝全部小于deliveryTag的消息。 * 第三个参数:被拒绝的是否从新入队列 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.error("deadUserBillQueue通知服务器移除mq时异常,异常信息:"+e); try { log.error(new String(message.getBody(), "utf-8") + "从新进入服务器时出现异常,异常信息:", e); } catch (UnsupportedEncodingException e1) { e1.printStackTrace(); } e.printStackTrace(); } } }
亲测可实现错误消息转发,至于队列和消息的优先级能够根据队列的数据进行配置。与消息失效方式一致。
但要注意,队列和消息优先级须要 spring的版本较高至少要4.1以上(低版本主要是命名空间中的属性标签不支持),RabbitMQ3.5以上才能支持。
2018-08-09日补充:
以上demo在处理消息时还不够全面。首先若是消费端业务过于复杂致使消息 消费失败,这个时候可使用死信队列保存(我的以为),或者入库都可,但却没法保证 排除消息重发的这种现象。一旦消息重发,呗消费端消费,有涉及客户的小金库,那就玩完。。。通宵补数据都是轻的。
经过查阅资料得知,能够向异步接口那样,引用幂等概念进行控制。有两种方案。
一、经过MQ自身的msg-id来进行控制(这个id一直都没有找到在哪里获取);
二、能够在上游(生产端)生成一个惟一标识(相似流水号不重复的这种),在消费端进行验证。入库也好。缓存验证也行。目前采用这中方法。
以上 是我的的一点浅谈。。继续找那个msg-id去