经过https://www.cnblogs.com/tree1123/p/11243668.html 已经对consumer有了必定的了解。producer比consumer要简单一些。html
0.9.0.0版本之前,是由scala编写的旧版本producer。java
入口类:kafka.producer.Producer算法
代码示例:apache
Properties properties = new Properties(); properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("request.requird.acks", "1"); ProducerConfig config = new ProducerConfig(properties); Producer<String, String> producer = new Producer<String, String>(config); KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","hello"); Producer.send(msg);
旧版本是同步机制,等待响应。吞吐性不好。在0.9.0.0版本之后,正式下架了。bootstrap
旧版本的方法:安全
send 发送 close 关闭 sync 异步发送 有丢失消息的可能性
旧版本producer由scala编写,0.9.0.0版本之后,新版本producer由java编写。网络
新版本主要入口类是:org.apache.kafka.clients.producer.KafkaProducer多线程
经常使用方法:app
send 实现消息发送主逻辑 close 关闭producer metrics 获取producer的实时监控指标数据 好比发送消息的速率
Kafka producer要比consumer设计简单一些,主要就是向某个topic的某个分区发送一条消息。partitioner决定向哪一个分区发送消息。用户指定key,默认的分区器会根据key的哈希值来选择分区,若是没有指定key就以轮询的方式选择分区。也能够自定义分区策略。异步
肯定分区后,producer寻找到分区的leader,也就是该leader所在的broker,而后发送消息,leader会进行副本同步ISR。
producer会启两个线程,主线程封装ProducerRecord类,序列化后发给partitioner,而后发送到内存缓冲区。
另外一个I/O线程,提取消息分batch统一发送给对应的broker。
示例代码:
Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); for (int i = 1; i <= 600; i++) { kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i)); System.out.println("testkafka"+i); } kafkaProducer.close();
一、构造Properties对象,bootstrap.servers key.serializer value.serializer是必须指定的。
二、使用Properties构造KafkaProducer对象。
三、构造ProducerRecord 指定topic 分区 key value。
四、KafkaProducer的send方法发送。
五、关闭KafkaProducer。
bootstrap.servers 和consumer同样,指定部分broker便可。并且broker端若是没有配ip地址,要写成主机名。
key.serializer value.serializer 序列化参数 必定要全类名 没有key也必须设置。
acks 三个值
0: producer彻底无论broker的处理结果 回调也就没有用了 并不能保证消息成功发送 可是这种吞吐量最高
all或者-1: leader broker会等消息写入 而且ISR都写入后 才会响应,这种只要ISR有副本存活就确定不会丢失,但吞 吐量最低。
1: 默认的值 leader broker本身写入后就响应,不会等待ISR其余的副本写入,只要leader broker存活就不会丢失,即保证了不丢失,也保证了吞吐量。
buffer.memory 缓冲区大小 字节 默认是33554432 就是发送消息的内存缓冲区大小 太小的话会影响吞吐量
compression.type 设置是否压缩消息 默认值是none 压缩后能够下降IO开销提升吞吐,可是会增大CPU开销。
支持三种: GZIP Snappy LZ4 性能 LZ4 > Snappy > GZIP
retries 发送消息重试的次数 默认0 不重试 重试可能形成重复发送 可能形成乱序
retry.backoff.ms 设置重试间隔 默认100毫秒
batch.size 调优重要的参数 batch小 吞吐量也会小 batch大 内存压力会大 默认值是16384 16KB
linger.ms 发送延时 默认是0 0的话不用等batch满就发送 延时的话能够提升吞吐 看具体状况进行调整
max.request.size producer可以发送最大消息的大小 默认1048576字节 若是消息很大 须要修改它
request.timeout.ms 发送请求后broker在规定时间返回 默认30秒 超过就是超时了。
fire and forget 就是上边的示例
Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); for (int i = 1; i <= 600; i++) { kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i)); System.out.println("testkafka"+i); } kafkaProducer.close();
异步回调 不阻塞
Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); for (int i = 1; i <= 600; i++) { kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i),new Callback(){ public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } }); System.out.println("testkafka"+i); } kafkaProducer.close();
同步发送 无限等待返回
producer.send(record).get()
重试机制
若是须要自定义重试机制,就要在回调里对不一样异常区别对待,常见的几种以下:
可重试异常
LeaderNotAvailableException :分区的Leader副本不可用,这多是换届选举致使的瞬时的异常,重试几回就能够恢复 NotControllerException:Controller主要是用来选择分区副本和每个分区leader的副本信息,主要负责统一管理分区信息等,也多是选举所致。
NetWorkerException :瞬时网络故障异常所致。
不可重试异常
SerializationException:序列化失败异常
RecordToolLargeException:消息尺寸过大致使。
示例代码:
producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e ==null){ //正常处理逻辑 System.out.println("The offset of the record we just sent is: " + metadata.offset()); }else{ if(e instanceof RetriableException) { //处理可重试异常 ...... } else { //处理不可重试异常 ...... } } } });
partitioner决定向哪一个分区发送消息。用户指定key,默认的分区器会根据key的哈希值来选择分区,若是没有指定key就以轮询的方式选择分区。也能够自定义分区策略。
对于有key的消息,java版本的producer自带的partitioner会根据murmur2算法计算消息key的哈希值。而后对总分区数求模获得消息要被发送到的目标分区号。
自定义分区策略:
建立一个类,实现org.apache.kafka.clients.producer.Partitioner接口
主要分区逻辑在Partitioner.partition中实现:经过topic key value 一同肯定分区
在构造KafkaProducer得Properties中设置partitioner.class 为自定义类 注意是全类名
经常使用的serializer
ByteArraySerializer.class
ByteBufferSerializer.class
BytesSerializer.class
DoubleSerializer.class
IntegerSerializer.class
LongSerializer.class
StringSerializer.class
可是其余一些复杂的就须要自定义序列化:
一、定义数据格式
二、建立自定义序列化类,实现org.apache.kafka.common.serialization.Serializer接口
三、在KafkaProducer的Properties中设置key.serializer value.serializer为自定义类
以上均为单线程的状况,但producer是线程安全的,单线程适合分区较少的状况,分区较多能够多线程但对内存损耗较大。
更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算