kafka 基础知识梳理及集群环境部署记录

 

1、kafka基础介绍html

0. kakfa概述java

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica)开源消息系统,由Scala写成,是由Apache软件基金会开发的一个开源消息系统项目,该项目的目标是为处理实时数据提供一个统1、高通量、低等待的平台。kafka基于zookeeper协调的分布式消息系统,它的最大的特性就是能够实时的处理大量数据以知足各类需求场景:好比基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。node

kafka是一个分布式消息队列:生产者、消费者的功能。它提供了相似于JMS的特性,可是在设计实现上彻底不一样,此外它并非JMS规范的实现。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每一个实例(server)成为broker。不管是kafka集群,仍是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性linux

kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。nginx

消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一,Kafka能够实现高效文件存储,实际应用效果极好。git

1. kafka名词解释(架构的四个部分)github

  • Topic在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类别称为topic。若是把Kafka看作为一个数据库,topic能够理解为数据库中的一张表,topic的名字即为表名。消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。
  • Partition物理上的分区,topic中的数据分割为一个或多个partition。每一个topic至少有一个partition。每一个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。若是topic有多个partition,消费数据时就不能保证数据的顺序。在须要严格保证消息的消费顺序的场景下,须要将partition数目设为1。
  • Partition offset每条消息都有一个当前Partition下惟一的64字节的offset,它指明了这条消息的起始位置。
  • Replicas of partition副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的partition中消费数据,而是从为leader的partition中读取数据。
  • Broker以集群的方式运行,能够由一个或多个服务组成,每一个服务叫作一个broker;消费者能够订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。每一个消息(也叫做record记录,也被称为消息)是由一个key,一个value和时间戳构成。
    • Kafka 集群包含一个或多个服务器,服务器节点称为broker。
    • broker存储topic的数据。若是某topic有N个partition,集群有N个broker,那么每一个broker存储该topic的一个partition。
    • 若是某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
    • 若是某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽可能避免这种状况的发生,这种状况容易致使Kafka集群数据不均衡。
  • Producer生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也能够指定数据存储的partition。
  • Consumer消费者能够从broker中读取数据。消费者能够消费多个topic中的数据。
  • Leader每一个partition有多个副本,其中有且仅有一个做为Leader,Leader是当前负责数据的读写的partition。
  • FollowerFollower跟随Leader,全部写请求都经过Leader路由,数据变动会广播给全部Follower,Follower与Leader保持数据同步。若是Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,从新建立一个Follower。

Kafka的架构

  • BrokerKafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
  • ZookeeperZookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
  • Producer生产者将数据推送到broker上,当集群中出现新的broker时,全部的生产者将会搜寻到这个新的broker,并自动将数据发送到这个broker上。
  • Consumer由于Kafka的broker是无状态的,因此consumer必须使用partition offset来记录消费了多少数据。若是一个consumer指定了一个topic的offset,意味着该consumer已经消费了该offset以前的全部数据。consumer能够经过指定offset,从topic的指定位置开始消费数据。consumer的offset存储在Zookeeper中。

Kafka的Producer

  • Producer客户端负责消息的分发web

  • kafka集群中的任何一个broker均可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"、"partitions leader列表"等信息;算法

  • 当producer获取到metadata信息以后, producer将会和Topic下全部partition leader保持socket链接;数据库

  • 消息由producer直接经过socket发送到broker,中间不会通过任何"路由层"。事实上,消息被路由到哪一个partition上由producer客户端决定,好比能够采用"random""key-hash""轮询"等。

    若是一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。

  • 在producer端的配置文件中,开发者能够指定partition路由的方式。

  • Producer消息发送的应答机制

    设置发送数据是否须要服务端的反馈,有三个值0,1,-1

    0: producer不会等待broker发送ack

    1: 当leader接收到消息以后发送ack

    -1: 当全部的follower都同步消息成功后发送ack

    request.required.acks=0

Kafka的Consumer

kafka只支持Topic

  • 每一个group中能够有多个consumer,每一个consumer属于一个consumer group;一般状况下,一个group中会包含多个consumer,这样不只能够提升topic中消息的并发消费能力,并且还能提升"故障容错"性,若是group中的某个consumer失效那么其消费的partitions将会有其余consumer自动接管。

  • 对于Topic中的一条特定的消息,只会被订阅此Topic的每一个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中全部的consumer将会交错的消费整个Topic,每一个group中consumer消息消费互相独立,咱们能够认为一个group是一个"订阅"者。

  • 在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
    一个Topic中的每一个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer能够同时消费多个partitions中的消息。

  • kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,不然将意味着某些consumer将没法获得消息。

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来讲,当有多个partitions时,消息仍不是全局有序的。

注意:分布式系统中,对于同一个消费者建议加分布式锁,避免重复消费

2. Kafka的特性(优点)
- 高性能:kafka每秒能够处理几十万条消息,它的延迟最低只有几毫秒,每一个topic能够分多个partition, consumer group 对partition进行consume操做。Kafka在数据发布和订阅过程当中都能保证数据的高吞吐量。即使在TB级数据存储的状况下,仍然能保证稳定的性能。即高吞吐量,低延迟!
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,而且支持数据备份防止数据丢失。Kafka是一个具备分区机制、副本机制和容错机制的分布式消息系统
- 容错性:容许集群中节点失败(若副本数量为n,则容许n-1个节点失败)
- 高并发:支持数千个客户端同时读写

3. kafka有四个核心API
- 应用程序使用producer API发布消息到1个或多个topic中。
- 应用程序使用consumer API来订阅一个或多个topic,并处理产生的消息。
- 应用程序使用streams API充当一个流处理器,从1个或多个topic消费输入流,并产生一个输出流到1个或多个topic,有效地将输入流转换到输出流。
- connector API容许构建或运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。

4. kafka基本原理
一般来说,消息模型能够分为两种:队列和发布-订阅式。队列的处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给全部的消费者,接收到消息的消费者均可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组(consumer group)。消费者用一个消费者组名标记本身。

一个发布在Topic上消息被分发给此消费者组中的一个消费者。假如全部的消费者都在一个组中,那么这就变成了queue模型。假如全部的消费者都在不一样的组中,那么就彻底变成了发布-订阅模型。更通用的, 咱们能够建立一些消费者组做为逻辑上的订阅者。每一个组包含数目不等的消费者,一个组内多个消费者能够用来扩展性能和容错。

而且,kafka可以保证生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说,若是一个消息M1和M2使用相同的producer发送,M1先发送,那么M1将比M2的offset低,而且优先的出如今日志中。消费者收到的消息也是此顺序。若是一个Topic配置了复制因子(replication facto)为N,那么能够容许N-1服务器宕机而不丢失任何已经提交(committed)的消息。此特性说明kafka有比传统的消息系统更强的顺序保证。可是,相同的消费者组中不能有比分区更多的消费者,不然多出的消费者一直处于空等待,不会收到消息。

5. kafka应用场景

- 日志收集:一个公司能够用Kafka能够收集各类服务的log,经过kafka以统一接口服务的方式开放给各类consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka常常被用来记录web用户或者app用户的各类活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,而后订阅者经过订阅这些topic来作实时的监控分析,或者装载到hadoop、数据仓库中作离线分析和挖掘。
- 运营指标:Kafka也常常用来记录运营监控数据。包括收集各类分布式应用的数据,生产各类操做的集中反馈,好比报警和报告。
- 流式处理:好比spark streaming和storm
- 事件源
- 构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。
- 构建实时流的应用程序,对数据流进行转换或反应。

6. 主题和日志 (Topic和Log)

每个分区(partition)都是一个顺序的、不可变的消息队列,而且能够持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每一个分区中此偏移量都是惟一的。Kafka集群保持全部的消息,直到它们过时,不管消息是否被消费了。实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常状况当消费者消费消息的时候,偏移量也线性的的增长。可是实际偏移量由消费者控制,消费者能够将偏移量重置为更老的一个偏移量,从新读取消息。 能够看到这种设计对消费者来讲操做自如, 一个消费者的操做不会影响其它消费者对此log的处理。 再说说分区。Kafka中采用分区的设计有几个目的。一是能够处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它能够不受限的处理更多的数据。第二,分区能够做为并行处理的单元,稍后会谈到这一点。

7. 分布式(Distribution)

 Log的分区被分布到集群中的多个服务器上。每一个服务器处理它分到的分区。根据配置每一个分区还能够复制到其它服务器做为备份容错。 每一个分区有一个leader,零或多个follower。Leader处理此分区的全部的读写请求,而follower被动的复制数据。若是leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另外一个分区的follower。 这样能够平衡负载,避免全部的请求都只让一台或者某几台服务器处理。

8. Kakfa Broker Leader的选举
Kakfa Broker集群受Zookeeper管理。全部的Kafka Broker节点一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其余的Kafka Broker的全部信息,若是这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时全部的kafka broker又会一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上全部的partition在zookeeper上的状态,并选取ISR列表中的一个replica做为partition leader(若是ISR列表中的replica全挂,选一个幸存的replica做为leader; 若是该partition的全部的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica"活"过来,而且选它做为Leader;或选择第一个"活"过来的Replica(不必定是ISR中的)做为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其余的kafka broker。

顺便说下曾经发生过的一个bug:TalkingData使用Kafka0.8.1的时候,kafka controller在Zookeeper上注册成功后,它和Zookeeper通讯的timeout时间是6s,也就是若是kafka controller若是有6s中没有和Zookeeper作心跳,那么Zookeeper就认为这个kafka controller已经死了,就会在Zookeeper上把这个临时节点删掉,那么其余Kafka就会认为controller已经没了,就会再次抢着注册临时节点,注册成功的那个kafka broker成为controller,而后,以前的那个kafka controller就须要各类shut down去关闭各类节点和事件的监听。可是当kafka的读写流量都很是巨大的时候,TalkingData的一个bug是,因为网络等缘由,kafka controller和Zookeeper有6s中没有通讯,因而从新选举出了一个新的kafka controller,可是原来的controller在shut down的时候老是不成功,这个时候producer进来的message因为Kafka集群中存在两个kafka controller而没法落地。致使数据淤积。
这里曾经还有一个bug,TalkingData使用Kafka0.8.1的时候,当ack=0的时候,表示producer发送出去message,只要对应的kafka broker topic partition leader接收到的这条message,producer就返回成功,无论partition leader 是否真的成功把message真正存到kafka。当ack=1的时候,表示producer发送出去message,同步的把message存到对应topic的partition的leader上,而后producer就返回成功,partition leader异步的把message同步到其余partition replica上。当ack=all或-1,表示producer发送出去message,同步的把message存到对应topic的partition的leader和对应的replica上以后,才返回成功。可是若是某个kafka controller 切换的时候,会致使partition leader的切换(老的 kafka controller上面的partition leader会选举到其余的kafka broker上),可是这样就会致使丢数据。

9. Topic & Partition
Topic至关于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪一个topic,可是不须要指定topic下的哪一个partition,由于kafka会把收到的message进行load balance,均匀的分布在这个topic下的不一样的partition上( hash(message) % [broker数量] )。物理上存储上,这个topic会分红一个或多个partition,每一个partiton至关因而一个子queue。在物理结构上,每一个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic能够有无数多的partition,根据业务需求和数据量来设置。在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在建立Topic时经过参数指定parittion数量。Topic建立以后经过Kafka提供的工具也能够修改partiton数量。
通常来讲,a)一个Topic的Partition数量大于等于Broker的数量,能够提升吞吐率;b)同一个Partition的Replica尽可能分散到不一样的机器,高可用。
当add a new partition的时候,partition里面的message不会从新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会从新参与全部partition的load balance

10. Partition Replica
每一个partition能够在其余的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例若有5个kafka broker节点,某个topic有3个partition,每一个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,不然报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其余的就是copy副本)。这样若是某个broker宕机,其实整个kafka内数据依然是完整的。可是,replica副本数越高,系统虽然越稳定,可是回来带资源和性能上的降低;replica副本少的话,也会形成系统丢数据的风险。
a)怎样传送消息:producer先把message发送到partition leader,再由leader发送给其余partition follower。(若是让producer发送给每一个replica那就太慢了)
b) 在向Producer发送ACK前须要保证有多少个Replica已经收到该消息:根据ack配的个数而定
c) 怎样处理某个Replica不工做的状况:若是这个部工做的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。若是这个不工做的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工做的partition replca写message成功,可是会等到time out,而后返回失败由于某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工做的partition replica从ack列表中移除,之后的producer发送message的时候就不会有这个ack列表下的这个部工做的partition replica了。
d)怎样处理Failed Replica恢复回来的状况:若是这个partition replica以前不在ack列表中,那么启动后从新受Zookeeper管理便可,以后producer发送message的时候,partition leader会继续发送message到这个partition follower上。若是这个partition replica以前在ack列表中,此时重启后,须要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工做的partition replica的时候自动从ack列表中移除的)

11. Partition leader与follower
partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其余的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其余的broker的partition follower上选择follower变为parition leader。

12. 消息投递可靠性
一个消息如何算投递成功,Kafka提供了三种模式:
- 第一种是啥都无论,发送出去就看成成功,这种状况固然不能保证消息成功投递到broker;
- 第二种是Master-Slave模型,只有当Master和全部Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,可是损伤了性能;
- 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数状况下都会中和可靠性和性能选择第三种模型
消息在broker上的可靠性,由于消息会持久化到磁盘上,因此若是正常stop一个broker,其上的数据不会丢失;可是若是不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这能够经过配置flush页面缓存的周期、阈值缓解,可是一样会频繁的写磁盘会影响性能,又是一个选择题,根据实际状况配置。
消息消费的可靠性,Kafka提供的是“At least once”模型,由于消息的读取进度由offset提供,offset能够由消费者本身维护也能够维护在zookeeper里,可是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的状况,这种状况一样能够经过调整commit offset周期、阈值缓解,甚至消费者本身把消费和commit offset作成一个事务解决,可是若是你的应用不在意重复消费,那就干脆不要解决,以换取最大的性能

13. kafka相关特性说明

- message状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪一个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着若是consumer处理很差的话,broker上的一个消息可能会被消费屡次。
- message持久化:Kafka中会把消息持久化到本地文件系统中,而且保持o(1)极高的效率。咱们众所周知IO读取是很是耗资源的性能也是最慢的,这就是为了数据库的瓶颈常常在IO上,须要换SSD硬盘的缘由。可是Kafka做为吞吐量极高的MQ,却能够很是高效的message持久化到文件。这是由于Kafka是顺序写入o(1)的时间复杂度,速度很是快。也是高吞吐量的缘由。因为message的写入持久化是顺序写入的,所以message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。通常的机器,单机每秒100k条数据。
- message有效期:Kafka会长久保留其中的消息,以便consumer能够屡次消费,固然其中不少细节是可配置的。
- Produer : Producer向Topic发送message,不须要指定partition,直接发送就行了。kafka经过partition ack来控制是否发送成功并把信息返回给producer,producer能够有任意多的thread,这些kafka服务器端是不care的。Producer端的delivery guarantee默认是At least once的。也能够设置Producer异步发送实现At most once。Producer能够用主键幂等性实现Exactly once
- Kafka高吞吐量: Kafka的高吞吐量体如今读写上,分布式并发的读和写都很是快,写的性能体如今以o(1)的时间复杂度进行顺序写入。读的性能体如今以o(1)的时间复杂度进行顺序读取, 对topic进行partition分区,consume group中的consume线程能够以很高能性能进行顺序读。
- Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,可是可能会重复传输;(3)Exactly once每条信息确定会被传输一次且仅传输一次,这是用户想要的。
- 批量发送:Kafka支持以消息集合为单位进行批量发送,以提升push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,二者对消息的生产和消费是异步的。
- Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位同样,咱们能够随意的增长或删除任何一个broker节点。
- 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
- 同步异步:Producer采用异步push方式,极大提升Kafka系统的吞吐率(能够经过参数控制是采用同步仍是异步方式)。
- 分区机制partition:Kafka的broker端支持消息分区partition,Producer能够决定把消息发到哪一个partition,在一个partition 中message的顺序就是Producer发送消息的顺序,一个topic中能够有多个partition,具体partition的数量是可配置的。partition的概念使得kafka做为MQ能够横向扩展,吞吐量巨大。partition能够设置replica副本,replica副本存在不一样的kafka broker节点上,第一个partition是leader,其余的是follower,message先写到partition leader上,再由partition leader push到parition follower上。因此说kafka能够水平扩展,也就是扩展partition。
- 离线数据装载:Kafka因为对可拓展的数据持久化的支持,它也很是适合向Hadoop或者数据仓库中进行数据装载。
- 实时数据与离线数据:kafka既支持离线数据也支持实时数据,由于kafka的message持久化到文件,并能够设置有效期,所以能够把kafka做为一个高效的存储来使用,能够做为离线数据供后面的分析。固然做为分布式实时消息系统,大多数状况下仍是用于实时的数据处理的,可是当cosumer消费能力降低的时候能够经过message的持久化在淤积数据在kafka。
- 插件支持:如今很多活跃的社区已经开发出很多插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
- 解耦: 至关于一个MQ,使得Producer和Consumer之间异步的操做,系统之间解耦。
- 冗余: replica有多个副本,保证一个broker node宕机后不会影响整个服务。
- 扩展性: broker节点能够水平扩展,partition也能够水平增长,partition replica也能够水平增长。
- 峰值: 在访问量剧增的状况下,kafka水平扩展, 应用仍然须要继续发挥做用。
- 可恢复性: 系统的一部分组件失效时,因为有partition的replica副本,不会影响到整个系统。
- 顺序保证性:因为kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。
- 缓冲:因为producer那面可能业务很简单,然后端consumer业务会很复杂并有数据库的操做,所以确定是producer会比consumer处理速度快,若是没有kafka,producer直接调用consumer,那么就会形成整个系统的处理速度慢,加一层kafka做为MQ,能够起到缓冲的做用。
- 异步通讯:做为MQ,Producer与Consumer异步通讯。

                                             kafka部分名称解释                                                      

 

Kafka中发布订阅的对象是topic。咱们能够为每类数据建立一个topic,把向topic发布消息的客户端称做producer,从topic订阅消息的客户端称做consumer。Producers和consumers能够同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
- Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker能够组成一个Kafka集群。
- Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等均可以以topic的形式存在,Kafka集群可以同时负责多个topic的分发。
- Partition:topic物理上的分组,一个topic能够分为多个partition,每一个partition是一个有序的队列。
- Segment:partition物理上由多个segment组成,每一个Segment存着message信息。
- Producer : 生产message发送到topic。
- Consumer : 订阅topic消费message, consumer做为一个线程来消费。
- Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)能够组成一个组(Consumer group ),partition中的每一个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,若是一个message能够被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不一样的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即使是来自不一样的consumer group的也不行。它不能像AMQ那样能够多个BET做为consumer去处理message,这是由于多个BET去消费一个Queue中的数据的时候,因为要保证不能多个线程拿同一条message,因此就须要行级别悲观所(for update),这就致使了consume的性能降低,吞吐量不够。而kafka为了保证吞吐量,只容许一个consumer线程去访问一个partition。若是以为效率不高的时候,能够加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就造成了分布式消费的概念。

14. kafka一些原理概念
持久化
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的自己特性.且不管任何OS下,对文件系统自己的优化是很是艰难的.文件缓存/直接内存映射等是经常使用的手段.由于kafka是对日志文件进行append操做,所以磁盘检索的开支是较小的;同时为了减小磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到必定阀值时,再flush到磁盘,这样减小了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提高.

性能
除磁盘IO以外,咱们还须要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并无提供太多高超的技巧;对于producer端,能够将消息buffer起来,当消息的条数达到必定阀值时,批量发送给broker;对于consumer端也是同样,批量fetch多条消息.不过消息量的大小能够经过配置文件来指定.对于kafka broker端,彷佛有个sendfile系统调用能够潜在的提高网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域便可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).
其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,所以启用消息压缩机制是一个良好的策略;压缩须要消耗少许的CPU资源,不过对于kafka而言,网络IO更应该须要考虑.能够将任何在网络上传输的消息都通过压缩.kafka支持gzip/snappy等多种压缩方式.

负载均衡
kafka集群中的任何一个broker,均可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息以后, producer将会和Topic下全部partition leader保持socket链接;消息由producer直接经过socket发送到broker,中间不会通过任何"路由层".
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢总体的网络延迟,批量延迟发送事实上提高了网络效率;不过这也有必定的隐患,好比当producer失效时,那些还没有发送的消息将会丢失。

Topic模型
其余JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker须要太多额外的工做.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是至关轻量级的.当消息被consumer接收以后,consumer能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.因而可知,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不只提升了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不一样,kafka中的消息是批量(一般以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

消息传输一致
Kafka提供3种消息传输一致性语义:最多1次,最少1次,刚好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的状况;
最多1次:可能会出现数据丢失状况;
刚好1次:并非指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的状况。

at most once: 消费者fetch消息,而后保存offset,而后处理消息;当client保存offset以后,可是在消息处理过程当中consumer进程失效(crash),致使部分消息未能继续处理.那么此后可能其余consumer会接管,可是由于offset已经提早保存,那么新的consumer将不能fetch到offset以前的消息(尽管它们尚没有被处理),这就是"at most once".
at least once: 消费者fetch消息,而后处理消息,而后保存offset.若是消息处理成功以后,可是在保存offset阶段zookeeper异常或者consumer失效,致使保存offset操做未能执行成功,这就致使接下来再次fetch时可能得到上次已经处理过的消息,这就是"at least once".
"Kafka Cluster"到消费者的场景中能够采起如下方案来获得“刚好1次”的一致性语义:
最少1次+消费者的输出中额外增长已处理消息最大编号:因为已处理消息最大编号的存在,不会出现重复处理消息的状况。

副本
kafka中,replication策略是基于partition,而不是topic;kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);备份的个数能够经过broker配置文件来设定。leader处理全部的read-write请求,follower须要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具备良好的网络环境.即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.
选择follower时须要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,须要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

log
每一个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每一个日志都有一个offset来惟一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每一个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
获取消息时,须要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,能够找到此消息所在segment文件,而后根据segment的最小offset取差值,获得它在file中的相对位置,直接读取输出便可.

分布式
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变动并做出相应的动做(好比consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册本身的节点信息(临时znode),同时当broker和zookeeper断开链接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册本身持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每一个consumer客户端被建立时,会向zookeeper注册本身的信息;此做用主要是为了"负载均衡".一个group中的多个consumer能够交错的消费一个topic的全部partitions;简而言之,保证此topic的全部partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每一个consumer上.
Consumer id Registry: 每一个consumer都有一个惟一的ID(host:uuid,能够经过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.此znode为持久节点,能够看出offset跟group_id有关,以代表当group中一个消费者失效,其余consumer能够继续消费.
Partition Owner registry: 用来标记partition正在被哪一个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的操做:
A) 首先进行"Consumer id Registry";
B) 而后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave"和"join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance.

总结:
1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息。

Leader的选择
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
若是leaders永远不会down的话咱们就不须要followers了!一旦leader down掉了,须要在followers中选择一个新的leader.可是followers自己有可能延时过久或者crash,因此必须选择高质量的follower做为leader.必须保证,一旦一个消息被提交了,可是leader down掉了,新选出的leader必须能够提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据全部副本节点的情况动态的选择最适合的做为leader.Kafka并非使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每一个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。所以这个集合中的任何一个节点随时均可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就能够容许在f个节点down掉的状况下不会丢失消息并正常提供服。ISR的成员是动态的,若是一个节点被淘汰了,当它从新达到“同步中”的状态时,他能够从新加入ISR.这种leader的选择方式是很是快速的,适合kafka的应用场景。
一个邪恶的想法:若是全部节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦全部节点都down了,这个就不能保证了。
实际应用中,当全部的副本都down掉时,必须及时做出反应。能够有如下两种选择:
1. 等待ISR中的任何一个节点恢复并担任leader。
2. 选择全部节点中(不仅是ISR)第一个恢复的节点做为leader.
这是一个在可用性和连续性之间的权衡。若是等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。若是等待ISR意外的节点恢复,这个节点的数据就会被做为线上数据,有可能和真实的数据有所出入,由于有些数据它可能还没同步到。Kafka目前选择了第二种策略,在将来的版本中将使这个策略的选择可配置,能够根据场景灵活的选择。
这种窘境不仅Kafka会遇到,几乎全部的分布式数据系统都会遇到。

副本管理
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽可能的使全部分区均匀的分布到集群全部的节点上而不是集中在某些节点上,另外主从关系也尽可能均衡这样每一个几点都会担任必定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点做为“controller”,当发现有节点down掉的时候它负责在游泳分区的全部节点中选择新的leader,这使得Kafka能够批量的高效的管理全部分区节点的主从关系。若是controller down掉了,活着的节点中的一个会备切换为新的controller.

Leader与副本同步
对于某个分区来讲,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会彻底复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采起的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。通常状况下,一个分区有一个“正分区”和零到多个“备份分区”。能够配置“正分区+备份分区”的总数量,关于这个配置,不一样主题能够有不一样的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通讯。

Kafka容许topic的分区拥有若干副本,这个数量是能够配置的,你能够为每一个topic配置副本的数量。Kafka会自动在每一个副本上备份数据,因此当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你能够配置只有一个副本,这样其实就至关于只有一份数据。
建立副本的单位是topic的分区,每一个分区都有一个leader和零或多个followers.全部的读写操做都由leader处理,通常分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。全部的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在本身的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
1. 节点必须能够维护和ZooKeeper的链接,Zookeeper经过心跳机制检查每一个节点的链接。
2. 若是节点是个follower,他必须能及时的同步leader的写操做,延时不能过久。
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪全部“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时过久,leader就会把它移除。至于延时多久算是“过久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被全部的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担忧一旦leader down掉了消息会丢失。Producer也能够选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。

===========================================================================
2、kafka集群环境部署记录

1)服务器信息

ip地址            主机名          安装软件
192.168.10.202    kafka01         zookeeper、kafka
192.168.10.203    kafka02         zookeeper、kafka
192.168.10.205    kafka03         zookeeper、kafka
192.168.10.206    kafka-manager   kafka-manager
 
4台机器关闭iptables和selinux
[root@kafka01 ~]# /etc/init.d/iptables stop
[root@kafka01 ~]# vim /etc/sysconfig/selinux
......
SELINUX=disabled
[root@kafka01 ~]# setenforce 0
[root@kafka01 ~]# getenforce
Permissive
 
4台机器作hosts绑定
[root@kafka01 ~]# vim /etc/hosts
......
192.168.10.202    kafka01    
192.168.10.203    kafka02
192.168.10.205    kafka03
192.168.10.206    kafka-manager

2)jdk安装(四台机器都要操做,安装1.7以上版本)

将jdk-8u131-linux-x64.rpm下载到/opt目录下
下载地址:https://pan.baidu.com/s/1pLaAjPp
提取密码:x27s
 
[root@kafka01 ~]# cd /usr/local/src/
[root@kafka01 src]# ll jdk-8u131-linux-x64.rpm
-rw-r--r--. 1 root root 169983496 Sep 28  2017 jdk-8u131-linux-x64.rpm
[root@kafka01 src]# rpm -ivh jdk-8u131-linux-x64.rpm
 
[root@kafka01 src]# vim /etc/profile
......
JAVA_HOME=/usr/java/jdk1.8.0_131
JAVA_BIN=/usr/java/jdk1.8.0_131/bin
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin:/bin:/sbin/
CLASSPATH=.:/lib/dt.jar:/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
 
[root@kafka01 src]# source /etc/profile
[root@kafka01 src]# java -version
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

3)安装及配置kafka(192.168.10.20二、192.168.10.20三、192.168.10.205三台机器以下一样操做)

1)安装三个节点的zookeeper(zookeeper集群部署能够参考:http://www.cnblogs.com/kevingrace/p/7879390.html)
[root@kafka01 ~]# cd /usr/local/src/
[root@kafka01 src]# wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
[root@kafka01 src]# tar -zvxf zookeeper-3.4.10.tar.gz
[root@kafka01 src]# mkdir /data
[root@kafka01 src]# mv zookeeper-3.4.10 /data/zk

修改三个节点的zookeeper的配置文件,内容以下所示:
[root@kafka01 src]# mkdir -p /data/zk/data
[root@kafka01 src]# cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo_sample.cfg.bak
[root@kafka01 src]# cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo.cfg
[root@kafka01 src]# vim /data/zk/conf/zoo.cfg           #清空以前的内容,配置成下面内容
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data/zookeeper
dataLogDir=/data/zk/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=192.168.10.202:2888:3888
server.2=192.168.10.203:2888:3888
server.3=192.168.10.205:2888:3888

===============
配置参数说明:
server.id=host:port:port:表示了不一样的zookeeper服务器的自身标识,做为集群的一部分,每一台服务器应该知道其余服务器的信息。
用户能够从"server.id=host:port:port" 中读取到相关信息。
在服务器的data(dataDir参数所指定的目录)下建立一个文件名为myid的文件,这个文件的内容只有一行,指定的是自身的id值。
好比,服务器"1"应该在myid文件中写入"1"。这个id必须在集群环境中服务器标识中是惟一的,且大小在1~255之间。
这同样配置中,zoo1表明第一台服务器的IP地址。第一个端口号(port)是从follower链接到leader机器的端口,第二个端口是用来进行leader选举时所用的端口。
因此,在集群配置过程当中有三个很是重要的端口:clientPort=218一、port:288八、port:3888。
===============

注意:若是想更换日志输出位置,除了在zoo.cfg加入"dataLogDir=/data/zk/data/logs"外,还须要修改zkServer.sh文件,大概修改方式地方在
125行左右,内容以下:
[root@kafka01 src]# cp /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak
[root@kafka01 src]# vim /data/zk/bin/zkServer.sh
.......
125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"     #添加这一行
126 if [ ! -w "$ZOO_LOG_DIR" ] ; then
127 mkdir -p "$ZOO_LOG_DIR"
128 fi

[root@kafka01 src]# diff /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak
125d124
< ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"

在启动zookeeper服务以前,还须要分别在三个zookeeper节点机器上建立myid,方式以下:
[root@kafka01 src]# mkdir /data/zk/data/zookeeper/
[root@kafka01 src]# echo 1 >  /data/zk/data/zookeeper/myid

=================================================================
另外两个节点的myid分别为二、3(注意这三个节点机器的myid决不能同样,配置文件等其余都是同样配置)
[root@kafka02 src]# mkdir /data/zk/data/zookeeper
[root@kafka02 src]# echo 2 >  /data/zk/data/zookeeper/myid

[root@kafka03 src]# mkdir /data/zk/data/zookeeper
[root@kafka03 src]# echo 3 >  /data/zk/data/zookeeper/myid
=================================================================

启动三个节点的zookeeper服务
[root@kafka01 src]# /data/zk/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@kafka01 src]# ps -ef|grep zookeeper
root     25512     1  0 11:49 pts/0    00:00:00 /usr/java/jdk1.8.0_131/bin/java -Dzookeeper.log.dir=/data/zk/data/logs -Dzookeeper.root.logger=INFO,CONSOLE -cp /data/zk/bin/../build/classes:/data/zk/bin/../build/lib/*.jar:/data/zk/bin/../lib/slf4j-log4j12-1.6.1.jar:/data/zk/bin/../lib/slf4j-api-1.6.1.jar:/data/zk/bin/../lib/netty-3.10.5.Final.jar:/data/zk/bin/../lib/log4j-1.2.16.jar:/data/zk/bin/../lib/jline-0.9.94.jar:/data/zk/bin/../zookeeper-3.4.10.jar:/data/zk/bin/../src/java/lib/*.jar:/data/zk/bin/../conf:.:/lib/dt.jar:/lib/tools.jar -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /data/zk/bin/../conf/zoo.cfg
root     25555 24445  0 11:51 pts/0    00:00:00 grep zookeeper
[root@kafka01 src]# lsof -i:2181
COMMAND   PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    25512 root   23u  IPv6 8293793      0t0  TCP *:eforward (LISTEN)

查看三个节点的zookeeper角色
[root@kafka01 src]# /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Mode: follower

[root@kafka02 src]# /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Mode: leader

[root@kafka03 src]# /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Mode: follower

—————————————————————————————————————————————————————————————————————————————————————————————
2)安装kafka(三个节点一样操做)
下载地址:http://kafka.apache.org/downloads.html
[root@kafka01 ~]# cd /usr/local/src/
[root@kafka01 src]# wget http://mirrors.shu.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
[root@kafka01 src]# tar -zvxf kafka_2.11-1.1.0.tgz
[root@kafka01 src]# mv kafka_2.11-1.1.0 /data/kafka

进入kafka下面的config目录,修改配置文件server.properties:
[root@kafka01 src]# cp /data/kafka/config/server.properties /data/kafka/config/server.properties.bak
[root@kafka01 src]# vim /data/kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.10.202:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

其余两个节点的server.properties只须要修改下面两行,其余配置都同样
[root@kafka02 src]# vim /data/kafka/config/server.properties 
[root@kafka02 src]# cat /data/kafka/config/server.properties
broker.id=1
......
listeners=PLAINTEXT://192.168.10.203:9092
.......

[root@kafka03 src]# vim /data/kafka/config/server.properties 
[root@kafka03 src]# cat /data/kafka/config/server.properties
broker.id=2
......
listeners=PLAINTEXT://192.168.10.205:9092
......

启动三个节点的kafka服务
[root@kafka01 src]# nohup /data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties >/dev/null 2>&1 &
[root@kafka01 src]# lsof -i:9092
COMMAND   PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    26114 root   97u  IPv6 8298666      0t0  TCP kafka01:XmlIpcRegSvc (LISTEN)
java    26114 root  113u  IPv6 8298672      0t0  TCP kafka01:53112->kafka01:XmlIpcRegSvc (ESTABLISHED)
java    26114 root  114u  IPv6 8298673      0t0  TCP kafka01:XmlIpcRegSvc->kafka01:53112 (ESTABLISHED)

验证服务
随便在其中一台节点主机执行
[root@kafka01 src]# /data/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181 --replication-factor 1 --partitions 1 --topic test
出现下面信息说明建立成功
Created topic "test".

而后再在其余主机查看上面建立的topic
[root@kafka02 src]# /data/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181
test

到此,kafka集群环境已部署完成!

4)安装kafka集群管理工具kafka-manager
为了简化开发者和服务工程师维护Kafka集群的工做,yahoo构建了一个叫作Kafka管理器的基于Web工具,叫作 Kafka Manager。kafka-manager 项目地址:https://github.com/yahoo/kafka-manager。这个管理工具能够很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的状况。它支持管理多个集群、选择副本、副本从新分配以及建立Topic。同时,这个管理工具也是一个很是好的能够快速浏览这个集群的工具,kafka-manager有以下功能:
- 管理多个kafka集群
- 便捷的检查kafka集群状态(topics,brokers,备份分布状况,分区分布状况)
- 选择你要运行的副本
- 基于当前分区情况进行
- 能够选择topic配置并建立topic(0.8.1.1和0.8.2的配置不一样)
- 删除topic(只支持0.8.2以上的版本而且要在broker配置中设置delete.topic.enable=true)
- Topic list会指明哪些topic被删除(在0.8.2以上版本适用)
- 为已存在的topic增长分区
- 为已存在的topic更新配置
- 在多个topic上批量重分区
- 在多个topic上批量重分区(可选partition broker位置)

kafka-manager安装过程以下

下载安装 kafka-manager
想要查看和管理Kafka,彻底使用命令并不方便,咱们可使用雅虎开源的Kafka-manager,GitHub地址以下:
https://github.com/yahoo/kafka-manager

也可使用Git或者直接从Releases中下载,此处从下面的地址下载 1.3.3.7 版本:
https://github.com/yahoo/kafka-manager/releases

须要注意:
上面下载的是源码,下载后须要按照后面步骤进行编译。若是以为麻烦,能够直接下载编译好的kafka-manager-1.3.3.7.zip。
下载地址:https://pan.baidu.com/s/12j2DEt94WsWRY6dD9aR6BQ
提取密码:8x57

[root@kafka-manager src]# ls kafka-manager-1.3.3.7.zip 
kafka-manager-1.3.3.7.zip
[root@kafka-manager src]# unzip kafka-manager-1.3.3.7.zip
[root@kafka-manager src]# mv kafka-manager-1.3.3.7 /data/
[root@kafka-manager src]# cd /data/kafka-manager-1.3.3.7
[root@kafka-manager kafka-manager-1.3.3.7]# cd conf/
[root@kafka-manager conf]# cp application.conf application.conf.bak
[root@kafka-manager conf]# vim application.conf
......
#kafka-manager.zkhosts="localhost:2181"                  #注释这一行,下面添加一行
kafka-manager.zkhosts="192.168.10.202:2181,192.168.10.203:2181,192.168.10.205:2181"

启动kafka-manager
[root@kafka-manager conf]# nohup /data/kafka-manager-1.3.3.7/bin/kafka-manager >/dev/null 2>&1 &

----------------------------------------------------------------------------------------------------
须要注意:
kafka-manager 默认的端口是9000,可经过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
[root@kafka-manager conf]# nohup bin/kafka-manager -Dconfig.file=/data/kafka-manager-1.3.3.7/conf/application.conf -Dhttp.port=8080 &
----------------------------------------------------------------------------------------------------

启动完毕后能够查看端口是否启动,因为启动过程须要一段时间,端口起来的时间可能会延后。
[root@kafka-manager conf]# lsof -i:9000
COMMAND   PID USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    27218 root  114u  IPv6 3766984      0t0  TCP *:cslistener (LISTEN)

最后就可使用http://192.168.10.206:9000访问了

 

kafka-mamager测试
新建 Cluster1
点击【Cluster】>【Add Cluster】打开以下添加集群的配置界面:
输入集群的名字(如Kafka-Cluster-test)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.1)
-------------------------------------------------------------------
注意:若是没有在 Kafka 中配置过 JMX_PORT,千万不要选择第一个复选框。
Enable JMX Polling
若是选择了该复选框,Kafka-manager 可能会没法启动。
-------------------------------------------------------------------

其余broker的配置能够根据本身须要进行配置,默认状况下,点击【保存】时,会提示几个默认值为1的配置错误,须要配置为>=2的值。提示以下。

新建完成后,运行界面以下:

查看TOPIC 信息

查看broker信息

管理 kafka-mamager
新建主题
点击【Topic】>【Create】能够方便的建立并配置主题。以下显示。

因为集群只有三个节点,故replication factor最多只能设置为3

================================================
针对上面Topic->Create新建主题的配置,下面根据一张图讲解

在上图一个Kafka集群中,有两个服务器,每一个服务器上都有2个分区。P0,P3可能属于同一个主题,也多是两个不一样的主题。若是设置的Partitons和Replication Factor都是2,这种状况下该主题的分步就和上图中Kafka集群显示的相同,此时P0,P3是同一个主题的两个分区。P1,P2也是同一个主题的两个分区,Server1和Server2其中一个会做为Leader进行读写操做,另外一个经过复制进行同步。若是设置的Partitons和Replication Factor都是1,这时只会根据算法在某个Server上建立一个分区,能够是P0~4中的某一个(分区都是新建的,不是先存在4个而后从中取1个)。

相关文章
相关标签/搜索