一句话归纳kafka的核心功能就是:高性能的消息发送与高性能的消息消费。java
kafka刚推出的时候是以消息引擎的身份出现的,它具备强大的消息传输效率和完备的分布式解决方案,随着版本更新,在kafka0.10.0.0版推出了流式处理组件——Kafka Streams,使kafka交由下游数据处理平台作的事也能够本身作,自此kafka在消息引擎的基础上正式成为了一个流式处理框架。但不管是消息引擎仍是流式处理平台,kafka的处理架构从未质变,归纳以下:正则表达式
图 kafka简要架构图算法
总结就是三句话:数据库
说到消息引擎,和它相似的术语就是消息队列和消息中间件,我的感受称kafka为消息引擎更合理。由于“消息队列”名字给出了一个很不许确的暗示,仿佛它就是以队列的形式实现的;而消息中间件有点过分强调了“中间件”之嫌,使其真实用途不够明显。apache
消息引擎系统既然是在不一样应用之间传输消息的系统,那么在设计时须要重点考虑的关键因素就是:消息设计、传输协议设计和消息引擎范型。编程
消息设计bootstrap
消息引擎系统在设计消息时必定要考虑语义的清晰和格式上的通用性,消息一般都采用结构化的方式进行设计,好比XML格式、JSON格式的消息等,而kafka的消息是用二进制方式来保存的,但依然是结构化的消息。数组
传输协议设计缓存
广义上的传输协议包括任何可以在不一样系统间传输消息或是执行语义操做的协议或框架,好比RPC及序列化框架、Google的ProtoBuffers、阿里系的Dubbo等,而kafka本身设计了一套二进制的消息传输协议。(后面再讲)传输协议做为一个基础构建块,它服务于消息引擎系统实现的消息引擎范型。安全
消息引擎范型
最多见的两种消息引擎范型是消息队列模型和发布/订阅模型。
消息队列模型是基于队列提供消息传输服务的,其定义了消息队列、发送者、接收者,提供的是一种点对点的消息传递方式。一旦消息被消费就会从队列中移除该消息,每条消息由一个发送者生产出来,且只被一个消费者处理——发送者和消费者是一对一的关系,相似生活中以前的电话接线生的工做。
发布/订阅模型有主题的概念,一个主题能够理解为逻辑语义相近的消息的容器,该模型也定义了相似生产者、消费者的角色,即发布者(publisher)和订阅者(subscriber)。发布者将消息生产出来发送到指定的topic中,全部订阅了该topic的订阅者均可以接受到该topic下的全部消息,相似生活中报纸的订阅。
kafka经过引入消息组(consumer group)来同时支持这两种模型。(后面再讲)
kafka的设计初衷就是为了解决互联网公司超大量级数据的实时传输,概要设计关键点:吞吐量/延时、消息持久化、负载均衡和故障转移、伸缩性。
一、吞吐量/延时
kafka的吞吐量就是指每秒可以处理的消息数或者每秒能处理的字节数。kafka的延时能够表示客户端发起请求与服务器处理请求并发送响应给客户端之间的这段时间。在实际使用场景中,这两个指标一般是一个矛盾体,但也不是等比例的此消彼长的关系。
kafka写入端实现高吞吐量低延时的方法原理:利用操做系统的页缓存和采用追加写入消息的方式。
kafka会持久化全部数据到磁盘,可是本质上每次写入操做都只是把数据写入到操做系统的页缓存中,而后由操做系统自行决定什么时候把页缓存中的数据写回磁盘上。正是得益于这种对磁盘的使用方式,使得kafka的写入操做是很快的。
这样设计有3个主要优点:
kafka在设计时采用了追加写入消息的方式,即只能在日志文件末尾追加写入新的消息,且不能修改已写入的消息,所以它属于典型的磁盘顺序访问型操做。
kafka消费端实现高吞吐量低延时的方法原理:kafka把消息写入操做系统的页缓存中,一样地,kafka在读取消息时会首先尝试从操做系统的页缓存中读取,且大部分消息极可能依然存在于页缓存中,若是命中就把消息经页缓存直接发送到网络的socket上,不用“穿透”到底层的物理磁盘上获取消息,同时这个过程用到了大名鼎鼎的零拷贝(zero copy)技术。
补充说明:传统的Linux操做系统中的I/O接口是依托于数据拷贝来是实现的,在零拷贝技术出现以前,一个I/O操做会将同一份数据进行屡次拷贝,数据传输过程当中还涉及到内核态与用户态的上下文切换,CPU的开销很是大,极大限制了操做系统高效进行数据传输的能力,而零拷贝技术很好的改善了这个问题。
【总结】kafka依靠下面4点达到了高吞吐量、低延时的设计目标:
二、消息持久化
kafka是要持久化消息到磁盘上的,这样作的好处是:
普通系统在实现持久化时可能会先尽可能使用内存,当内存资源耗尽时再一次性的把数据“刷盘”,而kafka则反其道而行之,全部数据都会当即被写入文件系统的持久化日志中,以后kafka服务器才会返回结果给客户端通知它们消息已被成功写入。这样能减小kafka程序对内存的消耗从而将节省出来的内存留给页缓存使用,更进一步提高性能。
三、负载均衡和故障转移
负载均衡就是指让系统的负载根据必定的规则均衡地分配在全部参与工做的服务器上,从而最大限度的提高系统总体的运行效率。
对于kafka来讲就是,每台服务器broker都有均等的机会为kafka的客户提供服务,能够把负载分散到全部集群中的机器上。
kafka经过智能化的分区领导者选举来实现负载均衡,kafka默认提供智能的leader选举算法,可在集群的全部机器上以均等机会分散各个partition的leader,从而总体上实现负载均衡。
kafka的故障转移是经过使用会话机制实现的,每台kafka服务器启动后会以会话的形式把本身注册到zookeeper服务器上。一旦该服务器运转出现问题,与zookeeper的会话便不能维持从而超时失效,此时kafka集群会选举出另外一台服务器来彻底替代这台服务器继续提供服务。
四、伸缩性
伸缩性是指向分布式系统中增长额外的计算资源好比CPU、内存、存储或带宽等时吞吐量提高的能力。
若是一个CPU的运算能力是U,那么两个CPU的运算能力咱们天然但愿是2U,便可以线性的扩容计算能力,可是因为不少隐藏的“单点”瓶颈致使实际中几乎不可能达到。阻碍线性扩容的一个很常见的因素就是状态的保存,由于不管哪类分布式系统,集群中的每台服务器必定会维护不少内部状态,若是有服务器本身来保存这些状态信息,则必需要处理一致性的问题。相反,若服务器是无状态的,状态的保存和管理交由专门的协调服务来作好比zookeeper,那么整个集群的服务器之间就无需繁重的状态共享,就极大地下降了维护复杂度。假若要扩容集群节点,只需简单的启动新的节点机器进行自动负载均衡就能够了。kafka正是采用上述思想,将每台kafka服务器上的状态统一交由zookeeper保管,扩展kafka集群时只需启动新的kafka服务器便可。说明:kafka服务器上并非全部状态都不保存,之保存了很轻量级的内部状态,所以整个集群间维护状态一致性的代价很低。
一、消息
消息由消息头部、key和value组成。kafka中的消息格式由不少字段组成,其中不少字段都是用于管理消息的元数据字段能,对用户是透明的。V1版本的消息格式以下图(不一样版本可能会有稍微差别):
图 消息的完整格式
kafka使用紧凑的二进制字节数组来保存字段,也就是没有多余的比特位浪费。一般的Java堆上内存分配,即便有重排各个字段在内存的布局以减小内存使用量的优化措施,但仍有部分字节用于补齐之用。同时,运行Java的操做系统一般都默认开启了页缓存机制,也就是说堆上保存的对象极可能在页缓存中还保留一份,这就形成了极大的资源浪费。kafka在消息设计时直接使用紧凑的二进制字节数组ByteBuffer而不是独立的对象,避开了繁重的java堆上内存分配。所以,咱们至少可以访问多一倍的可用内存。还有一点,大量使用页缓存而非堆内存还有一个好处——数据不丢失,即当出现kafka broker进程崩溃时,堆内存上的数据也一并消失,但页缓存的数据依然存在。
二、主题和分区即topic和partition:
topic只是一个逻辑概念,表明一类消息,也能够认为是消息被发送到的地方,一般咱们可使用topic来区分实际业务。
kafka中的topic一般都会被多个消费者订阅,出于性能的考量,kafka并非topic-message的两级结构,而是采用topic-partition-message的三级结构来分散负载。topic与partition关系以下图.
图 topic和partition
kafka的partition实际上并无太多的业务含义,它的引入就是单纯的为了提高系统的吞吐量。
topic是有多个partition组成的,而partition是不可修改的有序消息序列,也能够说是有序的消息日志。每一个partition有本身专属的partition号,一般是从0开始。用户对partition惟一 能作的就是在消息序列的末尾追加写入消息。partition上的每条消息都会被分配一个惟一的序列号——位移。位移值也是从0开始顺序递增的整数,经过位移信息能够惟必定位到某partition下的一条信息。
三、位移offset
topic partition下的每条消息都被分配一个位移值,而在kafka消费者端也有位移的概念,注意区分。每条消息在某个partition的位移是固定的,但消费该partition的消费者的位移会随着消费进度不断前移,但不会超过前者。所以,从此讨论位移的时候必定给出清晰的上下文环境。
综上,能够断言kafka中的一条消息其实就是一个<topic, partition,offset>三元组。
四、replica副本、leader、follower
kafka中的分区partition是有序消息日志,那为了实现高可靠性,经过冗余机制——备份多份日志,而这些备份日志在kafka中被称为副本(replica),它们存在的惟一目的就是防止数据丢失。
kafka中的replica分为两个角色:领导者(leader)和追随者(follower)(相似过去的主备的提法(Master-slave)),也即副本分为两类:领导者副本(leader replica)和追随者副本(follower replica)。follower replica是不能提供服务给客户端的,也即不负责响应客户端发来的消息写入和消息消费请求,它只是被动地向领导者副本获取数据,保持与leader的同步,follower存在的惟一价值就是充当leader的候补,一旦leader replica所在的broker宕机,kafka会从剩余的replica中选举出新的leader继续提供服务。
图 kafka的leader-follower系统
五、ISR(与leader replica保持同步的replica集合)
好比一个partition能够配置N个replica,那么是否就觉得着该partition能够容忍N-1个replica失效而不丢失数据呢?答案是“否”。
kafka为partition动态维护一个replica集合,该集合中的全部replica保存的消息日志都与leader replica保持同步状态,只有这个集合中的replica才能被选举为leader,也只有该集合中全部replica都接收到了同一条消息,kafka才会将该消息置于“已提交”状态,即认为这条消息发送成功。kafka能保证只要ISR集合中至少存在一个replica,那些“已提交”状态的消息就不会丢失——两个关键点:第一,ISR中至少存在一个“活着的”replica;第二,“已提交”消息。
正常状况下,partition的全部replica都应该与leader replica保持同步,即全部的replica都在ISR中,但因各类缘由,小部分replica可能开始落后于leader replica的进度,当其滞后到必定程度时,kafka会将这些replica“踢出”ISR。相反,当这些replica从新“追上”了leader replica的进度时,kafka又会将它们加回到ISR中。这些都是自动维护的,不需人工干预。
一、消息传输:替代传统的消息总线等。
二、网站行为日志追踪:鉴于点击流数据量很大,kafka超强的吞吐量特性就有了用武之地。网站上的用户操做以消息的形式发送到kafka的某个对应topic中,而后使用机器学习或其余实时处理框架来帮助收集并分析。
三、审计数据收集:从各个运维应用程序处实时汇总操做步骤信息进行集中式管理,同时支持持久化特性,方便后续离线审计。
四、日志收集:各个机器上的分散日志,经过kafka进行全量收集,并集中送往下游的分布式存储如hdfs中。相对于其余主流的日志抽取框架好比flume,kafka有更好的性能,并且提供了完备的可靠性解决方案,同时还有低延时的特色。
五、流式处理:新版本kafka才推出的流式处理组件kafka streams,相对于典型的流式处理框架如Apache Storm、Apache Samza、Spark、Apache Flink等竞争力如何,让时间给出答案吧。
自1.0.0版本开始,kafka版本号正式从原来的四位升级到了如今的3位,格式是<major>.<minor>.<patch>。
在kafka世界中,一般把producer和consumer统称为客户端即clients,这是与服务器端即broker相对应的。
选择kafka版本时要注意的几个分界点为:0.8版本才加入了集群间的备份机制;0.9.0.0版本开始才支持kafka security功能;0.10.0.0(含)以后的版本才有了流式处理组件kafka streams;但建议选择相对较新版本,功能更完善bug更少咯。
2014年kafka的创始人创办了公司——Confluent.io,从事商业化Kafka工具开发以及提供实时流式处理方面的产品。另外,confluent还分为开源版本和企业版本,企业版本中提供了对底层kafka集群完整的可视化监控解决方案以及一些辅助系统帮助管理集群,而开源版本与Apache社区的kafka并没有太大区别。
略
一、操做系统选型
除了现状的确是Linux服务器数量最多,单论它与kafka自己的相适性,Linux也要比Windows等操做系统更加适合部署kafka,能想到的缘由有两个:I/O模型的使用和数据网络传输效率。
二、磁盘选型
使用机械硬盘彻底能够知足kafka集群的使用,固然SSD更好。
关于JBOD(一堆普通磁盘的意思)和RAID(磁盘阵列)的选择,即便用一堆普通商用磁盘进行安装仍是搭建专属的RAID呢?答案是具体问题具体分析。追求性价比的公司能够考虑使用JBOD.
三、磁盘容量规划
主要考虑如下因素:
四、内存规划
kafka对于Java堆内存的使用不是不少,kafka将消息写入页缓存,通常状况下,broker所需的堆内存都不会超过6GB。
对于内存的规划建议以下:
五、CPU规划
要追求多核而非高时钟频率。
六、带宽选择
规划建议为:
七、kafka集群涉及的主要几类参数:
kafka内置有Java版本producer,而当前Apache kafka支持的第三方clients库有不少,这些第三方库基本上都是由非Apache kafka社区的人维护的,用户下载的是Apache kafka的话默认是不包含这些库的,须要单独下载对应的库。
Apache kafka封装了一套二进制通讯协议,对于producer而言,用户几乎可使用任意语言按照该协议进行编程,从而实现向kafka发送消息。
实际上内置的Java版本producer和上面列出的全部第三方库在底层都是相同的实现原理,这组协议本质上为不一样的协议类型分别定义了专属的紧凑二进制字节数组格式,而后经过socket发送给合适的broker,以后等待broker处理完成后返回响应给producer。这样设计的好处就是具备良好的统一性——即全部的协议类型都是统一格式的,而且因为是自定义的二进制格式,这套协议不依赖任何外部序列号框架,从而显得轻量级也具备好的扩展性。
说到producer,它的主要功能就是向某个topic的某个分区发送一条消息,因此它首先须要肯定到底要向topic的哪一个分区写入消息——这就是分区器作的事。
kafka producer提供了一个默认的分区器,对于每条待发送的消息,若是该消息指定了key,那么partitioner会根据key的哈希值来选择目标分区;若这条消息没有指定key,则partitioner使用轮训的方式确认目标分区,从而最大限度的保证消息在全部分区上的均匀性。
固然,producer提供了用户自行指定目标分区的API,即用户在消息发送时跳过partitioner直接指定要发送到的分区。另外,producer也容许用户实现自定义的分区策略而不使用默认的分区器。
第二,确认了目标分区以后,producer要作的第二个事就是寻找这个分区对应的leader,也就是该分区leader副本所在的kafka broker。所以,在发送消息时,producer也就有了多种选择来实现消息发送(好比不等待任何副本的响应便返回成功、只是等待leader副本响应写入操做后再返回成功等)。
producer简言之就是将用户待发送的消息封装成一个ProducerRecord对象,而后使用KafkaProducer.send方法进行发送。具体过程为:Producer首先使用一个线程(用户主线程,也即用户启动Producer的线程)将待发送的消息封装进一个ProducerRecord类实例,而后将其序列化以后发送给partitioner,再结合本地缓存的元数据信息由partitioner来肯定目标分区后一同发送到位于producer程序中的一块内存缓冲区中。而KafkaProducer中的另外一个专门的sender I/O线程则负责实时地从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应的broker。工做流程图以下图。
图 Java版本producer的工做流程
一、构造producer实例大体步骤
1>构造一个java.util.Properties对象,而后至少指定bootstrap.servers 、key.serializer、value.serializer这三个属性。
对于bootstrap.servers参数,若kafka集群中机器数不少,可只需指定部分broker便可,producer会经过该参数找到并发现集群中全部的broker。被发送到broker端的任何消息的格式必须是字节数组,所以消息的各个组件必须首先作序列化,而后才能发送到broker。必定注意的是,key.serializer和value.serializer两个参数必须是全限定名。
2>使用上一步中建立的Properties实例构造KafkaProducer对象。
/** 建立producer的时候同时指定key和value的序列化类,则不需在Properties中指定了。*/Serializer<String> keySerializer = new StringSerializer();Serializer<String> valueSerializer = new StringSerializer();Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);
3>构造待发送的消息对象ProducerRecord,指定消息要被发送到的topic、分区及对应的key和value。注意,分区和key信息能够不用指定,有kafka自行肯定分区。
4>调用KafkaProducer的send方法发送消息。
经过Java提供的Future同时实现了同步发送和异步发送+回调(Callback)两种发送方式。而上文代码清单中的调用方式实现了第三种发送方式——fire and forget即发送以后无论发送结果,在实际中不被推荐使用。真是使用场景中,同步和异步发送方式才是最多见的两种方式。
异步发送:实际上全部的写入操做默认都是异步的。send方法提供了回调类参数来实现异步发送以及发送结果的响应,具体代码以下:
/** 发送消息后的回调类Callback其实是一个Java接口,用户能够建立自定义的Callback实现类来处理消息发送后的逻辑,* 只要该类实现org.apache.kafka.clients.producer.Callback接口便可。*/producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//两个参数不会同时非空if(exception == null) {//消息发送成功}else {//执行错误处理逻辑
if(exception instanceof RetriableException) {
//处理可重试瞬时异常
}else {
//处理不可重试异常
}}}})
同步发送:调用Future.get()无限等待结果返回,即实现同步发送的效果,具体代码以下:
producer.send(record).get();//使用Future.get会一直等待直至Kafka broker将发送结果返回给producer程序.
【说明】不管同步发送和异步发送都有可能失败,当前kafka的错误类型包含两类:可重试异常和不可重试异常。全部可重试异常都继承自org.apache.kafka.common.errors.RetriableException抽象类。
5>关闭KafkaProducer。
producer程序结束时必定要关闭producer。提供有无参数的close方法和有超时参数close方法。在实际场景中,必定要慎用待超时参数的close方法。
二、producer的主要参数
acks:指定在给producer发送响应前,leader broker必需要确保已成功写入该消息的副本数。有3个取值:0、1和all。
acks | producer吞吐量 | 消息持久性 | 使用场景 |
0 | 最高 | 最差 | 一、彻底不关心消息是否发送成功; 二、容许消息丢失(好比统计服务器日志等) |
1 | 适中 | 适中 | 通常场景便可 |
all或-1 | 最差 | 最高 | 不能容忍消息丢失 |
buffer.memory:指定producer端用于缓存消息的缓冲区大小,单位是字节,咱们几乎能够认为该参数指定的内存大小就是producer程序使用的内存大小。
compression.type:指定是否压缩消息,默认是none。若要压缩直接指定压缩类型,目前kafka支持3中压缩算法:GZIP、Snappy和LZ4,根据实际使用经验producer结合LZ4的性能最好。
producer提供了默认的分区策略及对应的分区器供用户使用,但有时候用户可能想实现本身的分区策略,就须要用户自定义实现。若要使用自定义分区机制,用户须要作两件事:
一、在producer程序中建立一个类,实现org.apache.kafka.clients.producer.Partitioner接口。主要分区逻辑在Partitioner.partition中实现;
二、在用于构造KafkaProducer的Properties对象中设置partitioner.class参数。
kafka支持用户自定义消息序列化,须要完成的3件事:
一、定义数据对象格式;
二、建立自定义序列化类,实现org.apache.kafka.common.serialization.Serializer接口,在serializer方法中实现序列化逻辑;
三、在用于构造KafkaProducer的Properties对象中设置key.serializer或value.serializer.
实现定制化逻辑,实例实现了一个简单的双interceptor组成的拦截链。
KafkaProducer.send方法仅仅把消息放入缓冲区中,由一个专属I/O线程负责从缓冲区中提取消息并封装进消息batch中,而后发送出去,而这个过程当中存在着数据丢失的窗口:若I/O线程发送以前producer崩溃,则存在缓冲区中的消息所有丢失了。采用同步发送不会丢数据,可是性能会不好,实际场景中不推荐使用,所以最好能有一份配置,既使用异步方式还能有效避免数据丢失。
一、producer端配置
block.on.buffer.full = true
acks = all or –1
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
使用带回调机制的send发送消息,即KafkaProducer.sent(record, callback)
Callback逻辑中显式地当即关闭producer,使用close
二、broker端参数配置
unclean.leader.election.enable = false
replication.factor = 3
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit = false
存在两种基本的使用方法:多线程单KafkaProducer实例 + 多线程多KafkaProducer实例。
两种KafkaProducer使用方式比较
说明 | 优点 | 劣势 | |
单KafkaProducer实例 |
全部线程共享一个KafkaProducer实例 |
实现简单,性能好 |
一、全部线程共享一个内存缓冲区,可能须要较多内存;二、一旦producer某个线程崩溃致使KafkaProducer实例被“破坏”,则全部用户线程都没法工做。 |
多KafkaProducer实例 |
每一个线程维护本身专属的KafkaProducer实例 |
一、每一个用户线程拥有专属的KafkaProducer实例、缓冲区空间及一组对应的配置参数,能够进行细粒度的调优;二、单个KafkaProducer崩溃不会影响其余producer线程工做 |
须要较大的内存分配开销 |
【建议】若是是对分区数很少的Kafka集群而言,推荐使用第一种方法,即在多个producer用户线程中共享一个KafkaProducer实例;若对那些拥有超多分区的集群而言,采用第二种方法具备较高的可控性,方便producer的后续管理。
一、版本对比
新旧版本consumer对比
|
编程语言 |
位移管理 |
API包名 |
主要使用类 |
|
新版本 |
使用消费者组(consumer group) |
Java |
新版本把位移提交到kafka的一个内部topic(__consumer_offsets)上。注意这个topic名字的前面有两个下划线 |
org.apache.kafka.clients.consumer.* |
KafkaConsumer |
旧版本 | 使用low-level consumer,分high-level和low-level两种API. | Scala | 旧版本把位移提交到zookeeper。 | kafka.consumer.* | ZookeeperConsumerConnector SimpleConsumer |
二、consumer分类
consumer分为两类:消费者组(consumer group)和独立消费者(standalone consumer),其中前者是由多个消费者实例(consumer instance)构成一个总体进行消费,然后者则单独执行消费操做。咱们在讨论或开发consumer程序的时候,必须明确消费者上下文信息,即所使用的consumer的版本以及consumer的分类。
【消费者组】
kafka就是经过consumer group实现了对基于队列和基于发布/订阅两种消息引擎模型的支持的:
group.id惟一标示一个consumer group,一个consumer实例能够是一个线程或是运行在其余机器上的进程。
三、位移相关说明
这里的位移指的是consumer端的offset,与分区日志中的offset是不一样的含义。
不少消息引擎是把消费端的offset保存在服务器端(broker),这样作的好处是实现简单,但会存在下面的问题:
而kafka选择让consumer group保存offset,只须要保存一个长整型数据便可。当前kafka consumer在内部使用一个map来保存期订阅topic所属分区的offset。
新版本consumer把位移提交到kafka的一个内部topic(__consumer_offsets)上,用户应尽可能避免执行该topic的任何操做。
一、构造consumer实例大体步骤
一、构造一个java.util.Properties对象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值;
二、使用上一步建立的Properties实例构造KafkaConsumer对象;
三、调用KafkaConsumer.subscribe方法订阅consumer group感兴趣的topic列表;
注意subscribe方法不是增量式的,后续的subscribe调用会彻底覆盖以前的订阅语句。
四、循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息;
poll函数的参数是一个超时设定,一般若是consumer拿到了足够多的可用数据,那么它可当即从该方法返回;但若当前没有足够多的数据可供返回,consumer会处于阻塞状态,这个超时参数即控制阻塞的最大时间。这个超时设定给予了用户可以在consumer消费的同时按期去执行其余任务(但不知道具体实现)。不然设定一个比较大的值甚至是Integer.MAX_VALUE是不错的建议。
五、处理获取到的ConsumerRecord对象;
拿到这些kafka消息后consumer一般都包含处理逻辑,也即consumer的目的不只是要从kafka处读取消息,还要对获取到的消息进行有意义的业务级处理。从kafka consumer的角度来讲,poll方法返回即认为consumer成功消费了消息,但咱们用户的观点一般认为是执行完真正的业务级处理以后才算消费完毕。所以,对于“consumer处理太慢”的问题要从两个方面定位明确瓶颈:第一,若是是poll返回消息的速度过慢,那么能够调节相应的参数来提高poll方法的效率;第二,若消息的业务级处理逻辑过慢,则应该考虑简化处理逻辑或把处理逻辑放入单独的线程执行。
六、关闭KafkaConsumer。
consumer脚本命令:目前来讲,kafka全部命令行脚本表示相同含义的参数都不是统一的名字,好比consumer脚本中的名字是bootstrap-server,到了producer脚本中变成了broker-list,而在建立主题脚本中又变成了zookeeper。
一、订阅列表
在consumer group订阅topic列表使用下面语句便可:
consumer.subscribe(Arrays.asList("topic1","topic2","topic3"));
在独立consumer(standalone consumer),订阅列表则使用下面语句实现手动订阅:
TopicPartition tp1 = new TopicPartition("topic-name", 0);TopicPartition tp2 = new TopicPartition("topic-name", 1);consumer.assign(Arrays.asList(tp1, tp2));
二、基于正则表达式订阅topic
使用基于正则表达式的订阅必须指定ConsumerRebalanceListener,该类是一个回调接口,用户须要经过实现这个接口来是吸纳consumer分区分配方案发生变动时的逻辑。
若是用户使用的是自动提交(即设置enable.auto.commit=true),则一般不用理会这个类,用下面实现类便可。
consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());
可是当用户手动提交位移的,则至少要在ConsumerRebalanceListener实现类的onPartitionsRevoked方法中处理分区分配方案变动时的位移提交。
一、poll的内部原理
kafka的consumer是用来读取消息的,且要可以同时读取多个topic的多个分区的消息。若要实现并行的消息读取,一种方法是使用多线程的方式,为每一个要读取的分区都建立一个专有的线程去消费(这其实就是旧版本consumer采用的方式);另外一种方法是采用相似Linux I/O模型的poll或select等,使用一个线程来同时管理多个socket链接,即同时与多个broker通讯实现消息的并行读取(这就是新版consumer最重要的设计改变)。
新版本Java consumer是一个多线程或说是一个双线程的Java进程:建立KafkaConsumer的线程被称为用户主线程,同时consumer在后台会建立一个心跳线程。KafkaConsumer的poll方法在用户主线程中运行,而一旦consumer订阅了topic,全部的消费逻辑包括coordinator的协调、消费者组的rebalance以及数据的获取都会在主逻辑poll方法的一次调用中被执行。
二、poll使用方法
KafkaConsumer.poll方法引入参数的做用:
poll的使用方法总结以下:
须要按期执行的代码:
try {while (isRunning){//将isRunning标示为volatile型,而后在其余线程中设置isRunning=false来控制consumer的结束。// 四、循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息;ConsumerRecords<String, String> records = consumer.poll(1000);// 五、处理获取到的ConsumerRecord对象;for (ConsumerRecord<String, String> record : records) {LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());}}} finally {// 千万不要忘记!!关闭KafkaConsumer。它不只会清除consumer建立的各类socket资源,还会通知消费者组coordinator主动离组从而更快的开启新一轮rebalance。consumer.close();}
不须要按期执行的代码:
try {while (true){//设置为trueConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);//在consumer程序未获取到足够多数据时无限等待,而后经过捕获WakeupException异常来判断consumer是否结束。须要在另外一个线程中调用consumer.wakeup()方法来触发consumer的关闭。// 五、处理获取到的ConsumerRecord对象;for (ConsumerRecord<String, String> record : records) {LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());}}} catch(WakeupException e) {
// 此处忽略此异常的处理
}finally {
// 千万不要忘记!!关闭KafkaConsumer。它不只会清除consumer建立的各类socket资源,还会通知消费者组coordinator主动离组从而更快的开启新一轮rebalance。consumer.close();}
说明:KafkaConsumer不是线程安全的,但有一个例外就是wakeup方法,用户能够安全地在另外一个线程中调用consumer.wakeup(). 其余KafkaConsumer方法都不能同时在多线程中使用。
offset对于consumer很是重要,由于它是实现消息交付语义保证的基石,常见的3种消息交付语义保证以下:
consumer默认是自动提交的,优点是下降用户的开发成本,劣势是用户不能细粒度地处理位移的提交。
所谓的手动位移提交就是用户自行肯定消息什么时候被真正处理完并能够提交位移。一个典型的应用场景是:用户须要对poll方法返回的消息集合中的消息执行业务级处理,用户想要确保只有消息被真正处理完成后再提交位移,若是使用自动位移提交则没法保证这种时序性,这种状况就必须使用手动位移提交。
设置使用手动位移提交的步骤:
自动提交和手动提交的比较
使用方法 | 优点 | 劣势 | 交付语义保证 | 使用场景 | |
自动提交 | 默认不用配置或显示设置enable.auto.commit=true | 开发成本低,简单易用 | 没法实现精确控制,位移提交失败后不易处理 | 可能形成消息丢失,最多实现“最少一次”处理语义 | 对消息交付语义无需求,容忍必定的消息丢失 |
手动提交 | 设置enable.auto.commit=false;手动调用commitSync或commitAsync提交位移 | 可精确控制位移提交行为 | 额外的开发成本,须自行处理位移提交 | 易实现“最少一次”处理语义,依赖外部状态鹅考实现“精确一次”处理语义 | 消息处理逻辑重,不容许消息消失,至少要求“最少一次”处理语义 |
手动提交位移API进一步细分为同步手动提交和异步手动提交,即commitSync和commitAsync方法。当用户调用上面两个方法时,consumer会为全部它订阅的分区提交位移。它们还有带参数的重载方法。用户调用带参数的方法时须要指定一个Map显示地告诉kafka为哪些分区提交位移。consumer只对它所拥有的分区作提交时更合理的行为,所以跟推荐带参数的重载方法。下面是一段典型的手动提交部分分区位移的代码:
//下面代码按照分区级别进行位移提交。它首先对poll方法返回的消息集合按照分区进行分组,而后每一个分区下的消息待处理完成后构造一个Map对象统一提交位移,从而实现了细粒度控制位移提交。
try {
while (running) {ConsumerRecord<String, String> records = consumer.poll(1000);for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}
7、rebalance监听器(详见Demo代码)
新版本consumer默认是把位移提交到__consumer_offsets中,其实kafka也支持用户把位移提交到外部存储中,好比数据库中。若要实现这个功能,用户就必须使用rebalance监听器。
【注意】使用rebalance监听器的前提是用户使用consumer group,若使用的是consumer或是直接手动分配分区,那么rebalance监听器是无效的。
kafka consumer从broker端获取消息的格式是字节数组。自定义解序列化的步骤(同自定义序列化相似):
下面介绍两种多线程消费的方法及实例代码:
一、每一个线程维护一个KafkaConsumer
在这个方法中用户建立多个线程来消费topic数据,每一个线程都会建立专属于该线程的KafkaConsumer实例,如图.
由图可知,consumer group由多个线程的KafkaConsumer组成,每一个线程负责消费固定数目的分区。
二、单KafkaConsumer实例+多worker线程
本方法将消息的获取与消息的处理解耦,把后者放入单独的工做者线程中,即所谓的woker线程中,同时在全局维护一个或若干个consumer实例执行消息获取任务,以下图。
本例使用全局的kafkaConsumer实例执行消息获取,而后把获取到的消息集合交给线程池中的worker线程执行工做,以后worker线程完成处理后上报位移状态,由全局consumer提交位移。
待补充。。。。
待补充。。。。