Exchange属性:java
消费者从Queue中获取消息并消费。多个消费者能够订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。安全
Message acknowledgment:消息确认,在消息确认机制下,收到回执才会删除消息,未收到回执而断开了链接,消息会转发给其余消费者,若是忘记回执,会致使消息堆积,消费者重启后会重复消费这些消息并重复执行业务逻辑。服务器
Message durability:消息持久化,设置消息持久化能够避免绝大部分消息丢失,好比rabbitmq服务重启,可是采用非持久化能够提高队列的处理效率。若是要确保消息的持久化,那么消息对应的Exchange和Queue一样要设置为持久化。架构
另外,若是须要可靠性业务,须要设置持久化和ack机制,若是系统高吞吐,能够设置为非持久化、noack、自动删除机制。并发
模拟这样一个业务场景,用户下单成功后,须要给用户增长积分,同时还须要给用户发送下单成功的消息,这是在电商业务中很常见的一个业务场景。dom
若是系统是微服务架构,可能用户下单功能在订单服务,给用户增长积分的功能在积分服务,给用户发送通知消息的功能在通知服务,各个服务之间解耦,互不影响。那么要实现上述的业务场景,消息中间件rabbitmq是一个很好的选择。分布式
缘由以下:ide
实现思路:微服务
用户下单成功后,rabbitmq发送一条消息至EXCHANGE.ORDER_CREATE
交换器,该交换器绑定了两个队列,QUEUE.ORDER_INCREASESCORE
、QUEUE.ORDER_NOTIFY
,消费者订阅这两个队列分别用来处理增长积分、发送用户通知。若是后续日志系统还须要记录下单的相关日志,那么咱们只须要再定义一个队列并将其绑定到EXCHANGE.ORDER_CREATE
便可。高并发
下单发rabbitmq消息
package com.robot.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; /** * @author: 会跳舞的机器人 * @date: 2017/10/13 10:46 * @description: 模拟用户下单以后发送rabbitmq消息 */ public class OrderCreator { // 交换器名称 private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE"; // 消息内容 private static String msg = "create order success"; /** * 模拟建立订单后发送mq消息 */ public void createOrder() { System.out.println("下单成功,开始发送rabbitmq消息"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.12.44"); connectionFactory.setPort(56720); connectionFactory.setUsername("baibei"); connectionFactory.setPassword("baibei"); Connection connection; Channel channel; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 持久化 boolean durable = true; // topic类型 String type = "topic"; // 声明交换器,若是交换器不存在则建立之 channel.exchangeDeclare(EXCHANGE, type, durable); String messgeId = UUID.randomUUID().toString(); // deliveryMode>=2表示设置消息持久化 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build(); // 发布消息 String routingKey = "order_create"; channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8")); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
积分系统订阅消息
package com.robot.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author: 会跳舞的机器人 * @date: 2017/10/13 16:02 * @description: rabbitmq消费者,模拟下单成功后给用户增长积分 */ public class IncreaseScoreConsumer implements Consumer { private Connection connection; private Channel channel; // 交换器名称 private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE"; // 增长积分队列名称 private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE"; public void consume() { // 初始化rabbitmq链接信息 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.12.44"); connectionFactory.setPort(56720); connectionFactory.setUsername("baibei"); connectionFactory.setPassword("baibei"); try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE, "topic", true); // 声明队列 channel.queueDeclare(QUEUENAME, true, false, false, null); // 交换器与队列绑定并设置routingKey channel.queueBind(QUEUENAME, EXCHANGE, "order_create"); // 消费消息,callback是该类,关闭自动确认消息,在完成业务逻辑后手动确认确认 channel.basicConsume(QUEUENAME, false, this); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("《积分系统》收到订单消息:" + msg + ",给用户增长积分......"); // 手动确认消息 channel.basicAck(envelope.getDeliveryTag(), false); /** * channel.basicReject(envelope.getDeliveryTag(), false);该方法会丢弃掉队列中的这条消息 * channel.basicReject(envelope.getDeliveryTag(), true);该方法会把消息从新放回队列 * 通常系统会设定一个重试次数,若是超太重试次数,则会丢弃消息,反之则会把消息再放入队列 */ } public void handleConsumeOk(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } }
通知系统订阅消息
package com.robot.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author: 会跳舞的机器人 * @date: 2017/10/13 16:20 * @description: rabbitmq消费者,模拟下单成功后给用户发送通知 */ public class NotifyConsumer implements Consumer { private Connection connection; private Channel channel; // 交换器名称 private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE"; // 通知用户下单成功通知队列名称 private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY"; public void consume() { // 初始化rabbitmq链接信息 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.12.44"); connectionFactory.setPort(56720); connectionFactory.setUsername("baibei"); connectionFactory.setPassword("baibei"); try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare(EXCHANGE, "topic", true); // 声明队列 channel.queueDeclare(QUEUENAME, true, false, false, null); // 交换器与队列绑定并设置routingKey channel.queueBind(QUEUENAME, EXCHANGE, "order_create"); // 消费消息,callback是该类,关闭自动确认消息,在完成业务逻辑后手动确认确认 channel.basicConsume(QUEUENAME, false, this); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("《通知系统》收到订单消息:" + msg + ",开始给用户发送通知......"); // 手动确认消息 channel.basicAck(envelope.getDeliveryTag(), false); /** * channel.basicReject(envelope.getDeliveryTag(), false);该方法会丢弃掉队列中的这条消息 * channel.basicReject(envelope.getDeliveryTag(), true);该方法会把消息从新放回队列 * 通常系统会设定一个重试次数,若是超太重试次数,则会丢弃消息,反之则会把消息再放入队列 */ } public void handleConsumeOk(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } }
测试
package com.robot.rabbitmq; /** * @author: 会跳舞的机器人 * @date: 2017/10/13 16:27 * @description: */ public class Test { public static void main(String[] args) { IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer(); increaseScoreConsumer.consume(); NotifyConsumer notifyConsumer = new NotifyConsumer(); notifyConsumer.consume(); OrderCreator orderCreator = new OrderCreator(); for (int i = 0; i < 3; i++) { orderCreator.createOrder(); } } }
输出:
下单成功,开始发送rabbitmq消息 《积分系统》收到订单消息:create order success,给用户增长积分...... 《通知系统》收到订单消息:create order success,开始给用户发送通知...... 下单成功,开始发送rabbitmq消息 《积分系统》收到订单消息:create order success,给用户增长积分...... 《通知系统》收到订单消息:create order success,开始给用户发送通知...... 下单成功,开始发送rabbitmq消息 《积分系统》收到订单消息:create order success,给用户增长积分...... 《通知系统》收到订单消息:create order success,开始给用户发送通知......
原文转载:https://www.jianshu.com/p/2f55cd7a3e1c做者:会跳舞的机器人