在好久以前写过一篇Kafka相关的文章,你须要知道的Kafka,那个时候在业务上更多的是使用的是Kafka,而如今换了公司以后,更多的使用的是Rocketmq,本篇文章会尽力全面的介绍RocketMQ和Kafka各个关键点的比较,但愿你们读完能有所收获。java
RocketMQ前身叫作MetaQ, 在MeataQ发布3.0版本的时候更名为RocketMQ,其本质上的设计思路和Kafka相似,可是和Kafka不一样的是其使用Java进行开发,因为在国内的Java受众群体远远多于Scala,因此RocketMQ是不少以Java语言为主的公司的首选。一样的RocketMQ和Kafka都是Apache基金会中的顶级项目,他们社区的活跃度都很是高,项目更新迭代也很是快。mysql
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
直接定义好一个producer,建立好Message,调用send方法便可。算法
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
对于RocketMQ先抛出几个问题:sql
对于RocketMQ的架构图,在大致上来看和Kafka并无太多的差异,可是在不少细节上是有不少差异的,接下来会一一进行讲述。微信
在3.1的架构中咱们有多个Producer,多个主Broker,多个从Broker,每一个Producer能够对应多个Topic,每一个Consumer也能够消费多个Topic。网络
Broker信息会上报至NameServer,Consumer会从NameServer中拉取Broker和Topic的信息。架构
不少朋友都在问什么是无状态呢?状态的有无实际上就是数据是否会作存储,有状态的话数据会被持久化,无状态的服务能够理解就是一个内存服务,NameServer自己也是一个内存服务,全部数据都存储在内存中,重启以后都会丢失。并发
在RocketMQ中的每一条消息,都有一个Topic,用来区分不一样的消息。一个主题通常会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者均可以接收到生产者写入的新消息。框架
在Topic中有分为了多个Queue,这实际上是咱们发送/读取消息通道的最小单位,咱们发送消息都须要指定某个写入某个Queue,拉取消息的时候也须要指定拉取某个Queue,因此咱们的顺序消息能够基于咱们的Queue维度保持队列有序,若是想作到全局有序那么须要将Queue大小设置为1,这样全部的数据都会在Queue中有序。异步
在上图中咱们的Producer会经过一些策略进行Queue的选择:
咱们同一组Consumer也会根据一些策略来选Queue,常见的好比平均分配或者一致性Hash分配。
要注意的是当Consumer出现下线或者上线的时候,这里须要作重平衡,也就是Rebalance,RocketMQ的重平衡机制以下:
因为重平衡是定时作的,因此这里有可能会出现某个Queue同时被两个Consumer消费,因此会出现消息重复投递。
Kafka的重平衡机制和RocketMQ不一样,Kafka的重平衡是经过Consumer和Coordinator联系来完成的,当Coordinator感知到消费组的变化,会在心跳过程当中发送重平衡的信号,而后由一个ConsumerLeader进行重平衡选择,而后再由Coordinator将结果通知给全部的消费者。
在RocketMQ中Queue被分为读和写两种,在最开始接触RocketMQ的时候一直觉得读写队列数量配置不一致不会出现什么问题的,好比当消费者机器不少的时候咱们配置不少读的队列,可是实际过程当中发现会出现消息没法消费和根本没有消息消费的状况。
这个功能在RocketMQ在我看来明显没什么用,由于基本上都会设置为读写队列大小同样,这个为啥不直接将其进行统一,反而容易让用户配置不同出现错误。
这个问题在RocketMQ的Issue里也没有收到好的答案。
通常来讲消息队列的消费模型分为两种,基于推送的消息(push)模型和基于拉取(poll)的消息模型。
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,可是这种方式没法很好地保证消费的处理语义。好比当咱们把已经把消息发送给消费者以后,因为消费进程挂掉或者因为网络缘由没有收到这条消息,若是咱们在消费代理将其标记为已消费,这个消息就永久丢失了。若是咱们利用生产者收到消息后回复这种方法,消息代理须要记录消费状态,这种不可取。
用过RocketMQ的同窗确定不由会想到,在RocketMQ中不是提供了两种消费者吗? MQPullConsumer
和MQPushConsumer
,其中MQPushConsumer
不就是咱们的推模型吗?其实这两种模型都是客户端主动去拉消息,其中的实现区别以下:
消费模式咱们分为两种,集群消费,广播消费:
在Kafka中使用的原生的socket实现网络通讯,而RocketMQ使用的是Netty网络框架,如今愈来愈多的中间件都不会直接选择原生的socket,而是使用的Netty框架,主要得益于下面几个缘由:
选择框架是一方面,而想要保证网络通讯的高效,网络线程模型也是一方面,咱们常见的有1+N(1个Acceptor线程,N个IO线程),1+N+M(1个acceptor线程,N个IO线程,M个worker线程)等模型,RocketMQ使用的是1+N1+N2+M的模型,以下图所示:
1个acceptor线程,N1个IO线程,N2个线程用来作Shake-hand,SSL验证,编解码;M个线程用来作业务处理。这样的好处将编解码,和SSL验证等一些可能耗时的操做放在了一个单独的线程池,不会占据咱们业务线程和IO线程。
作为一个好的消息系统,高性能的存储,高可用都不可少。
RocketMQ和Kafka的存储核心设计有很大的不一样,因此其在写入性能方面也有很大的差异,这是16年阿里中间件团队对RocketMQ和Kafka不一样Topic下作的性能测试:
从图上能够看出:
那RocketMQ为何在多Topic的状况下,依然还能很好的保持较多的吞吐量呢?咱们首先来看一下RocketMQ中比较关键的文件:
这里有四个目录(这里的解释就直接用RocketMQ官方的了):
HOME \store\index\${fileName},文件名fileName是以建立时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile能够保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
咱们发现咱们的消息主体数据并无像Kafka同样写入多个文件,而是写入一个文件,这样咱们的写入IO竞争就很是小,能够在不少Topic的时候依然保持很高的吞吐量。有同窗说这里的ConsumeQueue写是在不停的写入呢,而且ConsumeQueue是以Queue维度来建立文件,那么文件数量依然不少,在这里ConsumeQueue的写入的数据量很小,每条消息只有20个字节,30W条数据也才6M左右,因此其实对咱们的影响相对Kafka的Topic之间影响是要小不少的。咱们整个的逻辑能够以下:
Producer不断的再往CommitLog添加新的消息,有一个定时任务ReputService会不断的扫描新添加进来的CommitLog,而后不断的去构建ConsumerQueue和Index。
注意:这里指的都是普通的硬盘,在SSD上面多个文件并发写入和单个文件写入影响不大。 读取消息 Kafka中每一个Partition都会是一个单独的文件,因此当消费某个消息的时候,会很好的出现顺序读,咱们知道OS从物理磁盘上访问读取文件的同时,会顺序对其余相邻块的数据文件进行预读取,将数据放入PageCache,因此Kafka的读取消息性能比较好。
RocketMQ读取流程以下:
ConsumerQueue也是每一个Queue一个单独的文件,而且其文件体积小,因此很容易利用PageCache提升性能。而CommitLog,因为同一个Queue的连续消息在CommitLog实际上是不连续的,因此会形成随机读,RocketMQ对此作了几个优化:
3.6.2.1 集群模式
咱们首先须要选择一种集群模式,来适应咱们可忍耐的可用程度,通常来讲分为三种:
通常来讲投入生产环境的话都会选择第四种,来保证最高的可用性。
3.6.2.2 消息的可用性
当咱们选择好了集群模式以后,那么咱们须要关心的就是怎么去存储和复制这个数据,rocketMQ对消息的刷盘提供了同步和异步的策略来知足咱们的,当咱们选择同步刷盘以后,若是刷盘超时会给返回FLUSH_DISK_TIMEOUT,若是是异步刷盘不会返回刷盘相关信息,选择同步刷盘能够尽最大程度知足咱们的消息不会丢失。
除了存储有选择以后,咱们的主从同步提供了同步和异步两种模式来进行复制,固然选择同步能够提高可用性,可是消息的发送RT时间会降低10%左右。
咱们上面对于master-slave部署模式已经作了不少分析,咱们发现,当master出现问题的时候,咱们的写入怎么都会不可用,除非恢复master,或者手动将咱们的slave切换成master,致使了咱们的Slave在多数状况下只有读取的做用。RocketMQ在最近的几个版本中推出了Dleger-RocketMQ,使用Raft协议复制CommitLog,而且自动进行选主,这样master宕机的时候,写入依然保持可用。
有关Dleger-RocketMQ的信息更多的能够查看这篇文章:Dledger-RocketMQ 基于Raft协议的commitlog存储库。
定时消息和延时消息在实际业务场景中使用的比较多,好比下面的一些场景:
在开源版本的RocketMQ中延时消息并不支持任意时间的延时,须要设置几个固定的延时等级,目前默认设置为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
,从1s到2h分别对应着等级1到18,而阿里云中的版本(要付钱)是能够支持40天内的任什么时候刻(毫秒级别)。咱们先看下在RocketMQ中定时任务原理图:
能够看见延时消息是利用新建单独的Topic和Queue来实现的,若是咱们要实现40天以内的任意时间度,基于这种方案,那么须要402460601000个queue,这样的成本是很是之高的,那阿里云上面的支持任意时间是怎么实现的呢?这里猜想是持久化二级TimeWheel时间轮,二级时间轮用于替代咱们的ConsumeQueue,保存Commitlog-Offset,而后经过时间轮不断的取出当前已经到了的时间,而后再次投递消息。具体的实现逻辑须要后续会单独写一篇文章。
事务消息一样的也是RocketMQ中的一大特点,其能够帮助咱们完成分布式事务的最终一致性,有关分布式事务相关的能够看我之前的不少文章都有不少详细的介绍,这里直接关注公众号:咖啡拿铁。
具体使用事务消息步骤以下:
事务消息的使用整个流程相对以前几种消息使用比较复杂,下面是事务消息实现的原理图:
咱们发现RocketMQ实现事务消息也是经过修改原Topic信息,和延迟消息同样,而后模拟成消费者进行消费,作一些特殊的业务逻辑。固然咱们还能够利用这种方式去作RocketMQ更多的扩展。
这里让咱们在回到文章中提到的几个问题: