消息中间件——RabbitMQ(七)高级特性全在这里!(上)

求关注

高级特性全在这里!(上)

前言

前面咱们介绍了RabbitMQ的安装、各大消息中间件的对比、AMQP核心概念、管控台的使用、快速入门RabbitMQ。本章将介绍RabbitMQ的高级特性。分两篇(上/下)进行介绍。java

  • 消息如何保障100%的投递成功?
  • 幂等性概念详解
  • 在海量订单产生的业务高峰期,如何避免消息的重复消费的问题?
  • Confirm确认消息、Return返回消息

1 消息如何保障100%的投递成功?

1.1 什么是生产端的可靠性投递?

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的消息进行补偿机制

前三步不必定能保障消息可以100%投递成功。所以要加上第四步mysql

BAT/TMD 互联网大厂的解决方案: - 消息落库,对消息状态进行打标 在发送消息的时候,须要将消息持久化到数据库中,并给这个消息设置一个状态(未发送、发送中、到达)。当消息状态发生了变化,须要对消息作一个变动。针对没有到达的消息作一个轮训操做,从新发送。对轮训次数也须要作一个限制3-5次。确保消息可以成功的发送.git

  • 消息的延迟投递,作二次确认,回调检查

具体采用哪一种方案,还须要根据业务与消息的并发量而定。github

1.2 第一种方案:

生产端-可靠性投递 面试

第一种方案

图解:算法

蓝色部分表示:生产者负责发送消息发送至Broker端 Biz DB:订单数据库 MSG DB: 消息数据 面对小规模的应用能够采用加事务的方式,保证事务的一致性。但在大厂中面对高并发,并无加事务,事务的性能拼接很是严重,而是作补偿。sql

好比:以下发一条订单消息。数据库

step1:存储订单消息(建立订单),业务数据入库,消息也入库。缺点:须要持久化两次。(status:0) step2:在step1成功的前提下,发送消息 step3:Broker收到消息后,confirm给咱们的生产端。Confirm Listener异步监听Broker回送的消息。 step4:抓取出指定的消息,更新(status=1),表示消息已经投递成功。编程

step5:分布式定时任务获取消息状态,若是等于0则抓取数据出来。 step6:从新发送消息 step7:重试限制设置3次。若是消息重试了3次仍是失败,那么(status=2),认为这个消息就是失败的。缓存

查询这些消息为何失败,可能须要人工去查询。

假设step2执行成功,step3因为网络闪断。那么confirm将永远收不到消息,那么咱们须要设定一个规则: 例如:在消息入库的时候,设置一个临界值 timeout=5min,当超过5min以后,就将这条数据抓取出来。 或者写一个定时任务每隔5分钟就将status=0的消息抓取出来。可能存在小问题:消息发送出去,定时任务又正好刚执行,Confirm还未收到,定时任务就会执行,会致使消息执行两次。 更精细化操做:消息超时容忍限制。confirm在2-3分钟内未收到消息,则从新发送。


  • 保障MQ咱们思考若是第一种可靠性投递,在高并发的场景下是否合适?

第一种方案对数据有两次入库,一次业务数据入库,一次消息入库。这样对数据的入库是一个瓶颈。 其实咱们只须要对业务进行入库。

  • 消息的延迟投递,作二次确认,回调检查

这种方式并不必定能保证100%成功,可是也能保证99.99%的消息成功。若是遇到特别极端的状况,那么就只能须要人工去补偿,或者定时任务去作。 第二种方式主要是为了减小对数据库的操做。

看下第二种方式:

第二种方案

图解:

Upstream service:生产端 DownStream service:消费端 Callback service:回调服务

step1:业务消息入库成功后,第一次消息发送。 step2:一样在消息入库成功后,发送第二次消息,这两条消息是同时发送的。第二条消息是延迟检查,能够设置2min、5min 延迟发送。 step3:消费端监听指定队列。 step4:消费端处理完消息后,内部生成新的消息send confirm。投递到MQ Broker。 step5: Callback Service 回调服务监听MQ Broker,若是收到Downstream service发送的消息,则能够肯定消息发送成功,执行消息存储到MSG DB。 step6:Check Detail检查监听step2延迟投递的消息。此时两个监听的队列不是同一个,5分钟后,Callback service收到消息,检查MSG DB。若是发现以前的消息已经投递成功,则不须要作其余事情。若是检查发现失败,则Callback 进行补偿,主动发送RPC 通讯。通知上游生产端从新发送消息。

这样作的目的:少作了一次DB存储。关注点并非百分百的投递成功,而是性能。

2. 幂等性概念

2.1 幂等性是什么?

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中,即f(f(x)) = f(x)。简单的来讲就是一个操做屡次执行产生的结果与一次执行产生的结果一致

  • 咱们能够借鉴数据库的乐观锁机制:
  • 好比咱们执行一条更新库存的SQL语句:
  • UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1

利用加版本号Version的方式来保证幂等性。

推荐文章:面试必备的数据库悲观锁与乐观锁

2.2 消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

在高并发的状况下,会有大量的消息到达MQ,消费端须要监听大量的消息。这样的状况下,不免会出现消息的重复投递,网络闪断等等。若是不去作幂等,则会出现消息的重复消费。 -消费端实现幂等性,就意味着,咱们的消息永远不会被消费屡次,即便咱们收到了多条同样的消息,也只会执行一次。

看下互联网大厂主流的幂等性操做: -惟一ID+指纹吗机制,利用数据库主键去重。 -利用Redis的原子性实现 -其余的技术实现幂等性

2.2.1 惟一ID+指纹码机制

  • 惟一ID + 指纹吗机制,利用数据库主键去重。 保证惟一性
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 惟一ID + 指纹码 若是查询没有,则添加。有则不须要作任何操做,消费端不须要消费消息。
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:跟进ID进行分库分表进行算法路由 分摊流量压力。

2.2.2 Redis 原子特性实现

最简单使用Redis的自增。

  • 使用Redis进行幂等,须要考虑的问题。
  • 第一:咱们是否须要数据落库,若是落库的话,关键解决的问题是数据库和缓存如何作到原子性? 加事务不行,Redis和数据库的事务不是同一个,没法保证同时成功同时失败。你们有什么更好的方案呢?
  • 第二:若是不进行落库,那么都存储到缓存中,如何设置定时同步的策略? 怎么作到缓存数据的稳定性?

3. Confirm 确认消息

理解Confirm 消息确认机制:

  • 消息的确认,是指生产者投递消息后,若是Broker收到消息,则会给咱们生产者一个应答。
  • 生产者进行接收应答,用来肯定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!

Confirm确认消息流程图

蓝色:producer 生产者 红色:MQ Broker 服务器

生产者把消息发送到Broker端,Broker收到消息以后回送给producer。Confirm Listener 监听应答。

操做是异步操做,当生产者发送完消息以后,就不须要管了。Confirm Listener 监听MQ Broker的应答。

3.1 如何实现Confirm确认消息?

第一步:在channel上开启确认模式:channel.confirmSelect() 第二步;在chanel上 添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行从新发送、或记录日志等后续处理!

3.2 代码编写:

生产者:

/** * * @ClassName: Producer * @Description: 生产者 * @author Coder编程 * @date 2019年7月30日 上午21:27:02 * */
public class Producer {

	
	public static void main(String[] args) throws Exception {
		
		
		//1 建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
	
		//2 经过Connection建立一个新的Channel
		Channel channel = connection.createChannel();
		
		
		//3 指定咱们的消息投递模式: 消息的确认模式 
		channel.confirmSelect();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.save";
		
		//4 发送一条消息
		String msg = "Hello RabbitMQ Send confirm message!";
		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
		
		//5 添加一个确认监听 用于发送消息到Broker端以后,回送消息的监听
		channel.addConfirmListener(new ConfirmListener() {
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------no ack!-----------");
			}
			
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------ack!-----------");
			}
		});
	}
}



复制代码

消费者:

/** * * @ClassName: Consumer * @Description: 消费者 * @author Coder编程 * @date 2019年7月30日 上午21:32:02 * */
public class Consumer {

	
	public static void main(String[] args) throws Exception {
		
		
		//1 获取一个链接 
        Connection connection = ConnectionUtils.getConnection();
		
		//2经过Connection建立一个新的Channel
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.#";
		String queueName = "test_confirm_queue";
		
		//3 声明交换机和队列 而后进行绑定设置, 最后制定路由Key
		channel.exchangeDeclare(exchangeName, "topic", true);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//4 建立消费者 
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			
			System.err.println("消费端: " + msg);
		}
		
		
	}
}



复制代码

工具类:

/** * * @ClassName: ConnectionUtils * @Description: 链接工具类 * @author Coder编程 * @date 2019年6月21日 上午22:28:22 * */
public class ConnectionUtils {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //端口
        factory.setPort(5672);//amqp协议 端口 相似与mysql的3306
        //设置帐号信息,用户名、密码、vhost
        factory.setVirtualHost("/vhost_cp");
        factory.setUsername("user_cp");
        factory.setPassword("123456");
        // 经过工程获取链接
        Connection connection = factory.newConnection();
        return connection;
    }
}


复制代码

先启动消费端=》再启动生产端

3.3 查看管控台:

队列1
队列2

交换机

3.4 打印结果:

消费端打印结果

生产端打印结果

能够观察到消费端先接收到消息,以后生产端再接收到回调信息。若是出现磁盘已满、RabbitMQ出现异常、queue容量到达上限均可能接收到no ack

若是ack和no ack消息都未接收到,这就是以前所说的。RabbitMQ出现网络闪断,能够采用上面所说的消息补偿

4. Return消息机制

  • Return Listener用于处理一些不可路由的消息!
  • 咱们的消息生产者,经过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,而后咱们的消费者监听队列,进行消费处理操做!
  • 可是在某些状况下,若是咱们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候若是咱们须要监听这种不可达的消息,就要使用Return Listener!

在基础API中有一个关键的配置项:

  • Mandatory:若是为true,则监听器会接收到路由不可达的消息,而后进行后续处理,若是为false,那么broker端自动删除该消息!

4.1 Return消息机制流程

Return消息机制流程

Producer生产端将消息发送到MQ Broker端,可是出现NotFind Exchange,发送的消息的Exchange,在Broker端未能找到。或者找到了,可是路由key路由不到指定的队列。所以是一个错误的消息。 这个时候,生产端应该知道发送的这条消息,并不会被处理。所以MQ Broker提供了这种Return机制,将这些不可达的消息发送给生产端,这时候生产端就须要设置Return Listener去接收这些不可达的消息。而后及时记录日志,去处理这些消息。

4.2 代码演示

生产者:

/** * * @ClassName: Producer * @Description: 生产者 * @author Coder编程 * @date 2019年7月30日 上午22:03:22 * */
public class Producer {

	
	public static void main(String[] args) throws Exception {
		
		
		//1 建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		
		Channel channel = connection.createChannel();
		
		String exchange = "test_return_exchange";
		String routingKey = "return.save";
		String routingKeyError = "abc.save";
		
		String msg = "Hello RabbitMQ Return Message";
		
		
		channel.addReturnListener(new ReturnListener() {
			@Override
			public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
				
				System.err.println("---------handle return----------");
				//响应码
				System.err.println("replyCode: " + replyCode);
				//响应文本
				System.err.println("replyText: " + replyText);
				System.err.println("exchange: " + exchange);
				System.err.println("routingKey: " + routingKey);
				System.err.println("properties: " + properties);
				System.err.println("body: " + new String(body));
			}
		});
		
		//第三个参数mandatory=true,意味着路由不到的话mq也不会删除消息,false则会自动删除
		channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
		//修改routingkey,测试是否可以收到消息
		//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
	}
}

复制代码

消费者:

/** * * @ClassName: Consumer * @Description: 消费者 * @author Coder编程 * @date 2019年7月30日 上午22:33:34 * */
public class Consumer {

	
	public static void main(String[] args) throws Exception {
		
		
		//1 建立ConnectionFactory
		Connection connection = ConnectionUtils.getConnection();
		
		Channel channel = connection.createChannel();
		
		String exchangeName = "test_return_exchange";
		String routingKey = "return.#";
		String queueName = "test_return_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
		
		channel.basicConsume(queueName, true, queueingConsumer);
		
		while(true){
			Delivery delivery = queueingConsumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.err.println("消费者: " + msg);
		}
	}
}


复制代码

ConnectionUtils 工具代码在上面。

启动消费端,并查看管控台。

4.3 查看管控台

Exchanges
queues

4.4 查看打印结果

放开消费端代码:channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); 消费端打印结果:

消费端打印结果

能够看到打印结果正常,此时再改代码为: channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

生产端打印结果
能够看到生产端接收到了不可达的消息。

文末

欢迎关注我的微信公众号:Coder编程 获取最新原创技术文章和免费学习资料,更有大量精品思惟导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识! 新建了一个qq群:315211365,欢迎你们进群交流一块儿学习。谢谢了!也能够介绍给身边有须要的朋友。

文章收录至 Github: github.com/CoderMerlin… Gitee: gitee.com/573059382/c… 欢迎关注并star~

微信公众号

参考文章:

《RabbitMQ消息中间件精讲》

推荐文章:

消息中间件——RabbitMQ(四)命令行与管控台的基本操做!

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!

消息中间件——RabbitMQ(六)理解Exchange交换机核心概念!