kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转)

问题导读:
1.zookeeper在kafka的做用是什么?
2.kafka中几乎不容许对消息进行“随机读写”的缘由是什么?
3.kafka集群consumer和producer状态信息是如何保存的?
4.partitions设计的目的的根本缘由是什么?




 

1、入门
    一、简介
    Kafka is a distributed,partitioned,replicated commit logservice。它提供了相似于JMS的特性,可是在 设计实现上彻底不一样,此外它并非JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每一个实例(server)成为broker。不管是kafka集群,仍是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
<ignore_js_op>  
 
   二、Topics/logs
    一个Topic能够认为是一类消息,每一个topic将被分红多个partition(区),每一个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是惟一标记一条消息。它惟一的标记一条消息。kafka并无提供其余额外的索引机制来存储offset,由于在kafka中几乎不容许对消息进行“随机读写”。

 

<ignore_js_op>  

 

    kafka和JMS(Java Message Service)实现(activeMQ)不一样的是:即便消息被消费,消息仍然不会被当即删除.日志文件将会根据broker中的配置要求,保留必定的时间以后删除;好比log文件保留2天,那么两天后,文件会被清除,不管其中的消息是否被消费.kafka经过这种简单的手段,来释放磁盘空间,以及减小消息消费以后对文件内容改动的磁盘IO开支.
 
    对于consumer而言,它须要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可使用任意顺序消费消息,它只须要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)
 
    kafka集群几乎不须要维护任何consumer和producer状态信息,这些信息有zookeeper保存;所以producer和consumer的 客户端实现很是轻量级,它们能够随意离开,而不会对集群形成额外的影响.
 
    partitions的 设计目的有多个.最根本缘由是kafka基于文件存储.经过分区,能够将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每一个partiton都会被当前server(kafka实例)保存;能够将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着能够容纳更多的consumer,有效提高并发消费的能力.(具体原理参见下文).
 
    三、Distribution
    一个Topic的多个partitions,被分布在kafka集群中的多个server上;每一个server(kafka实例)负责partitions中消息的读写操做;此外kafka还能够配置partitions须要备份的个数(replicas),每一个partition将会被备份到多台机器上,以提升可用性.
 
    基于replicated方案,那么就意味着须要对多个备份进行调度;每一个partition都有一个 server为"leader";leader负责全部的读写操做,若是leader失效,那么将会有其余follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息便可..因而可知做为leader的server承载了所有的请求压力,所以从集群的总体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每一个实例上,来确保总体的性能稳定.
 
    Producers
    Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪一个partition;好比基于"round-robin"方式或者经过其余的一些算法等.
 
    Consumers
    本质上kafka只支持Topic.每一个consumer属于一个consumer group;反过来讲,每一个group中能够有多个consumer.发送到Topic的消息,只会被订阅此Topic的每一个group中的一个consumer消费.
 
    若是全部的consumer都具备相同的group,这种状况和queue模式很像;消息将会在consumers之间负载均衡.
    若是全部的consumer都具备不一样的group,那这就是"发布-订阅";消息将会广播给全部的消费者.
    在kafka中,一个partition中的消息只会被group中的一个consumer消费;每一个group中consumer消息消费互相独立;咱们能够认为一个group是一个"订阅"者,一个Topic中的每一个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer能够消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来讲,消息仍不是有序的.
 
    kafka的 设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,不然将意味着某些consumer将没法获得消息.
 
    Guarantees
    1) 发送到partitions中的消息将会按照它接收的顺序追加到日志中
    2) 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.
    3) 若是Topic的"replicationfactor"为N,那么容许N-1个kafka实例失效.
 
2、使用场景
 
    一、Messaging   
    对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可使kafka具备良好的扩展性和性能优点.不过到目前为止,咱们应该很清楚认识到,kafka并无提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用做为"常规"的消息系统,在必定程度上,还没有确保消息的发送与接收绝对可靠(好比,消息重发,消息发送丢失等)
 
    二、Websit activity tracking
    kafka能够做为"网站活性跟踪"的最佳工具;能够将网页/用户操做等信息发送到kafka中.并实时监控,或者离线统计分析等

 

    三、Log Aggregation
    kafka的特性决定它很是适合做为"日志收集中心";application能够将操做日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka能够批量提交消息/压缩消息等,这对producer端而言,几乎感受不到性能的开支.此时consumer端可使hadoop等其余系统化的存储和分析系统.
 
3、设计原理
 
    kafka的 设计初衷是但愿做为一个统一的信息收集平台,可以实时的收集反馈信息,并须要可以支撑较大的数据量,且具有良好的容错能力.
 
    一、持久性
    kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的自己特性.且不管任何OS下,对文件系统自己的优化几乎没有可能.文件缓存/直接内存映射等是经常使用的手段.由于kafka是对日志文件进行append操做,所以磁盘检索的开支是较小的;同时为了减小磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到必定阀值时,再flush到磁盘,这样减小了磁盘IO调用的次数.

二、性能
    须要考虑的影响性能点不少,除磁盘IO以外,咱们还须要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并无提供太多高超的技巧;对于producer端,能够将消息buffer起来,当消息的条数达到必定阀值时,批量发送给broker;对于consumer端也是同样,批量fetch多条消息.不过消息量的大小能够经过配置文件来指定.对于kafka broker端,彷佛有个sendfile系统调用能够潜在的提高网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域便可,而无需进程再次copy和交换. 其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,所以启用消息压缩机制是一个良好的策略;压缩须要消耗少许的CPU资源,不过对于kafka而言,网络IO更应该须要考虑.能够将任何在网络上传输的消息都通过压缩.kafka支持gzip/snappy等多种压缩方式.
 
    三、生产者
    负载均衡: producer将会和Topic下全部partition leader保持socket链接;消息由producer直接经过socket发送到broker,中间不会通过任何"路由层".事实上,消息被路由到哪一个partition上,有producer 客户端决定.好比能够采用"random""key-hash""轮询"等,若是一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.
 
    其中partition leader的位置(host:port)注册在zookeeper中,producer做为zookeeper client,已经注册了watch用来监听partition leader的变动事件.
    异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢总体的网络延迟,批量延迟发送事实上提高了网络效率。不过这也有必定的隐患,好比说当producer失效时,那些还没有发送的消息将会丢失。

 

    四、消费者
    consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会得到必定条数的消息;consumer端也能够重置offset来从新消费消息.
 
    在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker创建链接以后,主动去pull(或者说fetch)消息;这中模式有些优势,首先consumer端能够根据本身的消费能力适时的去fetch消息并处理,且能够控制消息消费的进度(offset);此外,消费者能够良好的控制消息消费的数量,batch fetch.
 
    其余JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker须要太多额外的工做.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是至关轻量级的.当消息被consumer接收以后,consumer能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.因而可知,consumer 客户端也很轻量级.
 
<ignore_js_op>  



    五、消息传送机制
    对于JMS实现,消息传输担保很是直接:有且只有一次(exactly once).在kafka中稍有不一样:
    1) at most once: 最多一次,这个和JMS中"非持久化"消息相似.发送一次,不管成败,将不会重发.
    2) at least once: 消息至少发送一次,若是消息未能接受成功,可能会重发,直到接收成功.
    3) exactly once: 消息只会发送一次.
    at most once: 消费者fetch消息,而后保存offset,而后处理消息;当client保存offset以后,可是在消息处理过程当中出现了异常,致使部分消息未能继续处理.那么此后"未处理"的消息将不能被fetch到,这就是"at most once".
    at least once: 消费者fetch消息,而后处理消息,而后保存offset.若是消息处理成功以后,可是在保存offset阶段zookeeper异常致使保存操做未能执行成功,这就致使接下来再次fetch时可能得到上次已经处理过的消息,这就是"at least once",缘由offset没有及时的提交给zookeeper,zookeeper恢复正常仍是以前offset状态.
    exactly once: kafka中并无严格的去实现(基于2阶段提交,事务),咱们认为这种策略在kafka中是没有必要的.
    一般状况下"at-least-once"是咱们搜选.(相比at most once而言,重复接收数据总比丢失数据要好).
 
    六、复制备份
    kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);备份的个数能够经过broker配置文件来设定.leader处理全部的read-write请求,follower须要和leader保持同步.Follower和consumer同样,消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.(不一样于其余分布式存储,好比hbase须要"多数派"存活才行)
    当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,所以须要选择一个"up-to-date"的follower.选择follower时须要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,须要考虑到"负载均衡".
 
    7.日志
    若是一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了一序列"log entries"(日志条目),每一个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每一个日志都有一个offset来惟一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每一个partition在物理存储层面,有多个log file组成(称为segment).segmentfile的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
<ignore_js_op>  
    其中每一个partiton中所持有的segments列表信息会存储在zookeeper中.
    当segment文件尺寸达到必定阀值时(能够经过配置文件设定,默认1G),将会建立一个新的文件;当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时若是"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件.若是broker失效,极有可能会丢失那些还没有flush到文件的消息.由于 server意外实现,仍然会致使log文件格式的破坏(文件尾部),那么就要求当server启东是须要检测最后一个segment的文件结构是否合法并进行必要的修复.
    获取消息时,须要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,能够找到此消息所在segment文件,而后根据segment的最小offset取差值,获得它在file中的相对位置,直接读取输出便可.
    日志文件的删除策略很是简单:启动一个后台线程按期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的建立时间).为了不删除文件时仍然有read操做(consumer消费),采起copy-on-write方式.
 
    八、分配
    kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变动并做出相应的动做(好比consumer失效,触发负载均衡等)
    1) Broker node registry: 当一个kafkabroker启动后,首先会向zookeeper注册本身的节点信息(临时znode),同时当broker和zookeeper断开链接时,此znode也会被删除.
    格式: /broker/ids/[0...N]   -->host:port;其中[0..N]表示broker id,每一个broker的配置文件中都须要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.
    2) Broker Topic Registry: 当一个broker启动时,会向zookeeper注册本身持有的topic和partitions信息,仍然是一个临时znode.
    格式: /broker/topics/[topic]/[0...N]  其中[0..N]表示partition索引号.
    3) Consumer and Consumer group: 每一个consumer 客户端被建立时,会向zookeeper注册本身的信息;此做用主要是为了"负载均衡".
    一个group中的多个consumer能够交错的消费一个topic的全部partitions;简而言之,保证此topic的全部partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每一个consumer上.
    4) Consumer id Registry: 每一个consumer都有一个惟一的ID(host:uuid,能够经过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
    格式:/consumers/[group_id]/ids/[consumer_id]
    仍然是一个临时的znode,此节点的值为{"topic_name":#streams...},即表示此consumer目前所消费的topic + partitions列表.
    5) Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.
    格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
    此znode为持久节点,能够看出offset跟group_id有关,以代表当group中一个消费者失效,其余consumer能够继续消费.
    6) Partition Owner registry: 用来标记partition被哪一个consumer消费.临时znode
    格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id当consumer启动时,所触发的操做:
    A) 首先进行"Consumer id Registry";
    B) 而后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave"和"join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管partitions).
    C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance.
<ignore_js_op>  
    1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
    2) Broker端使用zookeeper用来注册broker信息,已经监测partitionleader存活性.
    3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息.
 
4、主要配置
 
    一、Broker配置

 

<ignore_js_op>  

 

    2.Consumer主要配置

 

<ignore_js_op>  

 

3.Producer主要配置

 

<ignore_js_op>  

 

 
以上是关于kafka一些基础说明,在其中咱们知道若是要kafka正常运行,必须配置zookeeper,不然不管是kafka集群仍是 客户端的生存者和消费者都没法正常的工做的,如下是对zookeeper进行一些简单的介绍:

 

5、zookeeper集群
    zookeeper是一个为分布式应用提供一致性服务的软件,它是开源的Hadoop项目的一个子项目,并根据google发表的一篇论文来实现的。zookeeper为分布式系统提供了高笑且易于使用的协同服务,它能够为分布式应用提供至关多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。zookeeper接口简单,咱们没必要过多地纠结在分布式系统 编程难于处理的同步和一致性问题上,你可使用zookeeper提供的现成(off-the-shelf)服务来实现来实现分布式系统额配置管理,组管理,Leader选举等功能。
    zookeeper集群的安装,准备三台服务器server1:192.168.0.1,server2:192.168.0.2,
    server3:192.168.0.3.
    1)下载zookeeper
    到 http://zookeeper.apache.org/releases.html去下载最新版本Zookeeper-3.4.5的安装包zookeeper-3.4.5.tar.gz.将文件保存server1的~目录下
    2)安装zookeeper
    先在服务器 server分别执行a-c步骤
    a)解压  
    tar -zxvf zookeeper-3.4.5.tar.gz
    解压完成后在目录~下会发现多出一个目录zookeeper-3.4.5,从新命令为zookeeper
    b)配置
    将conf/zoo_sample.cfg拷贝一份命名为zoo.cfg,也放在conf目录下。而后按照以下值修改其中的配置:
   
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/home/wwb/zookeeper /data
    dataLogDir=/home/wwb/zookeeper/logs
    # the port at which the clients will connect
    clientPort=2181
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    server.1=192.168.0.1:3888:4888
    server.2=192.168.0.2:3888:4888
     server.3=192.168.0.3:3888:4888
    tickTime:这个时间是做为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每一个 tickTime 时间就会发送一个心跳。
    dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认状况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
    clientPort:这个端口就是客户端链接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
    initLimit:这个配置项是用来配置 Zookeeper 接受 客户端(这里所说的客户端不是用户链接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中链接到 Leader 的 Follower 服务器)初始化链接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器尚未收到客户端的返回信息,那么代表这个客户端链接失败。总的时间长度就是 5*2000=10 秒
    syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是2*2000=4 秒
    server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,须要一个端口来从新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通讯的端口。若是是伪集群的配置方式,因为 B 都是同样,因此不一样的 Zookeeper 实例通讯端口号不能同样,因此要给它们分配不一样的端口号
注意:dataDir,dataLogDir中的wwb是当前登陆用户名,data,logs目录开始是不存在,须要使用mkdir命令建立相应的目录。而且在该目录下建立文件myid,serve1,server2,server3该文件内容分别为1,2,3。
针对服务器server2,server3能够将server1复制到相应的目录,不过须要注意dataDir,dataLogDir目录,而且文件myid内容分别为2,3
    3)依次启动 server1,server2,server3的zookeeper.
    /home/wwb/zookeeper/bin/zkServer.sh start,出现相似如下内容
    JMX enabled by default
    Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
   4) 测试zookeeper是否正常工做,在server1上执行如下命令
    /home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出现相似如下内容
    JLine support is enabled
    2013-11-27 19:59:40,560 - INFO      [main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session   establishmentcomplete on  server localhost.localdomain/127.0.0.1:2181, sessionid =    0x1429cdb49220000, negotiatedtimeout = 30000
 
    WATCHER::
   
    WatchedEvent state:SyncConnected type:None path:null
    [zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]#  
    即表明集群构建成功了,若是出现错误那应该是第三部时没有启动好集群,
运行,先利用
    ps aux | grep zookeeper查看是否有相应的进程的,没有话,说明集群启动出现问题,能够在每一个服务器上使用
    ./home/wwb/zookeeper/bin/zkServer.sh stop。再依次使用./home/wwb/zookeeper/binzkServer.sh start,这时在执行4通常是没有问题,若是仍是有问题,那么先stop再到bin的上级目录执行./bin/zkServer.shstart试试。
 
注意:zookeeper集群时,zookeeper要求半数以上的机器可用,zookeeper才能提供服务。
 
6、kafka集群
(利用上面server1,server2,server3,下面以server1为实例)
    1)下载kafka0.8( http://kafka.apache.org/downloads.html),保存到服务器/home/wwb目录下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)
    2)解压 tar -zxvf kafka-0.8.0-beta1-src.tgz,产生文件夹kafka-0.8.0-beta1-src更改成kafka01   
3)配置
    修改kafka01/config/ server.properties,其中broker.id,log.dirs,zookeeper.connect必须根据实际状况进行修改,其余项根据须要自行斟酌。大体以下:
     broker.id=1  
     port=9091  
     num.network.threads=2  
     num.io.threads=2  
     socket.send.buffer.bytes=1048576  
    socket.receive.buffer.bytes=1048576  
     socket.request.max.bytes=104857600  
    log.dir=./logs  
    num.partitions=2  
    log.flush.interval.messages=10000  
    log.flush.interval.ms=1000  
    log.retention.hours=168  
    #log.retention.bytes=1073741824  
    log.segment.bytes=536870912  
    num.replica.fetchers=2  
    log.cleanup.interval.mins=10  
    zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183  
    zookeeper.connection.timeout.ms=1000000  
    kafka.metrics.polling.interval.secs=5  
    kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
    kafka.csv.metrics.dir=/tmp/kafka_metrics  
    kafka.csv.metrics.reporter.enabled=false
 
4)初始化由于kafka用scala语言编写,所以运行kafka须要首先准备scala相关环境。
    > cd kafka01  
    > ./sbt update  
    > ./sbt package  
    > ./sbt assembly-package-dependency
在第二个命令时可能须要必定时间,因为要下载更新一些依赖包。因此请你们 耐心点。
5) 启动kafka01
    >JMX_PORT=9997 bin/kafka- server-start.sh config/server.properties &  
a)kafka02操做步骤与kafka01雷同,不一样的地方以下
    修改kafka02/config/server.properties
    broker.id=2
    port=9092
    ##其余配置和kafka-0保持一致
    启动kafka02
    JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &  
b)kafka03操做步骤与kafka01雷同,不一样的地方以下
    修改kafka03/config/server.properties
    broker.id=3
    port=9093
    ##其余配置和kafka-0保持一致
    启动kafka02
    JMX_PORT=9999 bin/kafka- server-start.shconfig/server.properties &
6)建立Topic(包含一个分区,三个副本)
    >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic
7)查看topic状况
    >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181
    topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0
8)建立发送者
   >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic
    my test message1
    my test message2
    ^C
9)建立消费者
    >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message1
    my test message2
^C
10)杀掉server1上的broker
  >pkill -9 -f config/ server.properties
11)查看topic
  >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181
  topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0
发现topic还正常的存在
11)建立消费者,看是否能查询到消息
    >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
说明一切都是正常的。
 
OK,以上就是对Kafka我的的理解,不对之处请你们及时指出。
 
 
补充说明:
一、public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中该方法的参数Map的key为topic名称,value为topic对应的分区数,譬如说若是在kafka中不存在相应的topic时,则会建立一个topic,分区数为value,若是存在的话,该处的value则不起什么做用

 

二、关于生产者向指定的分区发送数据,经过设置partitioner.class的属性来指定向那个分区发送数据,若是本身指定必须编写相应的程序,默认是kafka.producer.DefaultPartitioner,分区程序是基于散列的键。

 

三、在多个消费者读取同一个topic的数据,为了保证每一个消费者读取数据的惟一性,必须将这些消费者group_id定义为同一个值,这样就构建了一个相似队列的数据结构,若是定义不一样,则相似一种广播结构的。

 

四、在consumerapi中,参数 设计到数字部分,相似Map<String,Integer>,
numStream,指的都是在topic不存在的时,会建立一个topic,而且分区个数为Integer,numStream,注意若是数字大于broker的配置中num.partitions属性,会以num.partitions为依据建立分区个数的。

 

五、producerapi,调用send时,若是不存在topic,也会建立topic,在该方法中没有提供分区个数的参数,在这里分区个数是由服务端broker的配置中num.partitions属性决定的
 
关于kafka说明能够参考: http://kafka.apache.org/documentation.html
 
文章转自:http://www.aboutyun.com/thread-9341-1-1.html