kafka:一个分布式消息系统

1.背景html

最近由于工做须要,调研了追求高吞吐的轻量级消息系统Kafka,打算替换掉线上运行的ActiveMQ,主要是由于明年的预算日流量有十亿,而ActiveMQ的分布式实现的很奇怪,因此但愿找一个适合分布式的消息系统。apache

如下是内容是调研过程当中总结的一些知识和经验,欢迎拍砖。数组

2.基础知识缓存

2.1.什么是消息队列多线程

首先,咱们来看看什么是消息队列,维基百科里的解释翻译过来以下:架构

 

队列提供了一种异步通讯协议,这意味着消息的发送者和接收者不须要同时与消息保持联系,发送者发送的消息会存储在队列中,直到接收者拿到它。负载均衡

 

通常咱们把消息的发送者称为生产者,消息的接收者称为消费者;注意定义中的那两个字“异步”,一般生产者的生产速度和消费者的消费速度是不相等的;若是两个程序始终保持同步沟通,那势必会有一方存在空等时间;若是两个程序一持续运行的话,消费者的平均速度必定要大于生产者,否则队列囤积会愈来愈多;固然,若是消费者没有时效性需求的话,也能够把消息囤积在队列中,集中消费。异步

说到这里,咱们再来谈谈队列的分类,通常咱们根据生产者和消费者的不一样,能够把队列分为三类:分布式

第一类是在一个应用程序内部(进程之间或者线程之间),相信你们学多线程时都写过“生产者消费者”程序,生产者负责生产,将生产的结果放到缓冲区(如共享数组),消费者从缓冲区取出消费,在这里,这个缓冲区就能够称为“消息队列”。ide

第二类其实也算在第一类的特例,就像咱们喜欢把操做系统和应用程序区别对待来看,操做系统要处理无数繁杂的事物,各进程、线程之间的数据交换少不了消息队列的支持。

第三类是更为通用意义上的“消息队列”,这类队列主要做用于不一样应用,特别是跨机器、平台,这令数据的交换更加普遍,通常一款独立的队列产品除了实现消息的传递外,还提供了相应的可靠性、事务、分布式等特性,将生产者、消费者从中解耦。常见的消费队列产品根据开源与否又可分为两类:

专有软件:IBM WebSphere MQ,MSMQ…

开源软件:ActiveMQ、RabbitMQ、Kafka…

2.2.JMS与AMQP

好了,对于上述第三类“消息队列”,要在不一样的机器中提供消息队列的功能,那势必要有统一的规范,这时候SUN就跳出来了,做为跨平台的JAVA势必也要支持跨平台的消息传递,基于此,SUN提供了一套消息标准:Java Message Service,缩写JMS,可是这套规范定义的是API层面的标准,在JAVA体系中能够很方便的交换,但对于其余平台就须要,可能须要消息队列产品自己支持多协议(如OpenWire、STMOP)。

而AMQP定义的比JMS更加底层,从名字就能看出来(Advanced Message Queuing Protocol),它定义的是Wire-level的协议,自然具备跨平台、跨语言的特性,基于此实现的消息队列能够与任何支持该协议的平台交互。

一种是JAVA层面的API,一种是Wire-level协议,这是JMS和AMQP最本质的区别;同时两种标准还有两个比较明显的差别:

一是消息传递模型;JMS比较简单,支持两种最通用的Peer-2-Peer、publisher/subscriber;通俗点就是点对点和广播模式;而AMQP定义的更为复杂,其定义了一种exchange&binding机制,由此支持五种模型:direct exchange、fanout exchange、topic exchange、headers exchange、system exchange,本质上与P2P、PUB/SUB同样,可是更加细致些。

二是支持的消息类型,JMS支持多种消息模型:TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message等;而AMQP只有byte数组。

2.3.ActiveMQ

ActiveMQ是基于JMS实现的Provider(能够理解为队列),它支持多种协议,如OpenWire,Stomp,AMQP等,基于此,支持多平台;支持事务,支持分发策略、还有上面的多种消息模型。这里咱们不细谈ActiveMQ的各特性,咱们着重来看ActiveMQ的分布式模型。

ActiveMQ支持分布式,它支持Master-Slave提供高可用,也支持Broker-Cluster提供负载均衡,可是它的负载基于一种Forwarding Bridge机制。

 

在这种机制下,任意时刻一条消只会被一个broker持有,producer发送的消息,可能会通过多个broker转发最终才会到达consumer,能够想象,当broker愈来愈多时,几乎每次消费都要通过转发,效率会明显降低;而且在这种复杂逻辑下,任一broker的加入和移除都显得十分复杂;这两点是我不建议使用ActiveMQ分布式集群的根本缘由。

1

3.Kafka

好,咱们最后来谈今天的主角Kafka,这个奇特的名字我始终没有找到典故,也许是开发者暗恋女孩(基友)的名字吧^_^,Kafka由linkin开发,最初的目的是为了应对linkin庞大的活动流数据(登陆、浏览、点击、分享、喜欢等),这部分数据容量庞大,可是可靠性要求不高,故而经过牺牲一部分可靠性(这并非说咱们的数据会按百分比丢,咱们后面再谈)来提高吞吐量;它砍掉了不少复杂的特性,如事务、分发策略、多种消息模型等;经过自身独特的设计将消息持久化到磁盘上,以此同时支持在线和离线消费;而且其天生为分布式而设计,压根就没有单机模式(或者说单机模式是分布式的特例),可以很好的扩展。实际应用中,Kafka能够用来作消息队列、流式处理(通常结合storm)、日志聚合等。

3.1.架构

2

咱们先宏观的看看Kafka的架构,Producer集群经过zookeeper(实际中写的是broker list)获取所写topic对应的partition列表,而后顺序发送消息(支持本身实现分发策略),broker集群负责消息的存储和传递,支持Master Slaver模型,可分布式扩展;Consumer集群从zookeeper上获取topic所在的partition列表,而后消费,一个partition只能被一个consumer消费。Name Server集群(通常是zookeeper)提供名称服务等协调信息。至于什么是topic,什么是partition,咱们接下来看。

3.2.Topic

Topic是生产者生产、消费者消费的队列标识。一个Topic由一个或多个partition组成,每一个partition能够单独存在一个broker上,消费者能够往任一partition发送消息,以此实现生产的分布式,任一partition均可以被且只被一个消费者消息,以此实现消费的分布式;所以partition的设计提供了分布式的基础。

3

同时,从上图咱们也能发现这种设计还有一个优势,由于每一个partition内的消息是有序的,而一个partition只能被一个消费者消费,所以Kafka能提供partition层面的消息有序,而传统的队列在多个consumer的状况下是彻底没法保证有序的。

3.3.消息传递模型

传统的消息队列最少提供两种消息模型,一种P2P,一种PUB/SUB,而Kafka并无这么作,巧妙的,它提供了一个消费者组的概念,一个消息能够被多个消费者组消费,可是只能被一个消费者组里的一个消费者消费,这样当只有一个消费者组时就等同与P2P模型,当存在多个消费者组时就是PUB/SUB模型。

4

3.4.消息持久化

不少系统、组件为了提高效率通常巴不得把全部数据都扔到内存里,而后按期flush到磁盘上;可实际上,现代操做系统也是这样,全部的现代操做系统都乐于将空闲内存转做磁盘缓存(页面缓存),想不用都难;对于这样的系统,他的数据在内存中保存了一份,同时也在OS的页面缓存中保存了一份,这样不但多了一个步骤还让内存的使用率降低了一半;所以,Kafka决定直接使用页面缓存;可是随机写入的效率很慢,为了维护彼此的关系顺序还须要额外的操做和存储,而线性的写入能够避免这些,实际上,线性写入(linear write)的速度大约是300MB/秒,但随即写入却只有50k/秒,其中的差异接近10000倍。这样,Kafka以页面缓存为中间的设计在保证效率的同时还提供了消息的持久化,每一个消费者本身维护当前读取数据的offser(也可委托给zookeeper),以此可同时支持在线和离线的消费。

3.5.Push vs. Pull

对于消息的消费,ActiveMQ使用PUSH模型,而Kafka使用PULL模型,二者各有利弊,对于PUSH,broker很难控制数据发送给不一样消费者的速度,而PULL能够由消费者本身控制,可是PULL模型可能形成消费者在没有消息的状况下盲等,这种状况下能够经过long polling机制缓解,而对于几乎每时每刻都有消息传递的流式系统,这种影响能够忽略。

3.6.可靠性

刚刚说Kafka牺牲了一些可靠性来提高吞吐量,不少同窗可能担忧消息的丢失,那么咱们如今来看看各类状况下的可靠性。

5

对于如上的模型,咱们分开来看,

先来看消息投递可靠性,一个消息如何算投递成功,Kafka提供了三种模式,第一种是啥都无论,发送出去就看成成功,这种状况固然不能保证消息成功投递到broker;第二种是对于Master Slave模型,只有当Master和全部Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,可是损伤了性能;第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数状况下都会中和可靠性和性能选择第三种模型。

咱们再来看消息在broker上的可靠性,由于消息会持久化到磁盘上,因此若是正常stop一个broker,其上的数据不会丢失;可是若是不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这能够经过配置flush页面缓存的周期、阈值缓解,可是一样会频繁的写磁盘会影响性能,又是一个选择题,根据实际状况配置。

接着,咱们再看消息消费的可靠性,Kafka提供的是“At least once”模型,由于消息的读取进度由offset提供,offset能够由消费者本身维护也能够维护在zookeeper里,可是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的状况,这种状况一样能够经过调整commit offset周期、阈值缓解,甚至消费者本身把消费和commit offset作成一个事务解决,可是若是你的应用不在意重复消费,那就干脆不要解决,以换取最大的性能。

最后,咱们再来看zookeeper的可靠性,很明显,他要挂了,一切都完了,地球就毁灭了,人类就灭绝了,星级穿越也挽救不了了……因此加强可靠性的方式就是把zookeeper也部署成集群。

3.7.性能

好了,说了那么多,咱们实际来测试下Kafka在各类状况下的性能,为了对比我也测了下单机模式下ActiveMQ的性能,不过因为懒,没有搭建ActiveMQ集群进行测试,可是基于其恶心的Forwarding Bridge模型,我也持悲观态度。

首先,测试环境以下:

Kafka:3 broker;8核/32G;默认配置

ActiveMQ:1 broker;8核/32G;默认配置

Producer: 一台机器经过多线程模拟多producer;8核/32G;默认配置,异步发送

Consumer: 一台机器经过多线程模拟多consumer;8核/32G;默认配置

除了特殊说明,生产和消费同时进行。

 

而后,我使用以下字符表示各类测试条件:

1T-1P3C-1P1C-1KW-1K

1T:1个toipc

1P3C:1个partition 3个replication

1P1C:1个producer 1个consumer

1KW:1千万条消息

1K:每一个消息1K

 

我先对ActiveMQ在单机多Producer、多consumer的状况下的测试,结果比我想象中的好,官方的给出的一个数据是1-2K的数据,每秒10-20K个,这样算下来大概30-40MB/S,而测试的结果在多线程的状况下会更好些。

ActiveMQ-thread Produce Consume
1T-XXX-1P1C-1KW-1K 28.925MB/S 28.829MB/S
1T-XXX-3P3C-1KW-1K 43.711MB/S 41.791MB/S
1T-XXX-8P8C-1KW-1K 52.426MB/S 52.383MB/S

 

而后我又对Kafka进行了相应的测试,用一个partition模拟单机模式,结果和预想的同样,在单机模型下,二者差别不大;而官方给的数听说生产者能达到50MB/S,消费者能达到100MB/S,生产者符合官方数据,而消费者我始终没有压到那么高的速度。

Kafka- thread Produce Consume
1T-1P1C-1P1C-1KW-1K 29.214MB/S 29.117MB/S
1T-1P1C-3P3C-1KW-1K 46.168MB/S 43.018MB/S
1T-1P1C-8P8C-1KW-1K 52.140MB/S 51.975MB/S

 

接下来的对于Kafka集群,我想一样数量的消息会不会由于topic数目的增多而影响,测试结果以下,代表topic越多,速度会有所降低,也符合预期。

Kafka-topic Produce Consume
1T-3P3C-3P3C-1.2KW-1K 49.255MB/S 49.204MB/S
3T-3P3C-3P3C-0.4KW*3-1K 46.239MB/S 45.774MB/S

 

而后为了测试partition对性能的影响,进行了以下测试,能够看到partition数量越多,总的生产和消费速度越快;可是意外的是Only produce状况下生产效率没有明显提高反而略慢,这里怀疑和page cache有关,没有深刻研究。

Kafka-partition Produce Consume Only Produce Only Consume
1T-1P3C-1P1C-1KW-1K 29.213MB/S 29.117MB/S 28.941MB/S 34.360MB/S
1T-3P3C-3P3C-1KW-1K 47.103MB/S 46.966MB/S 46.540MB/S 66.219MB/S
1T-8P3C-8P8C-1KW-1K 61.522MB/S 61.412MB/S 60.703MB/S 72.701MB/S

 

综上,咱们能够看到Kafka的性能和吞吐是能够扩展的。

3.8.风险点

对于咱们来讲,Kafka主要有两个风险点,第一,要深刻使用必需要熟读源码,而kafka源码是用scala写的,咱们并无相应的技术储备,须要学习;第二,kafka技术较新,目前的版本是0.8.1.1,看起来还不太成熟。

4.KG应用

这一块是在公司内部系统的应用,不适合对外,因此这里删去。

5.参考资料

Kafka-DOC:http://kafka.apache.org/documentation.html

ActiveMQ-DOC:http://activemq.apache.org

Understading the differences between AMQP & JMS:http://www.wmrichards.com/amqp.pdf

WIKI-MQ:http://en.wikipedia.org/wiki/Message_queue

WIKI-JMS:http://en.wikipedia.org/wiki/Java_Message_Service

WIKI-AMQP:http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol

相关文章
相关标签/搜索