kafka 消息服务

apache kafka参考html

http://kafka.apache.org/documentation.html前端

消息队列方式:web

点对点:算法

消息生产者生产消息发送到queue中,而后消息消费者从queue中取出而且消费消息。这里要注意:apache

  • 消息被消费之后,queue中再也不有存储,因此消息消费者不可能消费到已经被消费的消息。
  • Queue支持存在多个消费者,可是对一个消息而言,只会有一个消费者能够消费。
发布/订阅:

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不一样,发布到topic的消息会被全部订阅者消费。服务器

背景介绍:网络

Kafka 是一个消息系统,本来开发自 LinkedIn,用做 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。如今它已被多家公司做为多种类型的数据管道和消息系统使用。数据结构

活动流数据是几乎全部站点在对其网站使用状况作报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索状况等内容。这种数据一般的处理方式并发

是先把各类活动以日志的形式写入某种文件,而后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据),总的来讲,运营数据的统计方法种类繁多。负载均衡

如上图所示,一个典型的Kafka集群中包含若干Producer(能够是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,通常broker数量越多,

集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka经过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式

将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

kafka名词解释和工做方式:

  • Broker : 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker能够容纳多个topic。
  • Topic : 一类消息,例如page view日志、click日志等均可以以topic的形式存在。我们能够理解为一个队列。
  • Producer : 消息生产者,就是向kafka broker推送消息的客户端。
  • Consumer : 消息消费者,向kafka broker拉取消息的客户端
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给全部的consumer)和单播(发给任意一个consumer)的手段。一个topic能够有多个CG。

                                                topic的消息会复制(不是真的复制,是概念上的)到全部的CG,但每一个CG只会把消息发给该CG中的一个consumer。

                                                若是须要实现广播,只要每一个consumer有一个独立的CG就能够了。

                                                要实现单播只要全部的consumer在同一个CG。

                                                用CG还能够将consumer进行自由的分组而不须要屡次发送消息到不一样的topic。

  • Partition:为了实现扩展性,一个很是大的topic能够分布到多个broker(即服务器)上,一个topic能够分为多个partition,每一个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

                       kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的总体(多个partition间)的顺序。

  • Offset:每一个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每一个消息都有一个连续的序列号叫作offset,用于partition惟一标识一条消息。

                   kafka的存储文件都是按照offset.kafka来命名,用offset作名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件便可。固然the first offset就是00000000000.kafka

kafka特性:
  • 经过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即便数以TB的消息存储也可以保持长时间的稳定性能。
  • 高吞吐量:即便是很是普通的硬件kafka也能够支持每秒数十万的消息。
  • 支持同步和异步复制两种HA
  • Consumer客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据
  • 消费状态保存在客户端
  • 消息存储顺序写
  • 数据迁移、扩容对用户透明
  • 支持Hadoop并行数据加载。
  • 支持online和offline的场景。
  • 持久化:经过将数据持久化到硬盘以及replication防止数据丢失。
  • scale out:无需停机便可扩展机器。
  • 按期删除机制,支持设定partitions的segment file保留时间。
可靠性(一致性)

kafka(MQ)要实现从producer到consumer之间的可靠的消息传送和分发。传统的MQ系统一般都是经过broker和consumer间的确认(ack)机制实现的,并在broker保存消息分发的状态。

即便这样一致性也是很难保证的(参考原文)。kafka的作法是由consumer本身保存状态,也不要任何确认。这样虽然consumer负担更重,但其实更灵活了。

由于无论consumer上任何缘由致使须要从新处理消息,均可以再次从broker得到。

kafak系统扩展性

kafka使用zookeeper来实现动态的集群扩展,不须要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。

而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并做出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。

kafka设计目标

高吞吐量是其核心设计之一。

  • 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。
  • zero-copy:减小IO操做步骤。
  • 支持数据批量发送和拉取。
  • 支持数据压缩。
  • Topic划分为多个partition,提升并行处理能力。
Producer负载均衡和HA机制
  • producer根据用户指定的算法,将消息发送到指定的partition。
  • 存在多个partiiton,每一个partition有本身的replica,每一个replica分布在不一样的Broker节点上。
  • 多个partition须要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over。
  • 经过zookeeper管理broker与consumer的动态加入与离开。
Consumer的pull机制

因为kafka broker会持久化数据,broker没有cache压力,所以,consumer比较适合采起pull的方式消费数据,具体特别以下:

  • 简化kafka设计,下降了难度。
  • Consumer根据消费能力自主控制消息拉取速度。
  • consumer根据自身状况自主选择消费模式,例如批量,重复消费,从制定partition或位置(offset)开始消费等.
Consumer与topic关系以及机制

本质上kafka只支持Topic.每一个consumer属于一个consumer group;反过来讲,每一个group中能够有多个consumer.对于Topic中的一条特定的消息,
只会被订阅此Topic的每一个group中的一个consumer消费,此消息不会发送给一个group的多个consumer;那么一个group中全部的consumer将会交错的消费整个Topic.
若是全部的consumer都具备相同的group,这种状况和JMS(Java Message Service) queue模式很像;消息将会在consumers之间负载均衡.
若是全部的consumer都具备不一样的group,那这就是"发布-订阅";消息将会广播给全部的消费者.

在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);每一个group中consumer消息消费互相独立;咱们能够认为一个group是一个"订阅"者,

一个Topic中的每一个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer能够同时消费多个partitions中的消息.

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的.事实上,从Topic角度来讲,当有多个partitions时,消息仍不是全局有序的.

一般状况下,一个group中会包含多个consumer,这样不只能够提升topic中消息的并发消费能力,并且还能提升"故障容错"性,若是group中的某个consumer失效,

那么其消费的partitions将会有其余consumer自动接管.kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,

不然将意味着某些consumer将没法获得消息.

Producer均衡算法

kafka集群中的任何一个broker,均可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"
等信息(请参看zookeeper中的节点信息).当producer获取到metadata信心以后, producer将会和Topic下全部partition leader保持socket链接;
消息由producer直接经过socket发送到broker,中间不会通过任何"路由层".事实上,消息被路由到哪一个partition上,有producer客户端决定.
好比能够采用"random""key-hash""轮询"等,若是一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.
在producer端的配置文件中,开发者能够指定partition路由的方式.

Consumer均衡算法

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提高topic的并发消费能力.
1) 假如topic1,具备以下partitions: P0,P1,P2,P3
2) 加入group中,有以下consumer: C0,C1
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3
4) 根据consumer.id排序: C0,C1
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 而后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

kafka broker集群内broker之间replication机制

kafka中,replication策略是基于partition,而不是topic;kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);

备份的个数能够经过broker配置文件来设定.leader处理全部的read-write请求,follower须要和leader保持同步.Follower就像一个"consumer",

消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.

当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具备良好的网络环境.

即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.(备注:不一样于其余分布式存储,好比hbase须要"多数派"存活才行)

kafka断定一个follower存活与否的条件有2个:

1) follower须要和zookeeper保持良好的连接   

2) 它必须可以及时的跟进leader,不能落后太多.

若是同时知足上述2个条件,那么leader就认为此follower是"活跃的".若是一个follower失效(server失效)或者落后太多,

leader将会把它从同步列表中移除’备注:若是此replicas落后太多,它将会继续从leader中fetch数据,直到足够up-to-date,

而后再次加入到同步列表中;kafka不会更换replicas宿主,由于"同步列表"中replicas须要足够快,这样才能保证producer发布消息时接受到ACK的延迟较小。

当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,所以须要选择一个"up-to-date"的follower.kafka中leader选举并无采用"投票多数派"的算法,

由于这种算法对于"网络稳定性"/"投票参与者数量"等条件有较高的要求,并且kafka集群的设计,还须要容忍N-1个replicas失效.对于kafka而言,

每一个partition中全部的replicas信息均可以在zookeeper中得到,那么选举leader将是一件很是简单的事情.选择follower时须要兼顾一个问题,

就是新leader server上所已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.

在选举新leader,须要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

在整几个集群中,只要有一个replicas存活,那么此partition均可以继续接受读写操做.

总结:

    1) Producer端直接链接broker.list列表,从列表中返回TopicMetadataResponse,该Metadata包含Topic下每一个partition leader创建socket链接并发送消息.

    2) Broker端使用zookeeper用来注册broker信息,以及监控partition leader存活性.

    3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息.

相关文章
相关标签/搜索