详细解析kafka之kafka分区和副本

本篇主要介绍kafka的分区和副本,由于这二者是有些关联的,因此就放在一块儿来说了,后面顺便会给出一些对应的配置以及具体的实现代码,以供参考~java

1.kafka分区机制

分区机制是kafka实现高吞吐的秘密武器,但这个武器用得很差的话也容易出问题,今天主要就来介绍分区的机制以及相关的部分配置。linux

首先,从数据组织形式来讲,kafka有三层形式,kafka有多个主题,每一个主题有多个分区,每一个分区又有多条消息。数据库

而每一个分区能够分布到不一样的机器上,这样一来,从服务端来讲,分区能够实现高伸缩性,以及负载均衡,动态调节的能力。apache

固然多分区就意味着每条消息都难以按照顺序存储,那么是否是意味着这样的业务场景kafka就无能为力呢?不是的,最简单的作法可使用单个分区,单个分区,全部消息天然都顺序写入到一个分区中,就跟顺序队列同样了。而复杂些的,还有其余办法,那就是使用按消息键,将须要顺序保存的消息存储的单独的分区,其余消息存储其余分区,这个在下面会介绍bootstrap

咱们能够经过replication-factor指定建立topic时候所建立的分区数。负载均衡

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testdom

好比这里就是建立了1个分区,的主题。值得注意的是,还有一种建立主题的方法,是使用zookeeper参数的,那种是比较旧的建立方法,这里是使用bootstrap参数的。socket

1.1 分区个数选择

既然分区效果这么好,是否是越多分区越好呢?显而易见并不是如此。ide

分区越多,所须要消耗的资源就越多。甚至若是足够大的时候,还会触发到操做系统的一些参数限制。好比linux中的文件描述符限制,通常在建立线程,建立socket,打开文件的场景下,linux默认的文件描述符参数,只有1024,超过则会报错。性能

看到这里有读者就会不耐烦了,说这么多有啥用,能不能直接告诉我分区分多少个比较好?很遗憾,暂时没有。

由于每一个业务场景都不一样,只能结合具体业务来看。假如每秒钟须要从主题写入和读取1GB数据,而消费者1秒钟最多处理50MB的数据,那么这个时候就能够设置20-25个分区,固然还要结合具体的物理资源状况。

而如何没法估算出大概的处理速度和时间,那么就用基准测试来测试吧。建立不一样分区的topic,逐步压测测出最终的结果。若是实在是懒得测,那比较无脑的肯定分区数的方式就是broker机器数量的2~3倍。

1.2 分区写入策略

所谓分区写入策略,便是生产者将数据写入到kafka主题后,kafka如何将数据分配到不一样分区中的策略。

常见的有三种策略,轮询策略,随机策略,和按键保存策略。其中轮询策略是默认的分区策略,而随机策略则是较老版本的分区策略,不过因为其分配的均衡性不如轮询策略,故然后来改为了轮询策略为默认策略。

轮询策略

所谓轮询策略,即按顺序轮流将每条数据分配到每一个分区中。

举个例子,假设主题test有三个分区,分别是分区A,分区B和分区C。那么主题对接收到的第一条消息写入A分区,第二条消息写入B分区,第三条消息写入C分区,第四条消息则又写入A分区,依此类推。

轮询策略是默认的策略,故而也是使用最频繁的策略,它能最大限度保证全部消息都平均分配到每个分区。除非有特殊的业务需求,不然使用这种方式便可。

随机策略

随机策略,也就是每次都随机地将消息分配到每一个分区。其实大概就是先得出分区的数量,而后每次获取一个随机数,用该随机数肯定消息发送到哪一个分区。

在比较早的版本,默认的分区策略就是随机策略,但其实使用随机策略也是为了更好得将消息均衡写入每一个分区。但后来发现对这一需求而言,轮询策略的表现更优,因此社区后来的默认策略就是轮询策略了。

按键保存策略

按键保存策略,就是当生产者发送数据的时候,能够指定一个key,计算这个key的hashCode值,按照hashCode的值对不一样消息进行存储。

至于要如何实现,那也简单,只要让生产者发送的时候指定key就行。欸刚刚不是说默认的是轮询策略吗?其实啊,kafka默认是实现了两个策略,没指定key的时候就是轮询策略,有的话那激素按键保存策略了。

上面有说到一个场景,那就是要顺序发送消息到kafka。前面提到的方案是让全部数据存储到一个分区中,但其实更好的作法,就是使用这种按键保存策略。

让须要顺序存储的数据都指定相同的键,而不须要顺序存储的数据指定不一样的键,这样一来,即实现了顺序存储的需求,又可以享受到kafka多分区的优点,岂不美哉。

1.3 实现自定义分区

说了这么多,那么到底要如何自定义分区呢?

kafka提供了两种让咱们本身选择分区的方法,第一种是在发送producer的时候,在ProducerRecord中直接指定,但须要知道具体发送的分区index,因此并不推荐。

第二种则是须要实现Partitioner.class类,并重写类中的partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) 方法。后面在生成kafka producer客户端的时候直接指定新的分区类就能够了。

package kafkaconf;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;


public class MyParatitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        //key不能空,若是key为空的会经过轮询的方式 选择分区
        if(keyBytes == null || (!(key instanceof String))){
            throw new RuntimeException("key is null");
        }
        //获取分区列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        //如下是上述各类策略的实现,不能共存
        //随机策略
        return ThreadLocalRandom.current().nextInt(partitions.size());

        //按消息键保存策略
        return Math.abs(key.hashCode()) % partitions.size();

        //自定义分区策略, 好比key为123的消息,选择放入最后一个分区
        if(key.toString().equals("123")){
            return partitions.size()-1;
        }else{
            //不然随机
            ThreadLocalRandom.current().nextInt(partitions.size());
        }
    }

    @Override
    public void close() {
    }
}

而后须要在生成kafka producer客户端的时候指定该类就行:

val properties = new Properties()
    ......
    props.put("partitioner.class", "kafkaconf.MyParatitioner");  //主要这个配置指定分区类
    ......其余配置
    val producer = new KafkaProducer[String, String](properties)

2.kafka副本机制

说完了分区,再来讲说副本。先说说副本的基本内容,在kafka中,每一个主题能够有多个分区,每一个分区又能够有多个副本。这多个副本中,只有一个是leader,而其余的都是follower副本。仅有leader副本能够对外提供服务。

多个follower副本一般存放在和leader副本不一样的broker中。经过这样的机制实现了高可用,当某台机器挂掉后,其余follower副本也能迅速”转正“,开始对外提供服务。

这里经过问题来整理这部份内容。

kafka的副本都有哪些做用?

在kafka中,实现副本的目的就是冗余备份,且仅仅是冗余备份,全部的读写请求都是由leader副本进行处理的。follower副本仅有一个功能,那就是从leader副本拉取消息,尽可能让本身跟leader副本的内容一致。

说说follower副本为何不对外提供服务?

这个问题本质上是对性能和一致性的取舍。试想一下,若是follower副本也对外提供服务那会怎么样呢?首先,性能是确定会有所提高的。但同时,会出现一系列问题。相似数据库事务中的幻读,脏读。

好比你如今写入一条数据到kafka主题a,消费者b从主题a消费数据,却发现消费不到,由于消费者b去读取的那个分区副本中,最新消息还没写入。而这个时候,另外一个消费者c却能够消费到最新那条数据,由于它消费了leader副本。

看吧,为了提升那么些性能而致使出现数据不一致问题,那显然是不值得的。

leader副本挂掉后,如何选举新副本?

若是你对zookeeper选举机制有所了解,就知道zookeeper每次leader节点挂掉时,都会经过内置id,来选举处理了最新事务的那个follower节点。

从结果上来讲,kafka分区副本的选举也是相似的,都是选择最新的那个follower副本,但它是经过一个In-sync(ISR)副本集合实现。

kafka会将与leader副本保持同步的副本放到ISR副本集合中。固然,leader副本是一直存在于ISR副本集合中的,在某些特殊状况下,ISR副本中甚至只有leader一个副本。

当leader挂掉时,kakfa经过zookeeper感知到这一状况,在ISR副本中选取新的副本成为leader,对外提供服务。

但这样还有一个问题,前面提到过,有可能ISR副本集合中,只有leader,当leader副本挂掉后,ISR集合就为空,这时候怎么办呢?这时候若是设置unclean.leader.election.enable参数为true,那么kafka会在非同步,也就是不在ISR副本集合中的副本中,选取出副本成为leader,但这样意味这消息会丢失,这又是可用性和一致性的一个取舍了。

ISR副本集合保存的副本的条件是什么?

上面一直说ISR副本集合中的副本就是和leader副本是同步的,那这个同步的标准又是什么呢?

答案其实跟一个参数有关:replica.lag.time.max.ms。

前面说到follower副本的任务,就是从leader副本拉取消息,若是持续拉取速度慢于leader副本写入速度,慢于时间超过replica.lag.time.max.ms后,它就变成“非同步”副本,就会被踢出ISR副本集合中。但后面如何follower副本的速度慢慢提上来,那就又可能会从新加入ISR副本集合中了。

producer的acks参数

前面说了那么多理论的知识,那么就能够来看看如何在实际应用中使用这些知识。

跟副本关系最大的,那天然就是acks机制,acks决定了生产者如何在性能与数据可靠之间作取舍。

配置acks的代码其实很简单,只须要在新建producer的时候多加一个配置:

val properties = new Properties()
    ......
    props.put("acks", "0/1/-1");  //配置acks,有三个可选值
    ......其余配置
    val producer = new KafkaProducer[String, String](properties)

acks这个配置能够指定三个值,分别是0,1和-1。咱们分别来讲三者表明什么:

  • acks为0:这意味着producer发送数据后,不会等待broker确认,直接发送下一条数据,性能最快
  • acks为1:为1意味着producer发送数据后,须要等待leader副本确认接收后,才会发送下一条数据,性能中等
  • acks为-1:这个表明的是all,意味着发送的消息写入全部的ISR集合中的副本(注意不是所有副本)后,才会发送下一条数据,性能最慢,但可靠性最强

还有一点值得一提,kafka有一个配置参数,min.insync.replicas,默认是1(也就是只有leader,实际生产应该调高),该属性规定了最小的ISR数。这意味着当acks为-1(即all)的时候,这个参数规定了必须写入的ISR集中的副本数,若是没达到,那么producer会产生异常。

以上~

相关文章
相关标签/搜索