3、Kafka工做流程分析

三 Kafka 工做流程分析
3.1 Kafka 生产过程(Producer)分析
                            
3.1.1 写入方式
  producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
3.1.2 分区(Partition)
  消息发送时都被发送到一个 topic,其本质就是一个目录,而 topic 是由一些 Partition
  Logs(分区日志)组成,其组织结构以下图所示:
    
      咱们能够看到,每一个 Partition 中的消息都是 有序的,生产的消息被不断追加到 Partition
        log 上,其中的每个消息都被赋予了一个惟一的 offset 值
   1)分区的缘由
      (1)方便在集群中扩展,每一个 Partition 能够经过调整以适应它所在的机器,而一个 topic又能够有多个 Partition 组成,所以整个集群就能够适应任意大小的数据了;
      (2)能够提升并发,由于能够以 Partition 为单位读写了。
   2)分区的原则(数据写入到哪一个分区)
      (1)指定了 patition,则直接使用;
      (2)未指定 patition 但指定 key,经过对 key 的 value 进行 hash 出一个 patition
      (3)patition 和 key 都未指定,使用轮询选出一个 patition。———————————————————————————
DefaultPartitioner 类 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {       List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);       int numPartitions = partitions.size();       if (keyBytes == null) {           int nextValue = nextValue(topic);           List<PartitionInfo> availablePartitions =cluster.availablePartitionsForTopic(topic);           if (availablePartitions.size() > 0) {                 int part = Utils.toPositive(nextValue) % availablePartitions.size();                 return availablePartitions.get(part).partition();           } else {           // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;           }       } else {           // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;       } }
3.1.3 副本(Replication)
    同 一 个 partition 可 能 会 有 多 个 replication ( 对 应 server.properties 配 置 中 的default.replication.factor=N)。
    没有 replication 的状况下,一旦 broker 宕机,其上全部 patition的数据都不可被消费,
    同时 producer 也不能再将数据存于其上的 patition。引入 replication以后,同一个 partition 可能会有多个 replication,
    而这时须要在这些 replication 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 做为 follower 从 leader中复制数据。
3.1.4 写入流程
    producer 写入消息流程以下:
    
      1)producer 先从 zookeeper 的 "/brokers/.../state"节点找到该 partition 的 leader
      2)producer 将消息发送给该 leader
      3)leader 将消息写入本地 log
      4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
      5)leader 收到全部 ISR 中的 replication 的 ACK 后,增长 HW(high watermark,最后 commit的 offset)并向 producer 发送 ACK
3.2 broker 保存消息
   3.2.1 存储方式
      物理上把 topic 分红一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每一个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的全部消息和索引文
      件),以下:
             [hadoop@node01 logs]$ ll
              drwxrwxr-x. 2 hadoop hadoop 4096 8 月 6 14:37 first-0
              drwxrwxr-x. 2 hadoop hadoop 4096 8 月 6 14:35 first-1
              drwxrwxr-x. 2 hadoop hadoop 4096 8 月 6 14:37 first-2
             [hadoop@node01 logs]$ cd first-0
             [hadoop@node01 first-0]$ ll
              -rw-rw-r--. 1 hadoop hadoop 10485760 8 月 6 14:33 00000000000000000000.index
              -rw-rw-r--. 1 hadoop hadoop 219 8 月 6 15:07 00000000000000000000.log
              -rw-rw-r--. 1 hadoop hadoop 10485756 8 月 6 14:33 00000000000000000000.timeindex
              -rw-rw-r--. 1 hadoop hadoop 8 8 月 6 14:37 leader-epoch-checkpoint
3.2.2 存储策略
      不管消息是否被消费,kafka 都会保留全部消息。有两种策略能够删除旧数据:
          1)基于时间:log.retention.hours=168
          2)基于大小:log.retention.bytes=1073741824
        须要注意的是,由于 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关,因此这里删除过时文件与提升 Kafka 性能无关。
3.2.3 Zookeeper 存储结构
      
 
       注意:producer 不在 zk 中注册,消费者(0.8 以前)在 zk 中注册。
3.3 Kafka 消费过程分析
      kafka 提供了两套 consumer API:高级 Consumer API 和低级 API。
    3.3.1 高级 API
      1)高级 API 优势
          高级 API 写起来简单
           不须要去自行去管理 offset,系统经过 zookeeper 自行管理
          不须要管理分区,副本等状况,系统自动管理
          消费者断线会自动根据上一次记录在 zookeeper 中的 offset 去接着获取数据(默认设置1 分钟更新一下 zookeeper 中存的的 offset)
          可使用 group 来区分对同一个 topic 的不一样程序访问分离开来(不一样的 group 记录不一样的 offset,这样不一样程序读取同一个 topic 才不会由于 offset 互相影响)
      2)高级 API 缺点
            不能自行控制 offset(对于某些特殊需求来讲)
           不能细化控制如分区、副本、zk 等
3.3.2 低级 API
      1)低级 API 优势
           可以开发者本身控制 offset,想从哪里读取就从哪里读取。
          自行控制链接分区,对分区自定义进行负载均衡
          对 zookeeper 的依赖性下降(如:offset 不必定非要靠 zk 存储,自行存储 offset 便可,好比存在文件或者内存中)
      2)低级 API 缺点
           太过复杂,须要自行控制 offset,链接哪一个分区,找到分区 leader 等。
3.3.3 消费者组
      消费者是以 consumer group 消费者组的方式工做,由一个或者多个消费者组成一个组,共同消费一个 topic。每一个分区在同一时间只能由 group 中的一个消费者读取,可是多个 group
    能够同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,
    另外两个分别读取一个分区。某个消费者读取某个分区,也能够叫作某个消费者是某个分区的拥有者。
      在这种状况下,消费者能够经过水平扩展的方式同时读取大量的消息。另外,若是一个消费者失败了,那么其余的 group 成员会自动负载均衡读取以前失败的消费者读取的分区。
3.3.4 消费方式
       consumer 采用 pull(拉)模式从 broker 中读取数据。
      push(推)模式很难适应消费速率不一样的消费者,由于消息发送速率是由 broker 决定的。它的目标是尽量以最快速度传递消息,
   可是这样很容易形成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则能够根据 consumer 的消费能力以适当的速率消费消息。
      对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 能够本身控制消费方式——便可批量消费也可逐条消费,同时
   还能选择不一样的提交方式从而实现不一样的传输语义。
3.3.5 消费者组案例
    1)需求:一个分区只能被一个消费者消费。
    2)方式一:指定配置文件
        (1)在 node0二、node03 上修改/bd/kafka/config/consumer.properties 配置文件中的 group.id 属性为任意组名。 
              [hadoop@node02 config]$ vi consumer.properties
             group.id=hadoop
        (2)在 node0二、node03 上分别启动消费者
             [hadoop@node02 kafka]$ bin/kafka-console-consumer.sh --zookeeper node02:2181
            --topic first --consumer.config config/consumer.properties
            [hadoop@node02 kafka]$ bin/kafka-console-consumer.sh --zookeeper node02:2181
            --topic first --consumer.config config/consumer.properties
        (3)在 node01 上启动生产者
             [hadoop@node01 kafka]$ bin/kafka-console-producer.sh --broker-list node01:9092
            --topic first
            >hello world
        (4)查看 node02 和 node03 的接收者。
            同一时刻只有一个消费者接收到消息。
    3) 方式二: 参数指定
         (1) 建立 topic
            ./kafka-topics.sh --create --topic mytopic --partitions 2 --replication-factor 1 --zookeeper   node01:2181
         (2) 建立生产者
            node01 上执行
            ./kafka-console-producer.sh --topic mytopic --broker-list node01:9092
         (3) 建立两个消费者而且划分到同一组
            node02 上执行:
            ./kafka-console-consumer.sh --topic mytopic --bootstrap-server node01:9092 --group aaa
            node03 上执行:
            ./kafka-console-consumer.sh --topic mytopic --bootstrap-server node01:9092 --group aaa
相关文章
相关标签/搜索