[kafka] 001_kafka起步

 

 

1、简介html

  Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

 Kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。

  这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。

  对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。

  Kafka的目的是经过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了经过集群机来提供实时的消费。

 

2、特色node

Kafka maintains feeds of messages in categories called topics.

We'll call processes that publish messages to a Kafka topic producers.

We'll call processes that subscribe to topics and process the feed of published messages consumers.

Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:apache

 

Communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. We provide a Java client for Kafka, but clients are available in many languages.(客户端和服务器之间的沟通采用了TCP协议,kafka提供了基于Java的客户端,可是理论上客户端能够用任何语言编码实现)缓存

 

3、相关术语服务器

Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker.

Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic. 物理上不一样Topic的消息分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic便可生产或消费数据而没必要关心数据存于何处.

Partition
Partition是物理上的概念,每一个Topic包含一个或多个Partition.

Producer
负责发布消息到Kafka broker

Consumer
消息消费者,向Kafka broker读取(pull)消息的客户端.

Consumer Group
每一个Consumer属于一个特定的Consumer Group(可为每一个Consumer指定group name,若不指定group name则属于默认的group).

 

4、设计原理网络

kafka的设计初衷是但愿做为一个统一的信息收集平台,可以实时的收集反馈信息,并须要可以支撑较大的数据量,且具有良好的容错能力.

一、持久性并发

kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的自己特性.且不管任何OS下,对文件系统自己的优化几乎没有可能.

文件缓存/直接内存映射等是经常使用的手段.

由于kafka是对日志文件进行append操做,所以磁盘检索的开支是较小的;同时为了减小磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到必定阀值时,再flush到磁盘,这样减小了磁盘IO调用的次数.

二、性能app

须要考虑的影响性能点不少,除磁盘IO以外,咱们还须要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并无提供太多高超的技巧;

对于producer端,能够将消息buffer起来,当消息的条数达到必定阀值时,批量发送给broker;

对于consumer端也是同样,批量fetch多条消息.不过消息量的大小能够经过配置文件来指定.

对于kafka broker端,彷佛有个sendfile系统调用能够潜在的提高网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域便可,而无需进程再次copy和交换.

其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,所以启用消息压缩机制是一个良好的策略;压缩须要消耗少许的CPU资源,不过对于kafka而言,网络IO更应该须要考虑.能够将任何在网络上传输的消息都通过压缩.kafka支持gzip/snappy等多种压缩方式.

三、生产者负载均衡

负载均衡: 
    producer将会和Topic下全部partition leader保持socket链接;消息由producer直接经过socket发送到broker,中间不会通过任何"路由层".事实上,消息被路由到哪一个partition上,有producer客户端决定.好比能够采用"random""key-hash""轮询"等,若是一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.
 
    其中partition leader的位置(host:port)注册在zookeeper中,producer做为zookeeper client,已经注册了watch用来监听partition leader的变动事件.

    异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢总体的网络延迟,批量延迟发送事实上提高了网络效率。不过这也有必定的隐患,好比说当producer失效时,那些还没有发送的消息将会丢失。

 

  四、消费者dom

    consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会得到必定条数的消息;consumer端也能够重置offset来从新消费消息.
 
    在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker创建链接以后,主动去pull(或者说fetch)消息;这中模式有些优势,首先consumer端能够根据本身的消费能力适时的去fetch消息并处理,且能够控制消息消费的进度(offset);此外,消费者能够良好的控制消息消费的数量,batch fetch.
 
    其余JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker须要太多额外的工做.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是至关轻量级的.当消息被consumer接收以后,consumer能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.因而可知,consumer客户端也很轻量级.

 

 

  五、消息传送机制

对于JMS实现,消息传输担保很是直接:有且只有一次(exactly once).在kafka中稍有不一样,有三种方式:

1) at most once: 最多一次,这个和JMS中"非持久化"消息相似.发送一次,不管成败,将不会重发. 2) at least once: 消息至少发送一次,若是消息未能接受成功,可能会重发,直到接收成功. 3) exactly once: 消息只会发送一次.
at most once: 消费者fetch消息,而后保存offset,而后处理消息;当client保存offset以后,可是在消息处理过程当中出现了异常,致使部分消息未能继续处理.那么此后
"未处理"的消息将不能被fetch到,这就是"at most once".
at least once: 消费者fetch消息,而后处理消息,而后保存offset.若是消息处理成功以后,可是在保存offset阶段zookeeper异常致使保存操做未能执行成功,这就致使接下来再次fetch时可能得到上次已经处理过的消息,这就是
"at least once",缘由offset没有及时的提交给zookeeper,zookeeper恢复正常仍是以前offset状态.
exactly once: kafka中并无严格的去实现(基于2阶段提交,事务),咱们认为这种策略在kafka中是没有必要的.
一般状况下
"at-least-once"是咱们首选.(相比at most once而言,重复接收数据总比丢失数据要好).

 

  六、复制备份

    kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);

    备份的个数能够经过broker配置文件来设定.

    leader处理全部的read-write请求,follower须要和leader保持同步.Follower和consumer同样,消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.

    即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.(不一样于其余分布式存储,好比hbase须要"多数派"存活才行)

    当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,所以须要选择一个"up-to-date"的follower.选择follower时须要兼顾一个问题,就是新leader-server上所已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,须要考虑到"负载均衡".

 

  7.日志

    若是一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;

    日志文件中保存了一序列"log entries"(日志条目),每一个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";

    每一个日志都有一个offset来惟一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置.每一个partition在物理存储层面,由多个log file组成(称为segment).

    segmentfile的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

     其中每一个partiton中所持有的segments列表信息会存储在zookeeper中.
    
    当segment文件尺寸达到必定阀值时(能够经过配置文件设定,默认1G),将会建立一个新的文件;当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时若是"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件.若是broker失效,极有可能会丢失那些还没有flush到文件的消息.由于server意外实现,仍然会致使log文件格式的破坏(文件尾部),那么就要求当server启东是须要检测最后一个segment的文件结构是否合法并进行必要的修复.

    获取消息时,须要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,能够找到此消息所在segment文件,而后根据segment的最小offset取差值,获得它在file中的相对位置,直接读取输出便可.
  
    日志文件的删除策略很是简单:启动一个后台线程按期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的建立时间).为了不删除文件时仍然有read操做(consumer消费),采起copy-on-write方式.

 

八、分配

  kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变动并做出相应的动做(好比consumer失效,触发负载均衡等)
1) Broker node registry: 当一个kafka-broker启动后,首先会向zookeeper注册本身的节点信息(临时znode),同时当broker和zookeeper断开链接时,此znode也会被删除.
格式:
/broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每一个broker的配置文件中都须要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.
2) Broker Topic Registry: 当一个broker启动时,会向zookeeper注册本身持有的topic和partitions信息,仍然是一个临时znode.
  格式:
/broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引号.
  
3) Consumer and Consumer group: 每一个consumer客户端被建立时,会向 zookeeper 注册本身的信息;此做用主要是为了"负载均衡".
一个group中的多个consumer能够交错的消费一个topic的全部partitions;简而言之,保证此topic的全部partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每一个consumer上.
4) Consumer id Registry: 每一个consumer都有一个惟一的ID(host:uuid,能够经过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
格式:
/consumers/[group_id]/ids/[consumer_id]
仍然是一个临时的znode,此节点的值为{
"topic_name":#streams...},即表示此consumer目前所消费的topic + partitions列表.
5) Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.
格式:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value 此znode为持久节点,能够看出offset跟group_id有关,以代表当group中一个消费者失效,其余consumer能够继续消费.
6) Partition Owner registry: 用来标记partition被哪一个consumer消费.临时znode
格式:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id当consumer启动时,所触发的操做:
A) 首先进行
"Consumer id Registry";
B) 而后在
"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave""join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管partitions).
C) 在
"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance.

 

 

  1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partition-leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

5、参考资料

http://kafka.apache.org/documentation.html#gettingStarted

http://www.cnblogs.com/likehua/p/3999538.html

相关文章
相关标签/搜索