Kafka 详解(三)------Producer生产者

Kafka 详解(三)------Producer生产者html

  在第一篇博客咱们了解到一个kafka系统,一般是生产者Producer 将消息发送到 Broker,而后消费者 Consumer 去 Broker 获取,那么本篇博客咱们来介绍什么是生产者Producer。java

一、生产者概览

  咱们知道一个系统在运行过程当中会有不少消息产生,好比前面说的对于一个购物网站,一般会记录用户的活动,网站的运行度量指标以及一些日志消息等等,那么产生这些消息的组件咱们均可以称为生产者。算法

  而对于生产者产生的消息重要程度又有不一样,是否都很重要不容许丢失,是否容许丢失一部分?以及是否有严格的延迟和吞吐量要求?apache

  对于这些场景在 Kafka 中会有不一样的配置,以及不一样的 API 使用。bootstrap

二、生产者发送消息步骤

  下图是生产者向 Kafka 发送消息的主要步骤:数组

  

  ①、首先要构造一个 ProducerRecord 对象,该对象能够声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必需要声明的,分区和键能够不用指定。安全

  ②、调用send() 方法进行消息发送。服务器

  ③、由于消息要到网络上进行传输,因此必须进行序列化,序列化器的做用就是把消息的 key 和 value对象序列化成字节数组。网络

  ④、接下来数据传到分区器,若是之间的 ProducerRecord 对象指定了分区,那么分区器将再也不作任何事,直接把指定的分区返回;若是没有,那么分区器会根据 Key 来选择一个分区,选择好分区以后,生产者就知道该往哪一个主题和分区发送记录了。app

  ⑤、接着这条记录会被添加到一个记录批次里面,这个批次里全部的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

  ③、Broker成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在分区里的偏移量)。发送失败,能够选择重试或者直接抛出异常。

三、Java Producer API 

  首先在POM 文件中导入 kafka client。

org.apache.kafkakafka-clients2.0.0

  实例代码:

 1 package com.ys.utils; 2  3 import org.apache.kafka.clients.producer.*; 4 import java.util.Properties; 5  6 /** 7  * Create by YSOcean 8  */ 9 public class KafkaProducerUtils {10 11     public static void main(String[] args) {12         Properties kafkaProperties = new Properties();13         //配置broker地址信息14         kafkaProperties.put("bootstrap.servers", "192.168.146.200:9092,192.168.146.201:9092,192.168.146.202:9092");15         //配置 key 的序列化器16         kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");17         //配置 value 的序列化器18         kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");19         20         //经过上面的配置文件生成 Producer 对象21         Producer producer = new KafkaProducer(kafkaProperties);22         //生成 ProducerRecord 对象,并制定 Topic,key 以及 value23         ProducerRecordrecord =24                 new ProducerRecord("testTopic","key1","hello Producer");25         //发送消息26         producer.send(record);27     }28 }

  经过运行上述代码,咱们向名为 testTopic 的主题中发送了一条键为 key1,值为 hello Producer 的消息。

  

四、属性配置

  在上面的实例中,咱们配置了以下三个属性:

  ①、bootstrap.servers:该属性指定 brokers 的地址清单,格式为 host:port。清单里不须要包含全部的 broker 地址,生产者会从给定的 broker 里查找到其它 broker 的信息。——建议至少提供两个 broker 的信息,由于一旦其中一个宕机,生产者仍然可以链接到集群上。

  ②、key.serializer:将 key 转换为字节数组的配置,必须设定为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会用这个类把键对象序列化为字节数组。——kafka 默认提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。固然也能够自定义序列化器。

  ③、value.serializer:和 key.serializer 同样,用于 value 的序列化。

  以上三个属性是必需要配置的,下面还有一些别的属性能够不用配置,默认。

  ④、acks:此配置指定了必需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数保障了消息发送的可靠性。默认值为 1。

    1、acks=0。生产者不会等待服务器的反馈,该消息会被马上添加到 socket buffer 中并认为已经发送完成。也就是说,若是发送过程当中发生了问题,致使服务器没有接收到消息,那么生产者也没法知道。在这种状况下,服务器是否收到请求是无法保证的,而且参数retries也不会生效(由于客户端没法得到失败信息)。每一个记录返回的 offset 老是被设置为-1。好处就是因为生产者不须要等待服务器的响应,因此它能够以网络可以支持的最大速度发送消息,从而达到很高的吞吐量。

    2、acks=1。只要集群首领收到消息,生产者就会收到一个来自服务器的成功响应。若是消息没法到达首领节点(好比首领节点崩溃,新首领尚未被选举出来),生产者会收到一个错误的响应,为了不丢失消息,生产者会重发消息(根据配置的retires参数肯定重发次数)。不过若是一个没有收到消息的节点成为首领,消息仍是会丢失,这个时候的吞吐量取决于使用的是同步发送仍是异步发送。

    3、acks=all。只有当集群中参与复制的全部节点所有收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,可是延迟最高。

  ⑤、buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。默认值为33554432 字节。若是应用程序发送消息的速度超过发送到服务器的速度,那么会致使生产者内存不足。这个时候,send() 方法会被阻塞,若是阻塞的时间超过了max.block.ms (在kafka0.9版本以前为block.on.buffer.full 参数)配置的时长,则会抛出一个异常。

  ⑥、compression.type:该参数用于配置生产者生成数据时能够压缩的类型,默认值为 none(不压缩)。还能够指定snappy、gzip或lz4等类型,snappy 压缩算法占用较少的 CPU,gzip 压缩算法占用较多的 CPU,可是压缩比最高,若是网络带宽比较有限,可使用该算法,使用压缩能够下降网络传输开销和存储开销,这每每是 kafka 发送消息的瓶颈所在。

  ⑦、retires:该参数用于配置当生产者发送消息到服务器失败,服务器返回错误响应时,生产者能够重发消息的次数,若是达到了这个次数,生产者会放弃重试并返回错误。默认状况下,生产者会在每次重试之间等待100ms,能够经过 retry.backoff.on 参数来改变这个时间间隔。

   还有一些属性配置,能够参考官网:http://kafka.apachecn.org/documentation.html#producerconfigs

五、序列化器

  前面咱们介绍过,消息要到网络上进行传输,必须进行序列化,而序列化器的做用就是如此。

  ①、默认序列化器

  Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上可以知足大部分场景的需求。

  下面是Kafka 实现的字符串序列化器 StringSerializer:

//// Source code recreated from a .class file by IntelliJ IDEA// (powered by Fernflower decompiler)//package org.apache.kafka.common.serialization;import java.io.UnsupportedEncodingException;import java.util.Map;import org.apache.kafka.common.errors.SerializationException;public class StringSerializer implements Serializer {    private String encoding = "UTF8";    public StringSerializer() {
    }    public void configure(Mapconfigs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);        if (encodingValue == null) {
            encodingValue = configs.get("serializer.encoding");
        }        if (encodingValue instanceof String) {            this.encoding = (String)encodingValue;
        }

    }    public byte[] serialize(String topic, String data) {        try {            return data == null ? null : data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        }
    }    public void close() {
    }
}

View Code

   其中接口 serialization:

 1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5  6 package org.apache.kafka.common.serialization; 7  8 import java.io.Closeable; 9 import java.util.Map;10 11 public interface Serializerextends Closeable {12     void configure(Mapvar1, boolean var2);13 14     byte[] serialize(String var1, T var2);15 16     void close();17 }

View Code

  ②、自定义序列化器

  若是Kafka提供的几个默认序列化器不能知足要求,即发送到 Kafka 的消息不是简单的字符串或整型,那么咱们能够自定义序列化器。

  好比对于以下的实体类 Person:

 1 package com.ys.utils; 2  3 /** 4  * Create by YSOcean 5  */ 6 public class Person { 7     private String name; 8     private int age; 9 10     public String getName() {11         return name;12     }13 14     public void setName(String name) {15         this.name = name;16     }17 18     public int getAge() {19         return age;20     }21 22     public void setAge(int age) {23         this.age = age;24     }25 }

View Code

  咱们自定义一个 PersonSerializer:

 1 package com.ys.utils; 2  3 import org.apache.kafka.common.serialization.Serializer; 4  5 import java.io.UnsupportedEncodingException; 6 import java.nio.ByteBuffer; 7 import java.util.Map; 8  9 /**10  * Create by YSOcean11  */12 public class PersonSerializer implements Serializer {13 14     @Override15     public void configure(Map map, boolean b) {16         //不作任何配置17     }18 19     @Override20     /**21      * Person 对象被序列化成:22      *  表示 age 的4 字节整数23      *  表示 name 长度的 4 字节整数(若是为空,则长度为0)24      *  表示 name 的 N 个字节25      */26     public byte[] serialize(String topic, Person data) {27         if(data == null){28             return null;29         }30         byte[] name;31         int stringSize;32         try {33             if(data.getName() != null){34                 name = data.getName().getBytes("UTF-8");35                 stringSize = name.length;36             }else{37                 name = new byte[0];38                 stringSize = 0;39             }40             ByteBuffer buffer = ByteBuffer.allocate(4+4+stringSize);41             buffer.putInt(data.getAge());42             buffer.putInt(stringSize);43             buffer.put(name);44             return buffer.array();45         } catch (UnsupportedEncodingException e) {46             e.printStackTrace();47         }48         return new byte[0];49     }50 51     @Override52     public void close() {53         //不须要关闭任何东西54     }55 }

View Code

  上面例子序列化将Person类的 age 属性序列化为 4 个字节,后期若是该类发生更改,变为长整型 8 个字节,那么可能会存在新旧消息兼容性问题。

  所以一般不建议自定义序列化器,可使用下面介绍的已有的序列化框架。

  ③、序列化框架

  上面咱们知道自定义序列化器可能会存在新旧消息兼容性问题,须要咱们手动去维护,那么为了省去此麻烦,咱们可使用一些已有的序列化框架。好比 JSON、Avro、Thrift 或者 Protobuf。

六、发送消息 send()

  ①、普通发送——发送就忘记

        //一、经过上面的配置文件生成 Producer 对象
        Producer producer = new KafkaProducer(kafkaProperties);        //二、生成 ProducerRecord 对象,并制定 Topic,key 以及 value        //建立名为testTopic的队列,键为testkey,值为testValue的ProducerRecord对象
        ProducerRecordrecord =                new ProducerRecord<>("testTopic","testkey","testValue");        //三、发送消息
        producer.send(record);

   经过配置文件构造一个生产者对象 producer,而后指定主题名称,键值对,构造一个 ProducerRecord 对象,最后使用生产者Producer 的 send() 方法发送 ProducerRecord 对象,send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过一般咱们会忽略返回值。

  和上面的名字同样——发送就忘记,生产者只管发送,并无论发送的结果是成功或失败。一般若是咱们不关心发送结果,那么就可使用此种方式。

  ②、同步发送

//一、经过上面的配置文件生成 Producer 对象Producer producer = new KafkaProducer(kafkaProperties);//二、生成 ProducerRecord 对象,并制定 Topic,key 以及 value//建立名为testTopic的队列,键为testkey,值为testValue的ProducerRecord对象ProducerRecordrecord =        new ProducerRecord<>("testTopic","testkey","testValue");//三、同步发送消息try {    //经过send()发送完消息后返回一个Future对象,而后调用Future对象的get()方法等待kafka响应    //若是kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量    //若是kafka发生错误,没法正常响应,就会抛出异常,咱们即可以进行异常处理    producer.send(record).get();
} catch (Exception e) {    //四、异常处理    e.printStackTrace();
}

   和上面普通发送消息同样,只不过这里咱们调用了 Future 对象的 get() 方法来等待 kafka 服务器的响应,程序运行到这里会产生阻塞,直到获取kafka集群的响应。而这个响应有两种状况:

  一、正常响应:返回一个 RecordMetadata 对象,经过该对象咱们可以获取消息的偏移量、分区等信息。

  二、异常响应:基本上来讲会发生两种异常,

    一类是可重试异常,该错误能够经过重发消息来解决。好比链接错误,能够经过再次链接后继续发送上一条未发送的消息;再好比集群没有首领(no leader),由于咱们知道集群首领宕机以后,会有一个时间来进行首领的选举,若是这时候发送消息,确定是没法发送的。

    二类是没法重试异常,好比消息太大异常,对于这类异常,KafkaProducer 不会进行任何重试,直接抛出异常。

  同步发送消息适合须要保证每条消息的发送结果,优势是可以精确的知道什么消息发送成功,什么消息发送失败,而对于失败的消息咱们也能够采起措施进行从新发送。缺点则是增长了每条消息发送的时间,当发送消息频率很高时,此种方式便不适合了。

  ③、异步发送

  有同步发送,基本上就会有异步发送了。同步发送每发送一条消息都得等待kafka服务器的响应,以后才能发送下一条消息,那么咱们不是在错误产生时立刻处理,而是记录异常日志,而后立刻发送下一条消息,而这个异常再经过回调函数去处理,这就是异步发送。

  一、首先咱们要实现一个继承 org.apache.kafka.clients.producer.Callback 接口,而后实现其惟一的 onCompletion 方法。

package com.ys.utils;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.RecordMetadata;/**
 * Create by YSOcean */public class KafkaCallback implements Callback{
    @Override    public void onCompletion(RecordMetadata recordMetadata, Exception e) {        if(e != null){            //异常处理            e.printStackTrace();
        }
    }
}

View Code

  二、发送消息时,传入这个回调类。

//异步发送消息producer.send(record,new KafkaCallback());
相关文章
相关标签/搜索