kafka介绍和使用

1.消息系统简介

1.1为何要用消息系统 ?

解耦 各位系统之间经过消息系统这个统一的接口交换数据,无须了解彼此的存在;
冗余 部分消息系统具备消息持久化能力,可规避消息处理前丢失的风险;html

灵活性和消除峰值 在访问量剧增的状况下,应用仍然须要继续发挥做用,使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃;(节省资源)
可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据;数据库

顺序保障 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性;
异步通讯 在不须要当即处理请求的场景下,能够将请求放入消息系统,合适的时候再处理。apache

 

1.2.有哪些消息系统 ?

RabbitMQ Erlang编写,支持多协议 AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式;
Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可作轻量级队列服务使用。就入队操做而言, Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差;
ZeroMQ 轻量级,不须要单独的消息服务器或中间件,应用程序自己扮演该角色,Peer-to-Peer。它实质上是 一个库,须要开发人员本身组合多种技术,使用复杂度高;
ActiveMQ JMS实现,Peer-to-Peer,支持持久化、XA事务;api

MetaQ/RocketMQ 纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务;
Kafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持实时在线处理和离线数据处理。Apache Kafka相对于ActiveMQ是一个很是轻量级的消息系统,除了性能很是好以外,仍是一个工做良好的分布式系统。缓存

1.3.Kafka设计目标是什么?

高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写;
消息持久化 全部消息均被持久化到磁盘,无消息丢失,支持消息重放;
彻底分布式 Producer,Broker,Consumer均支持水平扩展,同时适应在线流处理和离线批处理。安全

2.kafka简介和架构

2.1.核心概念

       Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker;服务器

       Message:消息是Kafka中最基本的数据单元,主要有key和value构成;真正有效的是消息是value数据,key只是做为消息路由分区使用;网络

Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不一样Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic便可生产或消费数 据而没必要关心数据存于何处),强调的是kafka不保证topic消息有序;多线程

Partition:Parition是物理上的概念,每一个Topic包含一个或多个Partition;kafka只保证一个partiton是有序的;经过配置来设置partition中的文件大小和文件保留策略;
Producer:负责发布消息到Kafka broker;
Consumer:消息消费者,向Kafka broker读取消息的客户端;架构

Consumer Group:官方称为逻辑上的订阅者,每一个Consumer属于一个特定的Consumer Group(可为每一个Consumer指定group name,若不指定group name则属于默认的group),消息的单播和多播都是基于消费组来实现的,消费组中的消费者不是越多越好,消费者数量超过度区数量时,回致使消费者分配不到资源,形成资源浪费;
Offset:每一个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每一个消息都有一个连续的序列号叫作offset,用于partition惟一标识一条消息。

topic的配置可参考:
http://kafka.apache.org/documentation.html#topic-config

2.2.kafka架构

       

         如上图所示,一个典型的Kafka集群中包含若干Producer,若干broker(broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka经过Zookeeper管理集群配置,选举leader,在消费组发生变化时进行rebalance(新版本不依赖)。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

3.kafka的客户端设计

3.1.生产者设计

3.1.1.producer的使用

在Kafka老版本中,同步和异步都是分开成不一样的方法来实现的,最新的都是由KafkaProducer来实现,经过掉future的get阻塞线程来实现同步。实际上二者底层实现相同,都是经过一步实现的。

3.1.2.producer发送消息的过程(0.10.2.1)

        主要是两个线程的操做:

        主线程封装消息成ProducerRecord对象,并调用append方法将消息追加RecordAccumulator中暂时存储;
        Sender线程负责将消息构形成请求,并从RecordAccumulator取出消息消息并批量发送。

1 ProducerIntercptor对消息进行拦截;
2 Serialzer对key和value进行序列化;
3 Partitioner对消息选择合适的分区;
4 RecordAccumulator收集消息,实现批量发送;
5 Sender从RecordAccumulator获取消息;
6 构造ClientRequest;
7 将ClientRequest交给Network,准备发送;
8 Network将请求放入KafkaChannel的缓存;
9 发送请求;
10 收到响应,调用ClientRequest;
11 调用RecordBatch的回调函数,最终调用到每个消息上注册的回调函数。

3.1.3.Product方法详解

  主线程的send方法:

一、首先调用waitOnMetadata()方法确保该主题topic对应的元数据metadata是可用的;
二、计算剩余等待时间remainingWaitMs;
三、根据record中topic、key,利用valueSerializer获得序列化key:serializedKey;
四、根据record中topic、value,利用valueSerializer获得序列化value:serializedValue;
五、调用partition()方法得到分区号partition;
六、计算序列化后的key、value及其offset、size所占大小serializedSize;
七、调用ensureValidRecordSize()方法确保记录大小serializedSize是有效的;
八、根据record中的topic和partition构造TopicPartition实例tp;
九、调用accumulator的append()方法添加记录,得到记录添加结果RecordAppendResult类型的result;
十、根据结果result的batchIsFull或newBatchCreated肯定是否执行sender的wakeup();
十一、返回result中的future。

3.1.4.Replication设计

当某个Topic的replication-factor为N且N大于1时,每一个Partition都会有N个副本(Replication);
Replica的个数小于等于Broker数,即对每一个Partition而言每一个Broker上只会有一个Replica,所以 可用Broker ID表示Replication;
全部Partition的全部Replication默认状况会均匀分布到全部Broker上。

要解决的问题:

1:如何Propagate消息?

Producer在发布消息到某个Partition时,先经过Zookeeper找到该Partition的Leader,而后不管该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每一个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。

2:什么时候Commit?

        一条消息只有被ISR里的全部Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而形成数据丢失(Consumer没法消费这些数据)。而对于Producer而言,它能够选择是否等待消息commit,这能够经过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。 

3:如何处理Replica恢复?

Kafka producer的ack有3中机制,初始化producer时的producerconfig能够经过配置request.required.acks不一样的值来实现。
0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。1:这意味着producer在leader已成功收到的数据并获得确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但还没有复制将失去了惟一的消息)。-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 此选项提供最好的耐久性,咱们保证没有信息将丢失,只要至少一个同步副本保持存活。

       三种机制,性能依次递减 (producer吞吐量下降),数据健壮性则依次递增。

4:如何处理Replica所有宕机

机器恢复,lead选举。(目前都是动态配置);

1.等待ISR中的任一个Replica“活”过来,而且选它做为Leader 2.选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader。

3.2.Consumer设计

3.2.1.建立一个消费者

消费者都是线程不安全的,若是发现多线程调用,直接抛异常。

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不一样的消费者,由于消息发送速率是由 broker 决定的。它的目标是尽量以最快速度传递消息,可是这样很容易形成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则能够根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的传输语义。

consumer group

        CG是kafka提供的可扩展且具备容错性的消费者机制。组内能够有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的全部消费者协调在一块儿来消费订阅主题(subscribed topics)的全部分区(partition)。kafka 的分配单位是 patition。每一个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),可是多个 group 能够同时消费这个 partition。consumer group下订阅的topic下的每一个分区只能分配给某个group下的一个consumer(固然该分区还能够被分配给其余group)

3.2.2.消费获取

        传递保证语义有三个级别:At most once: 最多一次,消息可能会丢失,但不会重复传递,At least once: 至少一次,消息毫不会丢,可是可能会重复传递,Exactly once: 每一条消息只会被传递一次。

        Kafka服务器端并不会记录消费者的消费位置,而是由消费者本身决定如何保存其消费的offset. 0.8.2版本以前消费者会将其消费位置记录zookeeper中,在后面的新版本中,消费者为了缓解zookeeper集群的压力,在Kafka服务器端添加了一个名字是__consusmer_offsets的内部topic,简称为offset topic,他能够用来保存消费者提交的offset,当出现消费者上线或者下线时会触发消费者组的rebalance操做,对partitions从新进行分配,等待rebalance完成以后,消费者就能够读取offset topic中的记录的offset,并今后offset开始继续消费。你也能够根据业务需求将offset存储在别的存储介质中,好比数据库等

3.2.3.rebalance

触发rebalance的时机

# 有新的消费者加入;
# 有消费者宕机或者下线;
# 消费者主动退出消费者组;
# 消费者组订阅的topic出现分区数量变化;
# 消费者调用unsubscrible取消对某topic的订阅。

1. 将目标 topic 下的全部 partirtion 排序,存于PT;
2. 对某 consumer group 下全部 consumer 排序,存于 CG,第 i 个consumer 记为 Ci;
3. N=size(PT)/size(CG),向上取整;
4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始);
5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci。

4.kafka高性能之道

4.1.高效使用磁盘 

顺序写磁盘,顺序写磁盘性能高于随机写内存;

追加写:数据不更新,不作数据级别的删除,文件级别的删除;

支持多目录(多磁盘)。

4.2.零拷贝

这里的零拷贝值得是cpu级别的拷贝,使用nio的调用操做系统的sendfile实现零拷贝,同时减小两次上下文切换和1次系统调用。

传统意义上的拷贝:

NIO拷贝:

4.3.批处理和压缩

        kafka的生产者和消费者均支持批量处理数据,指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去,如100条消息就发送,或者每5秒发送一次

这种策略将大大减小服务端的I/O次数;
       生产者支持将数据压缩后发送给broker,从而减小网络传输代价,目前支持:GZIP或Snappy格式。

4.4.Partition

经过partition实现了并行处理和水平扩展,partition也是kafka并行处理的最小单位;

partitiom能够处在不一样的机器上,充分利用多机资源;

同一节点上的partitiom能够位于多个目录下,若是节点下有多个磁盘,能够充分利用多磁盘优点。

4.5.ISR 

ISR实现了可用性和一致性的动态平衡;

ISR容忍了更多节点的失败;

可配置化replica crash处理策略。

相关文章
相关标签/搜索