[Kafka][3][Kafka生产者——向Kafka写入数据]

第3章 Kafka生产者——向Kafka写入数据

对应代码仓库地址:https://github.com/T0UGH/getting-started-kafkajava

参考资料:git

kafka权威指南https://book.douban.com/subject/27665114/github

大数据通用的序列化器——Apache Avrohttps://www.jianshu.com/p/0a85bfbb9f5f算法

Kafka 中使用 Avro 序列化框架(一)https://www.jianshu.com/p/4f724c7c497dshell

Kafka 中使用 Avro 序列化组件(三):Confluent Schema Registryhttps://www.jianshu.com/p/cd6f413d35b0apache

本章对应代码仓库可查看https://github.com/T0UGH/getting-started-kafka/tree/main/src/main/java/cn/edu/neu/demo/ch3编程

在这一章,咱们将从Kafka生产者设计组件讲起,学习如何使用Kafka 生产者。json

  1. 咱们将演示如何建立KafkaProducer和ProducerRecords对象、
  2. 如何将记录发送给Kafka
  3. 如何处理从Kafka返回的错误
  4. 介绍用于控制生产者行为的重要配置选项
  5. 深刻探讨如何使用不一样的分区方法和序列化器,以及如何自定义序列化器和分区器。

3.1 生产者概览

先展现向Kafka发送消息的主要步骤bootstrap

  1. 首先建立一个ProducerRecord对象开始,ProducerRecord 对象须要包含目标主题和要发送的内容,有可能还包含分区信息数组

  2. 把ProducerRecord中的键和值序列化成字节数组,这样它们才可以在网络上传输

  3. 接下来,数据被传给分区器

    • 若是以前在ProducerRecord对象里指定了分区,那么分区器就不会再作任何事情,直接把指定的分区返回
    • 若是没有指定分区,那么分区器会根据ProducerRecord对象的来选择一个分区
  4. 紧接着,这条记录被添加到一个记录批次里,这个批次里的全部消息会被发送到相同的主题和分区上。

  5. 有一个独立的线程负责把这些记录批次发送到相应的broker 上。

  6. 服务器在收到这些消息时会返回一个响应

    • 若是消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题分区信息,以及记录在分区里的偏移量

    • 若是写入失败,则会返回一个错误。生产者在收到错误以后会尝试从新发送消息,几回以后若是仍是失败,就返回错误信息。

3.2 建立Kafka生产者

下面展现如何建立一个KafkaProducer

Properties kafkaProperties = new Properties();

// fixme: 运行时请修改47.94.139.116:9092为本身的kafka broker地址
kafkaProperties.put("bootstrap.servers", "47.94.139.116:9092");

kafkaProperties.put("key.serializer",                     "org.apache.kafka.common.serialization.StringSerializer");

kafkaProperties.put("value.serializer",                           "org.apache.kafka.common.serialization.StringSerializer");

kafkaProperties.put("acks", "all");

// 根据配置建立Kafka生产者
KafkaProducer<String, String> kafkaProducer 
    = new KafkaProducer<String, String>(kafkaProperties);

在建立Kafka生产者以前,有3个必选的配置项

配置项 解释
bootstrap.servers 该属性指定broker 的地址清单,地址的格式为host:port, host:port。
key.serializer key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键序列化成字节数组。Kafka 客户端默认提供了ByteArraySerializer、StringSerializer 和IntegerSerializer
value.serializer 与key.serializer 同样,value.serializer 指定的类会将值序列化

3.3 发送消息到Kafka

实例化生产者对象后,接下来就能够开始发送消息了。发送消息主要有如下3 种方式:

  • 发送并忘记(fire-and-forget)
    • 咱们把消息发送给服务器,但并不关心它是否正常到达
    • 大多数状况下,消息会正常到达,由于Kafka 是高可用的,并且生产者会自动尝试重发。
    • 不过,使用这种方式有时候也会丢失一些消息
  • 同步发送
    • 咱们使用send() 方法发送消息,它会返回一个Future 对象调用get() 方法进行等待,就能够知道消息是否发送成功。
  • 异步发送
    • 咱们调用send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。

下面分别演示这三种方式

3.3.1 发送并忘记

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
	"sun", "s1", "cn dota best dota");

// 发送消息
try{
    kafkaProducer.send(producerRecord);
}catch(Exception e){
    e.printStackTrace();
}
  • 生产者的send() 方法将ProducerRecord对象做为参数。ProducerRecord中包含目标主题的名字和要发送的对象和对象。
  • 咱们使用生产者的send()方法发送ProducerRecord对象。send() 方法会返回一个包含RecordMetadata 的Future对象,不过由于咱们这里忽略了返回值,因此没法知道消息是否发送成功。若是不关心发送结果,那么可使用这种发送方式。好比,记录日志。
  • 虽然咱们忽略返回值的同时也忽略了返回的异常。不过在发送消息以前,生产者仍是有可能发生其余的异常,这些异常将被抛出。这些异常有多是:
    • SerializationException(说明序列化消息失败)
    • BufferExhaustedException 或TimeoutException(说明缓冲区已满)
    • 又或者是InterruptException(说明发送线程被中断)。

3.3.2 同步发送

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
	"sun", "s1", "cn dota best dota");

// 发送消息
try{
    kafkaProducer.send(producerRecord).get();
}catch(Exception e){
    e.printStackTrace();
}
  • producer.send()方法先返回一个Future对象,而后调用Future 对象的get()方法阻塞当前线程来等待Kafka响应。

    • 若是服务器返回一个错误,get() 方法会抛出异常
    • 不然,咱们会获得一个RecordMetadata对象,能够经过它获取消息的主题分区偏移量等信息。
  • 若是在发送消息以前或者在发送消息的过程当中发生了任何错误,好比broker 返回了一个不容许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常

KafkaProducer通常会发生两类错误。

  • 一类是可重试错误,这类错误能够经过重发消息来解决。好比对于链接错误,能够经过再次创建链接来解决,“无主(no leader)”错误则能够经过从新为分区选举首领来解决。KafkaProducer 能够被配置成自动重试,若是在屡次重试后仍没法解决问题,应用程序会收到一个重试异常
  • 另外一类错误没法经过重试解决,好比“消息太大”异常。对于这类错误,KafkaProducer直接抛出异常

3.3.3 异步发送

若是只发送消息而不等待响应,那么能够避免阻塞线程来等待,从而提升发送效率。

大多数时候,咱们并不须要等待响应——尽管Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来讲不是必需的。不过在遇到消息发送失败时,咱们须要抛出异常记录错误日志,或者把消息写入“错误消息”文件以便往后分析。

这时咱们能够为send()方法注册一个回调函数,让它来处理异步调用的返回结果

// 建立ProducerRecord,它是一种消息的数据结构
ProducerRecord<String, String> producerRecord 
    = new ProducerRecord<String, String>("sun", "s1", "cn dota best dota");

// 发送消息
kafkaProducer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            e.printStackTrace();
        }else{
            System.out.println(recordMetadata);
        }   
    }
});
  • 这里经过注册一个回调函数处理异步发送的结果
    • 若是错误,则onCompletion方法的Exception参数不为null,咱们能够针对这个异常进行处理
    • 若是没有错误,RecordMetadata不为null,咱们能够从中获取主题信息、分区信息、偏移量信息

3.4 生产者的配置

生产者还有不少可配置的参数,在Kafka 文档里都有说明,它们大部分都有合理的默认值,因此没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来咱们会一一说明。

3.4.1 acks

acks参数指定了必须要有多少个分区副本收到消息生产者才会认为消息写入是成功的。

  • 若是acks=0,生产者发送消息以后就马上认为消息写入成功

    • 也就是说,若是服务器没有收到消息,生产者也无从得知,消息也就丢失了。
    • 不过,由于生产者不须要等待服务器的响应,因此它能够以网络可以支持的最大速度发送消息,从而达到很高的吞吐量
  • 若是acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应

    • 若是消息没法到达首领节点(好比首领节点崩溃,新的首领尚未被选举出来),生产者会收到一个错误响应,为了不数据丢失,生产者会重发消息。
  • 若是acks=all,只有当首领节点全部复制节点所有收到消息时,生产者才会收到一个来自服务器的成功响应

    • 这种模式是最安全的。
    • 不过,它的延迟比acks=1 时更高,由于咱们要等待不仅一个服务器节点接收消息。

3.4.2 retries(重试次数)

retries 参数的值决定了生产者能够重发消息的次数,若是达到这个次数,生产者会放弃重试并返回错误。默认状况下,生产者会在每次重试之间等待100ms。

由于生产者会自动进行重试,因此就不必在代码逻辑里处理那些可重试的错误。你只须要处理那些不可重试的错误或重试次数超出上限的状况。

3.4.3 batch.size(批次大小)和linger.ms(批次等待时间)

KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。

该参数指定了一个批次可使用的内存大小按照字节数计算(而不是消息个数)。当批次被填满,批次里的全部消息会被发送出去。

该参数指定了生产者在发送批次以前等待更多消息加入批次的时间

3.4.4 max.in.flight.requests.per.connection

该参数指定了生产者收到服务器响应以前能够发送多少个消息

  • 它的值越高,就会占用越多的内存,不过也会提高吞吐量。
  • 把它设为1能够保证消息是按照发送的顺序写入服务器的,即便发生了重试。

3.5 序列化器

咱们已经在以前的例子里看到,建立一个生产者对象必须指定序列化器。咱们已经知道如何使用默认的字符串序列化器,Kafka 还提供了整型和字节数组序列化器,不过它们还不足以知足大部分场景的需求。到最后,咱们须要序列化的记录类型会愈来愈多。

接下来演示如何开发自定义序列化器,并介绍Avro序列化器。若是发送到Kafka的对象 不是简单的字符串整型,那么可使用序列化框架来建立消息记录,如Avro、Thrift 或Protobuf,或者使用自定义序列化器。咱们强烈建议使用通用的序列化框架

3.5.1 自定义序列化器

/**
 * 一个简单的pojo,为了演示如何自定义序列化器
 * */
public class Customer {

    private int customerId;

    private String customerName;

    public Customer(int customerId, String customerName) {
        this.customerId = customerId;
        this.customerName = customerName;
    }

    public Customer() {
    }

    public int getCustomerId() {
        return customerId;
    }

    public void setCustomerId(int customerId) {
        this.customerId = customerId;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }
}
/**
 * 自定义序列化器
 * */
public class CustomerSerializer implements Serializer<Customer> {
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 不须要配置任何
    }

    /**
     * Customer对象的序列化函数,组成以下
     * 前4字节: customerId
     * 中间4字节: customerName字节数组长度
     * 后面n字节: customerName字节数组
     * */
    public byte[] serialize(String topic, Customer data) {
        try{
            byte[] serializedName;
            int stringSize;
            if(data == null){
                return null;
            }else{
                if(data.getCustomerName() != null){
                    serializedName = data.getCustomerName().getBytes("utf-8");
                    stringSize = serializedName.length;
                } else{
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerId());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch(Exception e){
            throw new SerializationException(
                "Error when serializing Customer to byte[] " + e);
        }
    }

    public byte[] serialize(String topic, Headers headers, Customer data) {
        return serialize(topic, data);
    }

    public void close() {
        // 不须要关闭任何
    }
}

不过咱们不建议采用自定义序列化器,理由以下

  • 若是咱们有多种类型的消费者,可能须要把customerID 字段变成长整型,或者为Customer 添加startDate 字段,这样就会出现新旧消息的兼容性问题
  • 在不一样版本的序列化器和反序列化器之间调试兼容性问题着实是个挑战——你须要比较原始的字节数组。
  • 更糟糕的是,若是同一个公司的不一样团队都须要往Kafka 写入Customer 数据,那么他们就须要使用相同的序列化器,若是序列化器发生改动他们几乎都要在同一时间修改代码

3.5.2 使用Avro序列化

Apache Avro是一种与编程语言无关序列化格式

Avro数据经过与语言无关的schema来定义

  • schema经过JSON来描述
  • 数据能够被序列化成二进制文件或JSON 文件,不过通常会使用二进制文件。
  • Avro 在读写文件须要用到schema,schema 通常会被内嵌在数据文件里。

Avro 有一个颇有意思的特性是,当负责写消息的应用程序使用了新版本的schema,负责读消息的应用程序能够继续处理消息而无需作任何改动,这个特性使得它特别适合用在像Kafka 这样的消息系统上。

下面举个例子

假设咱们有一个v0.0.1的schema

{
"namespace": "customerManagement.avro",
	"type": "record",
	"name": "Customer",
	"fields": [
		{"name": "id", "type": "int"},
		{"name": "name", "type": "string"},
		{"name": "faxNumber", "type": ["null", "string"], "default": "null"} 
	]
}

过了一段时间,咱们须要删除faxnumber字段(传真号码),添加一个email字段,新的schema(v0.0.2)以下

{
"namespace": "customerManagement.avro",
	"type": "record",
	"name": "Customer",
	"fields": [
		{"name": "id", "type": "int"},
		{"name": "name", "type": "string"},
		{"name": "email", "type": ["null", "string"], "default": "null"} 
	]
}

更新到新版的schema 后:

  • 旧记录仍然包含faxNumber 字段
  • 而新记录则包含email 字段
  • 部分负责读取数据的应用程序进行了升级,那么它们是如何处理这些变化的呢?

消费者升级以前

  • 它们会调用相似getName()、getId() 和getFaxNumber() 这样的方法。
  • 若是碰到使用新schema 构建的消息,getName() 和getId() 方法仍然可以正常返回,但getFaxNumber() 方法会返回null,由于消息里不包含传真号码。

消费者升级以后

  • getEmail() 方法取代了getFaxNumber() 方法。
  • 若是碰到一个使用旧schema 构建的消息,那么getEmail() 方法会返回null,由于旧消息不包含邮件地址。

如今能够看出使用Avro 的好处了:咱们修改了消息的schema,但并不须要更新全部消费者,而这样仍然不会出现异常或阻断性错误,也不须要对现有数据进行大幅更新。

3.5.3 在Kafka中使用Avro

若是在每条Kafka 记录里都嵌入schema,会让记录的大小成倍地增长。

  • 可是在读取记录时仍然须要用到整个schema,因此要先找到schema。
  • 咱们要使用“schema 注册表”来达到目的。
  • Kafka中并不包含schema注册表的实现,如今已经有一些开源的schema注册表实现,好比:Confluent Schema Registry。
  • 咱们把全部写入数据须要用到的schema保存在注册表里,而后在记录里放一个schema的标识符
  • 消费者使用记录中的标识符从注册表里拉取schema来反序列化记录。

3.6 分区

在以前的例子里,ProducerRecord对象包含了目标主题。Kafka 的消息是一个个键值对,ProducerRecord 对象能够只包含目标主题和值能够设置为默认的null,不过大多数应用程序会用到键。

键有两个用途:

  1. 能够做为消息的附加信息
  2. 也能够用来决定消息该被写到主题的哪一个分区。拥有相同键的消息将被写到同一个分区。

若是要建立键为null的消息,不指定键就能够

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");

若是键为null,而且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

若是键不为空,而且使用了默认的分区器,那么Kafka会使用Kafka本身的散列算法对键进行散列(使用Kafka 本身的散列算法,即便升级Java 版本,散列值也不会发生变化),而后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键老是被映射到同一个分区上

只有在不改变主题分区数量的状况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的状况下,能够保证用户045189 的记录老是被写到分区34。在从分区读取数据时,能够进行各类优化。不过,一旦主题增长了新的分区,这些就没法保证了——旧数据仍然留在分区34,但新的记录可能被写到其余分区上。若是要使用键来映射分区,那么最好在建立主题的时候就把分区规划好,并且永远不要增长新分区。

3.6.1 自定义分区器

默认状况下,kafka自动建立的主题的分区数量为1,因此咱们须要先修改分区数量,来让自定义分区器有点用。

  1. 先cd到opt\kafka\bin\

  2. 运行命令:

    ./kafka-topics.sh --zookeeper 47.94.139.116:2181/kafka --alter --topic sun --partitions 4
  3. 查看是否修改为功

    ./kafka-topics.sh --describe --zookeeper 47.94.139.116:2181/kafka --topic sun

或者也能够经过修改server.properties中的nums.partions并重启,来更改默认分区数量。

下面自定义一个分区器,它根据键来划分分区:若键为Banana则放入最后一个分区,若键不为Banana则散列到其余分区。

/**
 * 自定义分区器
 * 若键为Banana则放入最后一个分区,若键不为Banana则散列到其余分区
 * */
public class BananaPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 分区信息列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        //分区数量
        int partitionsAmount = partitions.size();

        // 若是只有一个分区,那全都放到partition0就完事了
        if(partitionsAmount == 1){
            return 0;
        }

        if(keyBytes == null || ! (key instanceof String)){
            throw new InvalidRecordException("We expect all messages to have customer name as key");
        }

        if(key.equals("Banana")){
            return partitionsAmount - 1;
        }

        return (Math.abs(Utils.murmur2(keyBytes)) % (partitionsAmount - 1));

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

而后,在producer配置中加入kafkaProperties.put("partitioner.class", "cn.edu.neu.demo.ch3.partitioner.BananaPartitioner");便可。

相关文章
相关标签/搜索