对应代码仓库地址:https://github.com/T0UGH/getting-started-kafkajava
参考资料:git
kafka权威指南https://book.douban.com/subject/27665114/github
大数据通用的序列化器——Apache Avrohttps://www.jianshu.com/p/0a85bfbb9f5f算法
Kafka 中使用 Avro 序列化框架(一)https://www.jianshu.com/p/4f724c7c497dshell
Kafka 中使用 Avro 序列化组件(三):Confluent Schema Registryhttps://www.jianshu.com/p/cd6f413d35b0apache
本章对应代码仓库可查看https://github.com/T0UGH/getting-started-kafka/tree/main/src/main/java/cn/edu/neu/demo/ch3编程
在这一章,咱们将从Kafka生产者的设计和组件讲起,学习如何使用Kafka 生产者。json
先展现向Kafka发送消息的主要步骤bootstrap
首先建立一个ProducerRecord对象开始,ProducerRecord 对象须要包含目标主题和要发送的内容,有可能还包含键和分区信息数组
把ProducerRecord中的键和值序列化成字节数组,这样它们才可以在网络上传输
接下来,数据被传给分区器。
紧接着,这条记录被添加到一个记录批次里,这个批次里的全部消息会被发送到相同的主题和分区上。
有一个独立的线程负责把这些记录批次发送到相应的broker 上。
服务器在收到这些消息时会返回一个响应。
若是消息成功写入Kafka,就返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量。
若是写入失败,则会返回一个错误。生产者在收到错误以后会尝试从新发送消息,几回以后若是仍是失败,就返回错误信息。
下面展现如何建立一个KafkaProducer
Properties kafkaProperties = new Properties(); // fixme: 运行时请修改47.94.139.116:9092为本身的kafka broker地址 kafkaProperties.put("bootstrap.servers", "47.94.139.116:9092"); kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProperties.put("acks", "all"); // 根据配置建立Kafka生产者 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(kafkaProperties);
在建立Kafka生产者以前,有3个必选的配置项
配置项 | 解释 |
---|---|
bootstrap.servers | 该属性指定broker 的地址清单,地址的格式为host:port, host:port。 |
key.serializer | key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键序列化成字节数组。Kafka 客户端默认提供了ByteArraySerializer、StringSerializer 和IntegerSerializer |
value.serializer | 与key.serializer 同样,value.serializer 指定的类会将值序列化。 |
实例化生产者对象后,接下来就能够开始发送消息了。发送消息主要有如下3 种方式:
下面分别演示这三种方式
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( "sun", "s1", "cn dota best dota"); // 发送消息 try{ kafkaProducer.send(producerRecord); }catch(Exception e){ e.printStackTrace(); }
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>( "sun", "s1", "cn dota best dota"); // 发送消息 try{ kafkaProducer.send(producerRecord).get(); }catch(Exception e){ e.printStackTrace(); }
producer.send()方法先返回一个Future对象,而后调用Future 对象的get()方法会阻塞当前线程来等待Kafka响应。
若是在发送消息以前或者在发送消息的过程当中发生了任何错误,好比broker 返回了一个不容许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。
KafkaProducer通常会发生两类错误。
若是只发送消息而不等待响应,那么能够避免阻塞线程来等待,从而提升发送效率。
大多数时候,咱们并不须要等待响应——尽管Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来讲不是必需的。不过在遇到消息发送失败时,咱们须要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便往后分析。
这时咱们能够为send()方法注册一个回调函数,让它来处理异步调用的返回结果
// 建立ProducerRecord,它是一种消息的数据结构 ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("sun", "s1", "cn dota best dota"); // 发送消息 kafkaProducer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null){ e.printStackTrace(); }else{ System.out.println(recordMetadata); } } });
onCompletion
方法的Exception
参数不为null,咱们能够针对这个异常进行处理RecordMetadata
不为null,咱们能够从中获取主题信息、分区信息、偏移量信息生产者还有不少可配置的参数,在Kafka 文档里都有说明,它们大部分都有合理的默认值,因此没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来咱们会一一说明。
acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
若是acks=0,生产者发送消息以后就马上认为消息写入成功。
若是acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。
若是acks=all,只有当首领节点和全部复制节点所有收到消息时,生产者才会收到一个来自服务器的成功响应。
retries 参数的值决定了生产者能够重发消息的次数,若是达到这个次数,生产者会放弃重试并返回错误。默认状况下,生产者会在每次重试之间等待100ms。
由于生产者会自动进行重试,因此就不必在代码逻辑里处理那些可重试的错误。你只须要处理那些不可重试的错误或重试次数超出上限的状况。
KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。
该参数指定了一个批次可使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的全部消息会被发送出去。
该参数指定了生产者在发送批次以前等待更多消息加入批次的时间
该参数指定了生产者在收到服务器响应以前能够发送多少个消息。
咱们已经在以前的例子里看到,建立一个生产者对象必须指定序列化器。咱们已经知道如何使用默认的字符串序列化器,Kafka 还提供了整型和字节数组序列化器,不过它们还不足以知足大部分场景的需求。到最后,咱们须要序列化的记录类型会愈来愈多。
接下来演示如何开发自定义序列化器,并介绍Avro序列化器。若是发送到Kafka的对象 不是简单的字符串或整型,那么可使用序列化框架来建立消息记录,如Avro、Thrift 或Protobuf,或者使用自定义序列化器。咱们强烈建议使用通用的序列化框架。
/** * 一个简单的pojo,为了演示如何自定义序列化器 * */ public class Customer { private int customerId; private String customerName; public Customer(int customerId, String customerName) { this.customerId = customerId; this.customerName = customerName; } public Customer() { } public int getCustomerId() { return customerId; } public void setCustomerId(int customerId) { this.customerId = customerId; } public String getCustomerName() { return customerName; } public void setCustomerName(String customerName) { this.customerName = customerName; } }
/** * 自定义序列化器 * */ public class CustomerSerializer implements Serializer<Customer> { public void configure(Map<String, ?> configs, boolean isKey) { // 不须要配置任何 } /** * Customer对象的序列化函数,组成以下 * 前4字节: customerId * 中间4字节: customerName字节数组长度 * 后面n字节: customerName字节数组 * */ public byte[] serialize(String topic, Customer data) { try{ byte[] serializedName; int stringSize; if(data == null){ return null; }else{ if(data.getCustomerName() != null){ serializedName = data.getCustomerName().getBytes("utf-8"); stringSize = serializedName.length; } else{ serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getCustomerId()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch(Exception e){ throw new SerializationException( "Error when serializing Customer to byte[] " + e); } } public byte[] serialize(String topic, Headers headers, Customer data) { return serialize(topic, data); } public void close() { // 不须要关闭任何 } }
不过咱们不建议采用自定义序列化器,理由以下
Apache Avro是一种与编程语言无关的序列化格式。
Avro数据经过与语言无关的schema来定义。
Avro 有一个颇有意思的特性是,当负责写消息的应用程序使用了新版本的schema,负责读消息的应用程序能够继续处理消息而无需作任何改动,这个特性使得它特别适合用在像Kafka 这样的消息系统上。
下面举个例子
假设咱们有一个v0.0.1的schema
{ "namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "faxNumber", "type": ["null", "string"], "default": "null"} ] }过了一段时间,咱们须要删除
faxnumber
字段(传真号码),添加一个{ "namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "email", "type": ["null", "string"], "default": "null"} ] }更新到新版的schema 后:
- 旧记录仍然包含faxNumber 字段
- 而新记录则包含email 字段
- 部分负责读取数据的应用程序进行了升级,那么它们是如何处理这些变化的呢?
在消费者升级以前:
- 它们会调用相似getName()、getId() 和getFaxNumber() 这样的方法。
- 若是碰到使用新schema 构建的消息,getName() 和getId() 方法仍然可以正常返回,但getFaxNumber() 方法会返回null,由于消息里不包含传真号码。
在消费者升级以后
- getEmail() 方法取代了getFaxNumber() 方法。
- 若是碰到一个使用旧schema 构建的消息,那么getEmail() 方法会返回null,由于旧消息不包含邮件地址。
如今能够看出使用Avro 的好处了:咱们修改了消息的schema,但并不须要更新全部消费者,而这样仍然不会出现异常或阻断性错误,也不须要对现有数据进行大幅更新。
若是在每条Kafka 记录里都嵌入schema,会让记录的大小成倍地增长。
在以前的例子里,ProducerRecord对象包含了目标主题、键和值。Kafka 的消息是一个个键值对,ProducerRecord 对象能够只包含目标主题和值,键能够设置为默认的null,不过大多数应用程序会用到键。
键有两个用途:
若是要建立键为null的消息,不指定键就能够
ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA");
若是键为null,而且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
若是键不为空,而且使用了默认的分区器,那么Kafka会使用Kafka本身的散列算法对键进行散列(使用Kafka 本身的散列算法,即便升级Java 版本,散列值也不会发生变化),而后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键老是被映射到同一个分区上。
只有在不改变主题分区数量的状况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的状况下,能够保证用户045189 的记录老是被写到分区34。在从分区读取数据时,能够进行各类优化。不过,一旦主题增长了新的分区,这些就没法保证了——旧数据仍然留在分区34,但新的记录可能被写到其余分区上。若是要使用键来映射分区,那么最好在建立主题的时候就把分区规划好,并且永远不要增长新分区。
默认状况下,kafka自动建立的主题的分区数量为1,因此咱们须要先修改分区数量,来让自定义分区器有点用。
先cd到opt\kafka\bin\
运行命令:
./kafka-topics.sh --zookeeper 47.94.139.116:2181/kafka --alter --topic sun --partitions 4
查看是否修改为功
./kafka-topics.sh --describe --zookeeper 47.94.139.116:2181/kafka --topic sun
或者也能够经过修改server.properties
中的nums.partions
并重启,来更改默认分区数量。
下面自定义一个分区器,它根据键来划分分区:若键为Banana则放入最后一个分区,若键不为Banana则散列到其余分区。
/** * 自定义分区器 * 若键为Banana则放入最后一个分区,若键不为Banana则散列到其余分区 * */ public class BananaPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 分区信息列表 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); //分区数量 int partitionsAmount = partitions.size(); // 若是只有一个分区,那全都放到partition0就完事了 if(partitionsAmount == 1){ return 0; } if(keyBytes == null || ! (key instanceof String)){ throw new InvalidRecordException("We expect all messages to have customer name as key"); } if(key.equals("Banana")){ return partitionsAmount - 1; } return (Math.abs(Utils.murmur2(keyBytes)) % (partitionsAmount - 1)); } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
而后,在producer配置中加入kafkaProperties.put("partitioner.class", "cn.edu.neu.demo.ch3.partitioner.BananaPartitioner");
便可。