【3.工程开发】-mq-kafka/rocketmq

本篇是消息队列中的一节,为何讲到消息队列见:https://segmentfault.com/a/11...。其中流处理的数据传播用到消息队列。另外消息队列还能够做用于异步处理,流量削峰,多系统同步等。另外一篇介绍了传统的JMS(activemq),AMQP(rabbitmq),本篇介绍kafka,robbitmq,ddmq,另外简单说下bridgemq以及常见mq的综合对比。同其余系统同样,终点关注架构组件,功能(生产消费等),分布式的高可用,扩展性,一致性等linux

kafka

官方:发布订阅,流处理管道和存储
https://kafka.apache.org/docu...redis

组件

clipboard.png

  • broker:负责消息存储转发,包含topic(一个queue)=》partition(物理分布,一个topic包含一个或多个partition,能够分布在不一样的broker上)
  • producer(与broker leader直连,负载均衡指定partition,可批次发,可设置要ack的副本数)
  • consumer/consumer group.同一group中只有一个c能够消费一个p,负载均衡,此时若这组cg的线程多于p会有空等线程。可是多个cg能够同时消费一个p不受影响。所以在考虑为了消费更快下,对于partition的分区和c的线程数能够一致。
  • 其中partition的集群和单机物理结构以下:
    clipboard.png

    clipboard.png

  • index所有映射到内存,每一个partition下自增id
  • 元数据放在zk上.分partition,每一个partition副本分散在broker上,单partition+单消费才能保证顺序(rocketmq同样)。每一个partition一个索引,顺序写一个文件。流处理+批量处理(累计部分数据才发送),实时上有取舍。

https://kafka.apache.org/docu...apache

高可用和可扩展

1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.全部的Kafka Broker节点一块儿去Zookeeper上注册一个临时节点,成功的为Broker controller,失效后zk后发现从新注册节点,controller负责各broker内partition的选主(ISR中,记录replica进度,随便选)ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每一个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。所以这个集合中的任何一个节点随时均可以被选为leader.若是ISR的大小超过某个最小值,则分区将仅接受写入,以防止丢失仅写入单个副本的消息(只关注ISR,而不是共识多个都写入,多数(两个故障须要5个副本,一个要三个)对于主数据的写代价大)【与ES相似都使用的Microsoft的PacificA】
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息。
broker,partition,customer组内线程可扩展。json

消费

只保证一个partition被一个customer消费有序
producter推,customer拉(拉须要存日志)
partition中的每一个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,若多个同时要配多个Consumer group。
kafka中的消息是批量(一般以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息被consumer接收以后,负责维护消息的消费记录(JMS等都是broker维护),consumer能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.也没有ACK
消息消费的可靠性,消费者控制,最多一次,先保存offset再处理;至少一次,先处理再保存offset;只一次:最少1次+消费者的输出中额外增长已处理消息最大编号segmentfault

日志压缩

确保有每一个分区数据日志中每一个key有最后已知值,offset不能变。对同一partition的多个文件一块儿压缩合并。
position是文件的bytes偏移吧?压缩过程当中要重建索引和位置?【我的理解是要重建的】
active不动(不影响写入),对cleaner point后面的作压缩,选择日志tail和header比例小的,合并压缩每组log不超过1G,index不超过10M。
clipboard.png
对于tail的压缩过程:【position不变???我的理解这是错误的,position是变得】
每一个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。日志清理须要遍历两第二天志文件,第一次遍历把每一个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型以下图所示。第二次遍历检查每一个消息是否符合保留条件,若是符合就保留下来,不然就会被清理掉服务器

clipboard.png

rocketmq

activemq 不能分片。kafka性能(上面知道基本上partition和consumer须要配置同样的,一个consumer group的线程数和partition数量一致,受partition限制,rocketmq多partition的扩展在于都用一个commitlog,而不是一个partition单独一份顺序log,对于磁盘多个文件是随机写入的,随机高性能很差不能linux组提交,cq只存储位置,在commitlog中找数据,一份彻底顺序的写入提升性能。对于消费顺序和kafka都是同样的保证,cq都是负载均衡,只保证一个cq顺序。在消费时,须要先读取cq上个的offset再读commitlog。http://rocketmq.apache.org/ro...架构

组件

clipboard.png

  • broker :主从,topic,queue,tag
  • nameserver:几乎无状态,可集群内部署,节点对等,不一样步。数据是broker同步过来的
  • producer:链接ns,主从brokers(心跳),无状态
  • consumer/group :链接ns,主从brokers(心跳)

高可用和可扩展

clipboard.png

  • 负载均衡
    Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽可能平均分布到全部队列中,最终效果就是全部消息都平均落在每一个Broker上。
  • 主从
    机器级别,不依赖zk,元数据:在 Broker 启动的时候,其会将本身在本地存储的配置文件 (默认位于$HOME/store/config/topics.json 目录) 中的全部话题加载到内存中去,而后会将这些全部的话题所有同步到全部的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步.主从写commitlog保证持久性和同步和其余同样,就再也不说了。
  • Broker与Namesrv的心跳机制:
    单个Broker跟全部Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker全部的Topic信息。Namesrv会反查Broer的心跳信息,若是某个Broker在2分钟以内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
  • 消息存储持久化
    全部broker上的全部topic都顺序写入内存文件mapedfile(1G),mapedfilelist记录每一个mapedfile在磁盘的偏移量,新消息写入最后一个文件。
  • 动态伸缩能力
    (非顺序消息,消息分散;有序消息只能放在一个queue中,切不支持迁移,只保证一个queue内顺序,但能够多消费线程保证顺序):Broker的伸缩性体如今两个维度:Topic, Broker。
    1)Topic维度:假如一个Topic的消息量特别大,但集群水位压力仍是很低,就能够扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
    2)Broker维度:若是集群水位很高了,须要扩容,直接加机器部署Broker就能够。Broker起来后想Namesrv注册,Producer、Consumer经过Namesrv发现新Broker,当即跟该Broker直连,收发消息。

消费

  1. 消费者注册,消费者上有多有topic的broker地址和队列,消费者负载均衡选择;
    1)广播模式:每一个costumer全量消费,消费偏移量保存在costumer中
    2)集群模式:constumer均匀消费部分,每一个消息只有一个costumer消费,保存在broker上
  2. 新消息发送到q:brocker上commit log和消费组信息

    clipboard.png
    每一个commmit log消息发给topic的随机queue中(生产者的负载均衡,每一个msg只发送到一个q中),每一个queue有不少consumequeue,发给全部。广播模式,cq会在全部q上,集群模式cq会负载均衡到某个q上,消息根据这些配置数据落到q的全部cq上。
    clipboard.png并发

  3. 消费
    3.1)普通的并发消费:queue的全部cq都直接发,全部cq发送后删除(q以TreeMap结构存储)。内部RocketMQ 的消息树是用 TreeMap 实现的,其内部基于消息偏移量维护了消息的有序性。每次消费请求都会从消息数中拿取偏移量最小的几条消息 (默认为 1 条)给用户,以此来达到有序消费的目的。
    3.2)有序消费:在3.1的基础上加两个锁,costumer client给消费的每一个queue会加锁,保证同一时刻只有一个costumer client在消费queue(不然发给一个client删除了消息,此消息在另外一个client和后面的client的消息没法保证顺序),默认20s加一次,queue检测60s没有就释放,每次成功后才取下一条,反正只有一个客户端消费。第二把锁是在client中,将堆积的消息按照顺序加锁的写入线程池task队列中。

    clipboard.png

其余

bridgequeue

内存。redis实现。适合小型系统
clipboard.png负载均衡

ddmq 对大型延时系统的支持,引入chronos

clipboard.png

这里的kafka去掉了。普通的直接用哪一个rocketmq.延时消息和事务消息异步

  • 延时消息
    放入rocketmq一个内部的消费topic中,消费入chronos中(存RocksDB,seektimestamp, while从leveldb中取符合时间的再放入rocketmq中)
  • 事务消息
    A执行后要发送消息给B,由于ddmq一旦接收是保证被消费的,因此增长发送方事务回查。
    clipboard.png

对比

clipboard.png分析:少topic时kafka性能好,rockemq须要读mq后去读一个大的cl。多topic是rockemq好,处理线程多。

相关文章
相关标签/搜索