若是要保证顺序消费,那么他的核心点就是:生产者有序存储
、消费者有序消费
。java
无序消息
无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并无保证。网络
举例
Producer 依次发送 orderId 为 一、二、3 的消息,Consumer 接到的消息顺序有多是 一、二、3,也有多是 二、一、3 等状况,这就是普通消息。并发
对于指定的一个 Topic,全部消息按照严格的先入先出(FIFO)的顺序进行发布和消费。app
举例
好比 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必需要按照 1,3,2 的顺序进行消费。异步
在实际开发有些场景中,我并不须要消息彻底按照彻底按的先进先出,而是某些消息保证先进先出就能够了。分布式
就比如一个订单涉及 订单生成
,订单支付
、订单完成
。我不用管其它的订单,只保证一样订单ID能保证这个顺序
就能够了。ide
咱们知道 生产的message最终会存放在Queue中,若是一个Topic关联了16个Queue,若是咱们不指定消息往哪一个队列里放,那么默认是平均分配消息到16个queue,函数
比如有100条消息,那么这100条消息会平均分配在这16个Queue上,那么每一个Queue大概放5~6个左右。这里有一点很重的是:测试
同一个queue,存储在里面的message 是按照先进先出的原则
线程
这个时候思路就来了,比如有orderId=1的3条消息,分别是 订单生产、订单付款、订单完成。只要保证它们放到同一个Queue那就保证消费者先进先出了。
这就保证局部顺序了,即同一订单按照前后顺序放到同一Queue,那么取消息的时候就能够保证先进先取出。
那么全局消息呢?
这个就简单啦,你把全部消息都放在一个Queue里,这样不就保证全局消息了。
就这么简单
固然不是,这里还有很关键的一点,比如在一个消费者集群的状况下,消费者1先去Queue拿消息,它拿到了 订单生成,它拿完后,消费者2去queue拿到的是 订单支付。
拿的顺序是没毛病了,但关键是先拿到不表明先消费完它。会存在虽然你消费者1先拿到订单生成,但因为网络等缘由,消费者2比你真正的先消费消息。这是否是很尴尬了。
订单付款仍是可能会比订单生成更早消费的状况。那怎么办。
分布式锁来了
Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,由于只要锁单个Queue就能够保证局部顺序消费了。
因此最终的消费者这边的逻辑就是
消费者1去Queue拿 订单生成,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。
而后下一个消费者去拿到 订单支付 一样锁住当前Queue,这样的一个过程来真正保证对同一个Queue可以真正意义上的顺序消费,而不只仅是顺序取出。
全局顺序与分区顺序对比
消息类型对比
发送方式对比
其它的注意事项
一、顺序消息暂不支持广播模式。 二、顺序消息不支持异步发送方式,不然将没法严格保证顺序。 三、建议同一个 Group ID 只对应一种类型的 Topic,即不一样时用于顺序消息和无序消息的收发。 四、对于全局顺序消息,建议建立实例个数 >=2。
这里保证两点
一、生产端 同一orderID的订单放到同一个queue。 二、消费端 同一个queue取出消息的时候锁住整个queue,直到消费后再解锁。
@AllArgsConstructor @Data @ToString public class ProductOrder { /** * 订单编号 */ private String orderId; /** * 订单类型(订单建立、订单付款、订单完成) */ private String type; }
生产者和以前发送普通消息最大的区别,就是针对每个message都手动经过MessageQueueSelector
选择好queue。
@RestController public class Product { private static List<ProductOrder> orderList = null; private static String producerGroup = "test_producer"; /** * 模拟数据 */ static { orderList = new ArrayList<>(); orderList.add(new ProductOrder("XXX001", "订单建立")); orderList.add(new ProductOrder("XXX001", "订单付款")); orderList.add(new ProductOrder("XXX001", "订单完成")); orderList.add(new ProductOrder("XXX002", "订单建立")); orderList.add(new ProductOrder("XXX002", "订单付款")); orderList.add(new ProductOrder("XXX002", "订单完成")); orderList.add(new ProductOrder("XXX003", "订单建立")); orderList.add(new ProductOrder("XXX003", "订单付款")); orderList.add(new ProductOrder("XXX003", "订单完成")); } @GetMapping("message") public void sendMessage() throws Exception { //示例生产者 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //不开启vip通道 开通口端口会减2 producer.setVipChannelEnabled(false); //绑定name server producer.setNamesrvAddr("IP:9876"); producer.start(); for (ProductOrder order : orderList) { //一、生成消息 Message message = new Message(JmsConfig.TOPIC, "", order.getOrderId(), order.toString().getBytes()); //二、发送消息是 针对每条消息选择对应的队列 SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //三、arg的值其实就是下面传入 orderId String orderid = (String) arg; //四、由于订单是String类型,因此经过hashCode转成int类型 int hashCode = orderid.hashCode(); //五、由于hashCode可能为负数 因此取绝对值 hashCode = Math.abs(hashCode); //六、保证同一个订单号 必定分配在同一个queue上 long index = hashCode % mqs.size(); return mqs.get((int) index); } }, order.getOrderId(),50000); System.out.printf("Product:发送状态=%s, 存储queue=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType()); } producer.shutdown(); } }
看看生产者有没有把相同订单指定到同一个queue
经过测试结果能够看出:相同订单已经存到同一queue中了
。
上面说过,消费者真正要达到消费顺序,须要分布式锁,因此这里须要将MessageListenerOrderly
替换以前的MessageListenerConcurrently,由于它里面实现了分布式锁。
@Slf4j @Component public class Consumer { /** * 消费者实体对象 */ private DefaultMQPushConsumer consumer; /** * 消费者组 */ public static final String CONSUMER_GROUP = "consumer_group"; /** * 经过构造函数 实例化对象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr("IP:9876"); //TODO 这里真的是个坑,我product设置VipChannelEnabled(false),但消费者并无设置这个参数,以前发送普通消息的时候也没有问题。能正常消费。 //TODO 但在顺序消息时,consumer一直不消费消息了,找了很久都没有找到缘由,直到我这里也设置为VipChannelEnabled(false),居然才能够消费消息。 consumer.setVipChannelEnabled(false); //订阅主题和 标签( * 表明全部标签)下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); //注册消费的监听 这里注意顺序消费为MessageListenerOrderly 以前并发为ConsumeConcurrentlyContext consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { //获取消息 MessageExt msg = msgs.get(0); //消费者获取消息 这里只输出 不作后面逻辑处理 log.info("Consumer-线程名称={},消息={}", Thread.currentThread().getName(), new String(msg.getBody())); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); } }
看看消费结果是否是咱们须要的结果
经过测试结果咱们看出
一、消费消息的顺序并无彻底按照以前的先进先出,即没有知足全局顺序。 二、同一订单来说,订单的 订单生成、订单支付、订单完成 消费顺序是保证的。
这是局部保证顺序消费就已经知足咱们当前实际开发中的需求了。
有关消费端选择MessageListenerOrderly
后,consumer.start()启动相关的源码能够参考博客:RocketMQ顺序消息消费端源码
只要本身变优秀了,其余的事情才会跟着好起来(上将4)