架构设计:系统间通讯(29)——Kafka及场景应用(中2)

接上文:《架构设计:系统间通讯(28)——Kafka及场景应用(中1)css

4-三、复制功能

咱们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic能够有多个分区它们分布在整个Kafka集群的多个Broker服务节点中,而且一条消息只会按照消息生产者的要求进入topic的某一个分区。那么问题来了:若是某个分区中的消息在被消费端Pull以前,承载该分区的Broker服务节点就由于各类异常缘由崩溃了,那么在这个Broker从新启动前,消费者就没法收到消息了。html

为了解决这个问题,Apache Kafka在V 0.8+版本中加入了复制功能:让topic下的每个分区存储到多个Broker服务节点上,并由Zookeeper统一管理它们的状态。java

这里写图片描述

请注意Kafka中Partition(分区)和replication(复制)是两个彻底不一样的概念,不少读者容易将这两个概念混淆——虽然它们都和“如何存储消息”这件事情有关:前者是说将若干条消息按照必定的规则分别存放在不一样的区域,一条消息只存入一个区域(且Topic下多个分区能够存在于同一个Broker上);后者是说,为了保证消息在被消费前不会丢失,须要将某一个区域中的消息集合复制出多个副本(同一个分区的多个副本不能存放在同一个Broker上)。web

Kafka将分区的多个副本分为两种角色:Leader和Follower,Leader Broker是主要服务节点,消息只会从消息生产者发送给Leader Broker,消息消费者也只会从Leader Broker中Pull消息。Follower Broker为副本服务节点,正常状况下不会公布给生产者或者消费者直接进行操做。Follower Broker服务节点将会主动从Leader Broker上Pull消息。apache

在这种工做机制下,Follower和Leader的消息复制过程因为Follower服务节点的性能、压力、网络等缘由,它们和Leader服务节点会有一个消息差别性。当这个差别性扩大到必定的范围,Leader节点就会认为这个Follower节点再也跟不上本身的节奏,致使的结果就是Leader节点会将这个Follower节点移出“待同步副本集”ISR(in-sync replicas),再也不关注这个Follower节点的同步问题。后端

只有当ISR中全部分区副本所有完成了某一条消息的同步过程,这条消息才算真正完成了“记录”操做。只有这样的消息才会发送给消息消费者。至于这个真正完成“记录”操做的通知是否能返回给消息生产者,彻底取决于消息生产者采用的acks模式(后文会讲到)。api

如今咱们能够回过头看看上文中4-1-3-5小节给出的“查看Topic状态”命令以及命令结果:缓存

# 脚本命令范例
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2

# 显示的结果
Topic:my_topic2 PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: my_topic2        Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: my_topic2        Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

以上命令行用于显示指定topic名称的基本状态信息。Partition表示分区号,Replicas表示全部副本的所在位置的Broker.id信息,Isr表示当前状态正常能够进行消息复制的副本所在位置的Broker.id信息。服务器

那么从命令结果来看,名叫“my_topic2”的topic一共有4个数据分区,每个分区有两个副本。其中:0号分区的Leader Broker服务节点的id为2,0号分区的两个副本分别在id为2和id为1的Broker服务节点上,且id为2和id为1的Broker上的副本状态都是正常的;同理,1号分区的Leader Broker服务节点的id为1,1号分区的两个副本分别在id为2和id为1的Broker服务节点上,且id为2和id为1的Broker上的副本状态都是正常的。。。网络

4-四、Kafka原理:生产者

请注意以前咱们给出的Kafka集群方案的示意图,在图中消息生产者并无链接到zookeeper协调服务,而是直接和多个Kafka Server Brokers创建了链接。和其余种类的消息队列的设计不一样,在整个Kafka方案中消息生产者(Producer)会有不少重要规则的决定权,例如:

  • 消费生产者(Producer)能够决定向指定的Topic的哪个分区(Partition)发送消息。而不是由Broker来决定。

  • 消息生产者(Producer)能够决定消息达到Kafka Broker后,Producer对消息的一致性关注到什么样的级别,又或者根本不关心消息在Broker上的一致性问题。

  • 消息生产者(Producer)能够决定是以同步方式(sync)仍是异步方式(aSync)向Broker Server List发送消息。

  • 在异步方式下,消费生产者(Producer)还能够决定以什么样的间隔(周期)向Broker Server List发送消息。

  • 随机选定Broker Server List中某一个服务节点,读取当前Topic下的分区和复制表信息,并保存在本地Pool中的工做也是由消息生产者(Producer)主动完成。

  • 另外,Kafka中的消息生产者没有相似ActiveMQ中那样的事务机制(可参见文章《架构设计:系统间通讯(23)——提升ActiveMQ工做性能(中)》)。这样的设计和Kafka主要的业务场景有关——用来收集各类操做日志。这样的场景对消息的可靠性要求并不高:漏掉一两条日志并不影响后端大数据平台对日志数据的分析结果;并且这样的设计大量简化了Broker的设计结构:它不须要像ActiveMQ那样专门为达成传输但还未进行commit的消息专门建立存储区域“transaction store”,并在进行了commit或者rollback操做后进行标记。这种处理机制是Apache Kafka高效性能的又一种保障。

  • Kafka中的多个消息生产者(Producer)并不须要ZooKeeper服务中的任何信息为它们协调发送过程,由于没有什么可协调的。生产者惟一须要知道的Topic有多少个分区以及每一个分区,分别存在于哪些Broker上的信息都是来源于对某一个Broker的直接查询。因此Kafka集群中只剩下了Broker和Consumer须要进行协调(这个问题会在后文中进行详细讨论)。

  • 这是分布式系统建设思想中一个重要的原则——不可滥用协调装置:完成同一件工做时,协调N个参与角色要比协调N-1个参与角色耗费更多的时间和性能;因此,只协调须要协调的角色,只通知须要通知的事件,只为协调过程存储必要的数据。我在后续的写做中,会专门为读者详细介绍Kafka中消息生产者的实现过程,这里面有不少设计思想能够在各位的实际工做中借鉴。

4-4-一、基本使用

下面的代码使用Kafka的Java Client API演示消息生产者的使用。这里咱们使用的Kafka Java Client API的版本是V0.8.2.2,您能够直接引入Maven的官方库依赖便可:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

如下是Kafka消息生产者的代码,以前咱们已经经过Kafka的命令脚本建立了一个拥有4个分区的Topic——my_topic2:

package kafkaTQ;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/** * kafka消息生产者演示, * @author yinwenjie */
public class KafkaProducer {

    public static void main(String[] args) throws RuntimeException {
        Properties props = new Properties();
        // 指定kafka节点列表,不须要由zookeeper进行协调
        // 而且链接的目的也不是为了发送消息,而是为了在这些节点列表中选取一个,来获取topic的分区情况
        props.put("metadata.broker.list", "192.168.61.138:9092");
        // 使用这个属性能够指定“将消息送到topic的哪个partition中”,若是业务规则比较复杂的话能够指定分区控制器
        // 不过开发者最好要清楚topic有多少个分区,这样才好进行多线程(负载均衡)发送
        //props.put("partitioner.class", "kafkaTQ.PartitionerController");
        // 能够经过这个参数控制是异步发送仍是同步发送(默认为“同步”)
        //props.put("producer.type", "async");
        // 能够经过这个属性控制复制过程的一致性规则
        //props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // 建立消费者
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);

        // 因为咱们为topic建立了四个partition,因此将数据分别发往这四个分区
        for (Integer partitionIndex = 0; ; partitionIndex++) {  
            Date time = new Date();
            // 建立和发送消息,能够指定这条消息的key,producer根据这个key来决定这条消息发送到哪一个parition中
            // 另一个能够决定parition的方式是实现kafka.producer.Partitioner接口
            String messageContext_Value = "this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();
            System.out.println(messageContext_Value);
            byte[] messageContext = messageContext_Value.getBytes();
            byte[] key = partitionIndex.toString().getBytes();

            // 这是消息对象,请注意第二个参数和第三个参数,下一小节将会进行详细介绍
            KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 ,  messageContext);
            producer.send(message);

            // 休息0.5秒钟,循环发
            synchronized (KafkaProducer.class) { 
                try {
                    KafkaProducer.class.wait(500);
                } catch (InterruptedException e) {
                    e.printStackTrace(System.out);
                }
            }
        } 
    }
}

4-4-二、生产者指定分区

开发人员能够在消息生产者端指定发送的消息将要传送到Topic下的哪个分区(partition),但前提条件是开发人员清楚这个Topic有多少个分区,不然开发人员就不知道怎么编写代码了。固然开发人员也能够彻底忽略决定分区的规则,这时将由消费者端携带的一个默认规则决定。

开发人员能够有两种方式进行分区指定:第一种方法是以上代码片断中演示的那样,在建立消息对象KeyedMessage时,指定方法中partKey/key的值;另外一种方式是从新实现kafka.producer.Partitioner接口,以便覆盖掉默认实现。

使用KeyedMessage类构造消息对象时,能够指定4个参数,他们分别是:topic名称、消息Key、分区Key和message消息内容。topic名称和message消息内容很容易理解,可是怎样理解消息Key和分区Key呢?如下是KeyedMessage类的源代码(Scala语言):

package kafka.producer

/** * A topic, key, and value. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored. */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }

  def hasKey = key != null
}

KeyedMessage类的构造函数中有一个局部变量:partitionKey,在KeyedMessage类的首行注释中,对该变量进行了一个说明:

If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.

从源码中能够看出,partitionKey优先使用partKey做为分区依据,若是partKey没有被赋值,则使用key做为分区依据。因此在使用KeyedMessage类的构造函数时,partKey和key您只须要指定其中的一个就彻底够了。

您还能够实现kafka.producer.Partitioner接口,并在建立消费者对象时进行指定,以便实现分区的指定(若是不进行指定,默认的实现类为“kafka.producer.DefaultPartitioner”)。代码片断以下:

package kafkaTQ;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerController implements Partitioner {

    /** * 必需要有这个构造函数 * @param vp */
    public PartitionerController(VerifiableProperties vp) {

    }

    /* (non-Javadoc) * @see kafka.producer.Partitioner#partition(java.lang.Object, int) */
    @Override
    public int partition(Object parKey, int partition) {
        /* * 在这里您能够根据自身的业务过程从新运算一个分区,并进行返回。 */
        Integer parKeyValue = (Integer)parKey;
        return parKeyValue;
    }
}

须要实现的partition方法中,第一个参数是您在建立消息时所传递的partyKey(这是的partyKey不必定传入Integer),第二个参数是send方法根据自身内部机制决定的目标分区。

4-4-三、同步和异步发送

消息生产这还能够决定是以同步方式向Broker发送消息仍是以异步方式向Broker发送消息。只须要使用生产者配置中的“producer.type”属性进行指定。当该属性值为“sync”时,表示使用同步发送的方式;当该属性值为“async”时,表示使用异步发送方式。

在异步发送方式下,开发人员调用send方法发送消息时,这个消息并不会当即被发送到topic指定的Leader partition所在的Broker,而是会存储在本地的一个缓冲区域(必定注意是客户端本地)。当缓冲区的状态知足最长等待时间或者最大数据量条数时,消息会以一个设置值批量发送给Broker。以下图所示:

这里写图片描述

缓存区的数据按照batch.num.messages设置的数值被一批一批的发送给目标Broker(默认为200条),若是消息的滞留时间超过了queue.buffering.max.ms设置的值(单位毫秒,默认值为5000)就算没有达到batch.num.messages的数值,消息也会被发送。

若是因为Broker的缘由致使消息发送缓慢,这时在本地待发送消息缓存区中的消息就有可能达到
queue.buffering.max.messages设置的缓存区容许存储的最大消息数量,一旦达到这个数量消息生产者端再次调用send方法的时候,send方法所在线程就会被阻塞,直到缓存区有足够的空间可以放下新的数据为止。

4-4-四、强一致性复制和弱一致性复制

Kafka中的消息生产者还能够配置发送的消息在Broker端以哪一种方式进行副本复制:强一致性复制仍是弱一致性复制,又或者不关注消息的一致性。(在分布式系统中强一致性、弱一致性和最终一致性是一个很是关键的知识点,它们是CAP原则重要的实践,我将会在“存储”专题中进行对它们的定义和主流的实现方式进行讲解)

在Kafka的实现中,强一致性复制是指当Leader Partition收到消息后,将在全部Follower partition完成这条消息的复制后才认为消息处理成功,并向消息生产者返回ack信息;弱一致性复制是指当Leader partition收到消息后,只要Leader Broker本身完成了消息的存储就认为消息处理成立,并向消息生产者返回ack信息(复制过程随后由Broker节点自行完成);

您能够经过消息生产者配置中的“request.required.acks”属性来设置消息的复制性要求。在官方文档中,对于这个属性的解释是:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect . The offset given back for each record will always be set to -1.

当acks设置为0时,生产者端不会等待Server Broker回执任何的ACK确认信息。只是将要发送的消息交给网络层。这种状况下,消息是否真的到达了Server Broker,实际上生产者端并不知道。因为生产者端并不等待Server Broker回执任何的ACK确认信息,那么消息一旦传输失败(例如,等待超时的状况)“重试”过程就无从谈起了。因为生产者端在这种状况下发送的消息,极可能Server Broker还没来得及处理,甚至更有可能Server Broker都没有接收到,因此Server Broker也没法告知生产者这条消息在分区中的偏移位置。

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

当acks设置为1时,生产者发送消息将等待这个分区的Leader Server Broker 完成它本地的消息记录操做,但不会等待这个分区下其它Follower Server Brokers的操做。在这种状况下,虽然Leader Server Broker对消息的处理成功了,也返回了ACK信息给生产者端,可是在进行副本复制时,仍是可能失败。

acks=all (#注:原文如此,实际上属性值为“-1”)This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

当acks设置为“all”时,消息生产者发送消息时将会等待目标分区的Leader Server Broker以及全部的Follower Server Brokers所有处理完,才会获得ACK确认信息。这样的处理逻辑下牺牲了一部分性能,可是消息存储可靠性是最高的。

4-4-五、生产者须要查询zookeeper?

在2013年2月2日,Kafka的主要参与者Neha Narkhede发表了一篇讲解Kafka Replication过程的技术文档(算是官方文档)《Kafka Replication》,在这篇文档的Synchronous replication-write章节Neha Narkhede这样描述了“写”过程前的准备工做:

To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader。

这句话的大意是:为了发送消息到一个分区,客户端首先要经过zookeeper查询到这个分区的Leader Broker在哪一个位置,而且向这个Leader Broker发送信息。国内一些译文由此也造成了相关的中文描述。这显然与本文中提到的“生产者不须要链接zookeeper进行任何协调操做”的描述彻底矛盾

4-4-5-一、进行实验

这里冲突的重点在于“生产者在发送消息时,是直接链接到了zookeeper服务查询相关信息,仍是链接到某一个已知的Broker查询现信息?”

那么咱们只能以实验的形式实际验证一下消息生产者在建立、发送消息的过程当中是否须要链接zookeeper。实际上笔者经过阅读0.8.2.2版本的JAVA Client For Producer API 部分的的源码,真没有发现Producer直接链接zookeeper的证据(主要的类位置包括:kafka.producer.OldProducer、kafka.producer.ProducerPool、kafka.producer.SyncProducer、org.apache.kafka.clients.producer.internals.Sender和org.apache.kafka.clients.producer.KafkaProducer)。可是这显然不具备太大的说服力,毕竟极可能出现漏读代码的状况。

验证明验基于以前咱们已经搭建的Apacke Kafka集群环境,192.168.61.140服务器上运行着一个standalone模式的zookeeper服务。在实验中,咱们使用192.168.61.140服务器上自带的防火墙,设置只有两个Kafka Broker服务节点(139和138)可以访问zookeeper上的2181端口。并在这种状况下观察消息生产者的工做状况(以及相同topic下的消费者是能正常收到生产者发送的消息)。以下图所示:

这里写图片描述

  • 设置192.168.61.140上的防火墙,只容许192.168.61.139和192.168.61.138访问其2181端口:
[root@zk ~]# service iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination         
1    ACCEPT     tcp  -- 192.168.61.138 0.0.0.0/0 tcp dpt:2181 
2    ACCEPT     tcp  -- 192.168.61.139 0.0.0.0/0 tcp dpt:2181 
3    ACCEPT     icmp -- 0.0.0.0/0 0.0.0.0/0 
4    REJECT     all  -- 0.0.0.0/0 0.0.0.0/0 reject-with icmp-host-prohibited 

Chain FORWARD (policy ACCEPT)
num  target     prot opt source               destination         
1    REJECT     all  -- 0.0.0.0/0 0.0.0.0/0 reject-with icmp-host-prohibited 

Chain OUTPUT (policy ACCEPT)
num  target     prot opt source               destination

在全端口开放ICMP协议,只是为了可以使用ping命令进行检查。

  • 接下来启动140上的zookeeper服务,而且验证一下在140分别开启和关闭防火墙的状况下,producer所在的服务节点是否可以链接到zookeeper
# 在140上启动zookeeper,而且肯定它以standalone模式运行
[root@zk ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[root@kp2 ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone

首先测试在140节点开启防火墙的状况下,producer所在的192.168.61.1服务节点是否能顺利链接到2181端口(使用telnet命令):

telnet 192.168.61.140 2181

Trying 192.168.61.140...
telnet: connect to address 192.168.61.140: Connection refused

而后关闭140上的防火墙,再使用一样的telnet命令进行测试:

telnet 192.168.61.140 2181

Trying 192.168.61.140...
Connected to 192.168.61.140.
Escape character is '^]'.

能够看到140启动防火墙后,192.168.61.1服务节点不能链接到140服务的2181端口。这说明咱们设置的实验前提的确起到了限制192.168.61.1节点访问140节点上zookeeper服务的做用。

接下来咱们从新开启140上防火墙,启动140上的zookeeper服务,启动139和138上的Kafka Broker服务,让整个Kafka Broker集群工做起来。正式开始进行实验:

// 生产者的测试代码就采用4-4-1小节中咱们给出的代码样例。
// 很显然在140开启防火墙,producer没法链接zookeeper的状况下
// producer也可以正常工做。如下是producer程序打印的运行信息

this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987
  • 本编文章已经介绍过,消息可以发送出去不必定表明Kafka Broker集群工做正常。在消息复制、回执ACK等环节仍是可能引发错误。只有消息消费者收到了消息,才能认为整个消息接受、处理、发送过程都是成功的

因此为了确认这些发送出去的消息可以被消费者接收到,在进行producer测试工做的同时,咱们在138节点上,使用kafka-console-consumer运行了一个对应的消费者以便接收数据(不能在138节点或者139节点之外运行consumer,由于没法链接zookeeper服务节点的2181端口)。如下是消费者接收到的信息:

[root@activemq ~]# kafka-console-consumer.sh --zookeeper 192.168.61.140:2181 --topic my_topic2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987

4-4-5-二、结果分析

从以上小节的实验状况,咱们看到的结果是:Producer所在的服务节点192.168.61.1,在不能访问192.168.61.140节点上zookeeper服务的状况下,包括Producer在内的整个Kafak集群可以正常工做,消费者端可以正常消费数据。那么问题来了,做为Kafka的主要参与者Neha Narkhede在这样的官方文档中是不太可能出现这样的低级错误的,那么是什么缘由呢?固然若是真要说到出错,那么笔者本身出错的可能性却是要高得多。笔者认为形成这种冲突的缘由可能有如下几种:

  • 这篇文章是在2013年2月份发布的,那时候主流的Kafka版本是V0.7.X。可是笔者在实际工做中并无使用任何V0.7.X版本,因此对V0.7.X版本中是否须要生产者链接zookeeper并无肯定的答案。

  • 在Neha Narkhede的这段话中,Client并非指代的消息生产者,而是泛指的使用zookeeper服务的各类客户端角色。若是是这样的话,那么Client最有可能指代的就是Server Broker。

  • 以上实验中,并无限制住producer直接访问zookeeper的全部状况。防火墙功能可能出现了问题,又或者producer自行经过一个隐藏的端口(例如:9999)访问到了zookeeper。

  • 笔者尚未考虑到的其它可能性。欢迎各位读者留言讨论。

======================================== (接下文)

相关文章
相关标签/搜索