什么是kafka?php
Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被普遍使用。目前愈来愈多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成。html
活动流数据是几乎全部站点在对其网站使用状况作报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索状况等内容。这种数据一般的处理方式是先把各类活动以日志的形式写入某种文件,而后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。
近年来,活动和运营数据处理已经成为了网站软件产品特性中一个相当重要的组成部分,这就须要一套稍微更加复杂的基础设施对其提供支持。如今最新版本已经到2.x.x前端
kafka的架构java
一个典型的Kafka体系架构包括若干Producer(能够是服务器日志,业务数据,页面前端产生的page view等等),若干broker(Kafka支持水平扩展,通常broker数量越多,集群吞吐率越高),若干Consumer (Group),以及一个Zookeeper集群。Kafka经过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。Producer使用push(推)模式将消息发布到broker,Consumer使用pull(拉)模式从broker订阅并消费消息。web
kafka的优势数据库
Kafka与传统MQ区别:api
为什么要用消息系统数组
在项目启动之初来预测未来项目会碰到什么需求,是极其困难的。消息系统在处理过程当中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束。缓存
有些状况下,处理数据的过程会失败。除非数据被持久化,不然将形成丢失。消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。安全
由于消息队列解耦了你的处理过程,因此增大消息入队和处理的频率是很容易的,只要另外增长处理过程便可。不须要改变代码、不须要调节参数。扩展就像调大电力按钮同样简单。
在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见;若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。
系统的一部分组件失效时,不会影响到整个系统。消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
在任何重要的系统中,都会有须要不一样的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列经过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽量的快速。该缓冲有助于控制和优化数据流通过系统的速度。
不少时候,用户不想也不须要当即处理消息。消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。
RabbitMQ是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它自己支持MQ功能,因此彻底能够当作一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操做,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不一样大小的数据。实验代表:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而若是数据大小超过了10K,Redis则慢的没法忍受;出队时,不管数据大小,Redis都表现出很是好的性能,而RabbitMQ的出队性能则远低于Redis。
ZeroMQ号称最快的消息队列系统,尤为针对大吞吐量的需求场景。ZMQ可以实现RabbitMQ不擅长的高级/复杂的队列,可是开发人员须要本身组合多种技术框架,技术上的复杂度是对这MQ可以应用成功的挑战。ZeroMQ具备一个独特的非中间件的模式,你不须要安装和运行一个消息服务器或中间件,由于你的应用程序将扮演这个服务器角色。你只须要简单的引用ZeroMQ程序库,可使用NuGet安装,而后你就能够愉快的在应用程序之间发送消息了。可是ZeroMQ仅提供非持久性的队列,也就是说若是宕机,数据将会丢失。其中,Twitter的Storm 0.9.0之前的版本中默认使用ZeroMQ做为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty做为传输模块)。
ActiveMQ是Apache下的一个子项目。 相似于ZeroMQ,它可以以代理人和点对点的技术实现队列。同时相似于RabbitMQ,它少许代码就能够高效地实现高级应用场景。
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具备如下特性:快速持久化,能够在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka经过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个很是轻量级的消息系统,除了性能很是好以外,仍是一个工做良好的分布式系统。
关键概念
Kafka集群包含一个或多个服务器,这种服务器被称为broker。每一个broker一般就是一台物理机器,在上面运行kafka server的一个实例,全部这些broker实例组成kafka的服务器集群。每一个broker会给本身分配一个惟一的broker id。broker集群是经过zookeeper集群来管理的。每一个broker都会注册到zookeeper上,有某个机器挂了,有新的机器加入,zookeeper都会收到通知。在0.9.0中,producer/consumer已经不会依赖Zookeeper来获取集群的配置信息,而是经过任意一个broker来获取整个集群的配置信息。
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,Topic这是一个逻辑上的概念,而Partition是物理上的概念。(物理上,不一样Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic便可生产或消费数据而没必要关心数据存于何处)。因此咱们在谈论topic每每和partition一块儿讨论,二者有紧密的联系:
每一个topic将被分红多个partition(区),每一个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是惟一标记一条消息。它惟一的标记一条消息。kafka并无提供其余额外的索引机制来存储offset,由于在kafka中几乎不容许对消息进行“随机读写”。
Parition是物理上的概念,每一个Topic包含一个或多个Partition,不一样Partition可位于不一样节点(kafka server实例)。同时每一个Partition在物理上对应一个本地文件夹,每一个Partition包含一个或多个Segment,每一个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,能够把一个Partition看成一个很是长的数组,可经过这个“数组”的索引(offset)去访问其数据。
每一条消息被发送到broker中,会根据partition规则(有默认规则,固然也能够自定义规则)选择被存储到哪个partition。若是partition规则设置的合理,全部消息能够均匀分布到不一样的partition里,这样就实现了水平扩展。(若是一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在建立topic时能够在$KAFKA_HOME/config/server.properties中指定这个partition的数量,固然能够在topic建立以后去修改partition的数量。
在发送一条消息时,能够指定这个消息的key,producer根据这个key和partition机制来判断这个消息发送到哪一个partition。partition机制能够经过指定producer的partition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。
一方面,因为不一样Partition可位于不一样机器,所以能够充分利用集群优点,实现机器间的并行处理。另外一方面,因为Partition在物理上对应一个文件夹,即便多个Partition位于同一个节点,也可经过配置让同一节点上的不一样Partition置于不一样的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优点。
利用多磁盘的具体方法是,将不一样磁盘mount到不一样目录,而后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将全部Partition尽量均匀分配到不一样目录也即不一样目录(也即不一样disk)上。注:虽然物理上最小单位是Segment,但Kafka并不提供同一Partition内不一样Segment间的并行处理。由于对于写而言,每次只会写Partition内的一个Segment,而对于读而言,也只会顺序读取同一Partition内的不一样Segment。
负责发布消息到Kafka broker,并负责决定发布到哪一个分区。一般简单的由负载均衡机制随机选择分区,但也能够经过特定的分区函数选择分区。使用的更多的是第二种。
消息消费者,向Kafka broker读取消息的客户端。在消费消息时只须要指明topic名称便可,不须要关心具体的消息存在了哪一个borker上。注意,一个topic只能被一个group组中的一个cusmer来消费,可是能够被多个group组来同时消费;consumer只能修改被commit状态的消息;
发布消息一般有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers能够同时从服务端读取消息,每一个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到全部的consumer中。Consumers能够加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer能够在不一样的程序中,也能够在不一样的机器上。若是全部的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。若是全部的consumer都不在不一样的组中,这就成为了发布-订阅模式,全部的消息都被分发到全部的consumer中。更常见的是,每一个topic都有若干数量的consumer组,每一个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每一个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka能够很好的保证有序性。
传统的队列在服务器上保存有序的消息,若是多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,可是消息是被异步的分发到各consumer上,因此当消息到达时可能已经失去了原来的顺序,这意味着并发消费将致使顺序错乱。为了不故障,这样的消息系统一般使用“专用consumer”的概念,其实就是只容许一个消费者消费消息,固然这就意味着失去了并发性。
在这方面Kafka作的更好,经过分区的概念,Kafka能够在多个consumer组并发的状况下提供较好的有序性和负载均衡。将每一个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就能够顺序的消费这个分区的消息。由于有多个分区,依然能够在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就容许多少并发消费。
Kafka只能保证一个分区以内消息的有序性,在不一样的分区之间是不能够的,这已经能够知足大部分应用的需求。若是须要topic中全部消息的有序性,那就只能让这个topic只有一个分区,固然也就只有一个consumer组消费它。
每一个Consumer属于一个特定的Consumer Group(可为每一个Consumer指定group name,若不指定group name则属于默认的group)。
每一个topic的partion的全部消息,都不是只存1份,而是在多个broker上冗余存储,从而提升系统的可靠性。这多台机器就叫一个replica集合。
在这个replica集合中,须要选出1个leader,剩下的是follower。也就是master/slave。
发送消息的时候,只会发送给leader,而后leader再把消息同步给followers(以pull的方式,followers去leader上pull,而不是leader push给followers)。
那这里面就有一个问题:leader收到消息以后,是直接返回给producer呢,仍是等全部followers都写完消息以后,再返回? 关于这个请看下面博客中的kafka机制内容
关键点:这里replica/leader/follower都是逻辑概念,而且是相对”partion”来说的,而不是”topic”。也就说,同一个topic的不一样partion,对于的replica集合能够是不同的。
好比 :
“abc-0” <1,3,5> //abc_0的replica集合是borker 1, 3, 5, leader是1, follower是3, 5
“abc-1” <1,3,7> //abc_1的replica集合是broker 1, 3, 7,leader是1, follower是3, 7
“abc_2” <3,7,9>
“abc_3” <1,7,9>
“abc_4” <1,3,5>
持久化文件删除策略
什么是数据分片
复制备份
kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);备份的个数能够经过broker配置文件来设定.leader处理全部的read-write请求,follower须要和leader保持同步.Follower和consumer同样,消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.(不一样于其余分布式存储,好比hbase须要"多数派"存活才行)
leader选举
使用场景
对于消息队列的使用,表面上看起来很简单,一端往里面放,一端从里面取。但就在这一放一取中,存在着诸多策略。
所谓ACK,是指服务器收到消息以后,是存下来以后,再给客户端返回,仍是直接返回。很显然,是否ACK,是影响性能的一个重要指标。在kafka中,request.required.acks有3个取值,分别对应3种策略:
request.required.acks
//0: 不等服务器ack就返回了,性能最高,可能丢数据
//1. leader确认消息存下来了,再返回
//all: leader和当前ISR中全部replica都确认消息存下来了,再返回(这种方式最可靠)
备注:在0.9.0之前的版本,是用-1表示all
所谓异步发送,就是指客户端有个本地缓冲区,消息先存放到本地缓冲区,而后有后台线程来发送。
在0.8.2和0.8.2以前的版本中,同步发送和异步发送是分开实现的,用的Scala语言。从0.8.2开始,引入了1套新的Java版的client api。在这套api中,同步其实是用异步间接实现的:
在异步发送下,有如下4个参数须要配置:
(1)队列的最大长度
buffer.memory //缺省为33554432, 即32M
(2)队列满了,客户端是阻塞,仍是抛异常出来(缺省是true)
block.on.buffer.full
//true: 阻塞消息
//false:抛异常
(3)发送的时候,能够批量发送的数据量
batch.size //缺省16384字节,即16K
(4)最长等多长时间,批量发送
linger.ms //缺省是0
//相似TCP/IP协议中的linger algorithm,> 0 表示发送的请求,会在队列中积攥,而后批量发送。
很显然,异步发送能够提升发送的性能,但一旦客户端挂了,就可能丢数据。
对于RabbitMQ, ActiveMQ,他们都强调可靠性,所以不容许非ACK的发送,也没有异步发送模式。Kafka提供了这个灵活性,容许使用者在性能与可靠性之间作权衡。
(5)消息的最大长度
max.request.size //缺省是1048576,即1M
这个参数会影响batch的大小,若是单个消息的大小 > batch的最大值(16k),那么batch会相应的增大
全部的消息队列都要面对一个问题,是broker把消息Push给消费者呢,仍是消费者主动去broker Pull消息?
kafka选择了pull的方式,为何呢? 由于pull的方式更灵活:消息发送频率应该如何,消息是否能够延迟而后batch发送,这些信息只有消费者本身最清楚!
所以把控制权交给消费者,消费者本身控制消费的速率,当消费者处理消息很慢时,它能够选择减缓消费速率;当处理消息很快时,它能够选择加快消费速率。而在push的方式下,要实现这种灵活的控制策略,就须要额外的协议,让消费者告诉broker,要减缓仍是加快消费速率,这增长了实现的复杂性。
另外pull的方式下,消费者能够很容易的自适应控制消息是batch的发送,仍是最低限度的减小延迟,每来1个就发送1个。
在消费端,全部消息队列都要解决的一个问题就是“消费确认问题”:消费者拿到一个消息,而后处理这个消息的时候挂了,若是这个时候broker认为这个消息已经消费了,那这条消息就丢失了。
一个解决办法就是,消费者在消费完以后,再往broker发个confirm消息。broker收到confirm消息以后,再把消息删除。
要实现这个,broker就要维护每一个消息的状态,已发送/已消费,很显然,这会增大broker的实现难度。同时,这还有另一个问题,就是消费者消费完消息,发送confirm的时候,挂了。这个时候会出现重复消费的问题。
kafka没有直接解决这个问题,而是引入offset回退机制,变相解决了这个问题。在kafka里面,消息会存放一个星期,才会被删除。而且在一个partion里面,消息是按序号递增的顺序存放的,所以消费者能够回退到某一个历史的offset,进行从新消费。
固然,对于重复消费的问题,须要消费者去解决。
在某些业务场景下,须要消息的顺序不能乱:发送顺序和消费顺序要严格一致。而在kafka中,同一个topic,被分红了多个partition,这多个partition之间是互相独立的。
之因此要分红多个partition,是为了提升并发度,多个partition并行的进行发送/消费,但这却没有办法保证消息的顺序问题。
一个解决办法是,一个topic只用一个partition,但这样很显然限制了灵活性。
还有一个办法就是,全部发送的消息,用同一个key,这样一样的key会落在一个partition里面。
咱们都知道,操做系统自己是有page cache的。即便咱们用无缓冲的io,消息也不会当即落到磁盘上,而是在操做系统的page cache里面。操做系统会控制page cache里面的内容,何时写回到磁盘。在应用层,对应的就是fsync函数。
咱们能够指定每条消息都调用一次fsync存盘,但这会较低性能,也增大了磁盘IO。也可让操做系统去控制存盘。
一个完美的消息队列,应该作到消息的“不重不漏”,这里面包含了4重语义:
消息不会重复存储;
消息不会重复消费;
消息不会丢失存储;
消息不会丢失消费。
先说第1个:重复存储。发送者发送一个消息以后,服务器返回超时了。那请问,这条消息是存储成功了,仍是没有呢?
要解决这个问题:发送者须要给每条消息增长一个primary key,同时服务器要记录全部发送过的消息,用于判重。很显然,要实现这个,代价很大
重复消费:上面说过了,要避免这个,消费者须要消息confirm。但一样,会引入其余一些问题,好比消费完了,发送confirm的时候,挂了怎么办? 一个消息一直处于已发送,但没有confirm状态怎么办?
丢失存储:这个已经解决
丢失消费:同丢失存储同样,须要confirm。
总结一下:真正作到不重不漏,exactly once,是很难的。这个须要broker、producer、consumer和业务方的协调配合。
在kafka里面,是保证消息不漏,也就是at least once。至于重复消费问题,须要业务本身去保证,好比业务加判重表。
常遇到的问题
因为一些博客,在kafka介绍方面已经很是的完善,因此这里只收集一些有内涵的文章来供你们学习参考:
学习连接