Kafka Producer: Synchronous and Asynchronous Sending, Transactions, and Idempotence in Practice

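This blog series distills cases drawn from real commercial environments and gives practical guidance for commercial Spark applications; please keep following the series. Copyright notice: this Spark commercial-application series belongs entirely to its author (秦凯新); reproduction is prohibited, and readers are welcome to learn from it.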

1 I Am Safe

Why such a startling claim? Because this is a common worry. Kafka's Producer is thread-safe, so it can be used from multiple threads with complete confidence.

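The official KafkaProducer Javadoc goes further: sharing a single producer instance across threads will generally be faster than having multiple instances.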

2 Producer Message-Sending Flow

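  • Step 1: wrap the message in a ProducerRecord.
  • Step 2: the Partitioner routes the record to one partition of the topic. If no key is specified, messages are distributed evenly across all partitions.
  • Step 3: once the partition is chosen, the producer sends the record to that partition's leader; from there, the replica synchronization mechanism takes over.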
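The routing rule in Step 2 can be sketched with a JDK-only model. This is not Kafka's DefaultPartitioner, and `PartitionRouter` is a hypothetical class. It uses `Arrays.hashCode` as a stand-in for the murmur2 hash Kafka applies to serialized keys. For records without a key it uses round-robin, which is how older clients behaved; since Kafka 2.4 the default partitioner uses "sticky" partitioning for unkeyed records, filling a batch for one partition before moving on. The model shows two things: a given key always maps to the same partition, and unkeyed records are spread evenly.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simplified model of producer-side partition selection (hypothetical, not Kafka's DefaultPartitioner).
 * Keyed records: hash(key) mod numPartitions, so one key always lands on one partition.
 * Unkeyed records: round-robin, so records are spread evenly.
 */
class PartitionRouter {
    private final AtomicInteger counter = new AtomicInteger();

    int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // No key: rotate through the partitions; floorMod stays non-negative even after overflow
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash (Kafka uses murmur2); mask the sign bit so the modulus is non-negative
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because the hash depends only on the key bytes, changing the number of partitions changes where existing keys land. This is one reason partition counts are usually chosen up front.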

3 Official Producer Examples

3.1 Fire-and-Forget Example (the "don't care" mindset)

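  • Send the record and never look at the result again.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        // The returned Future is ignored, so a failed send goes unnoticed
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
    }

    // close() still flushes the records buffered above before returning
    producer.close();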

3.2 Asynchronous Callback, Official Example (non-blocking)

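  • KafkaProducer's send() method returns a java.util.concurrent.Future that the caller can use later to obtain the send result.
  • Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete. This is the callback mechanism.

  • RecordMetadata and Exception are never both null. When the send succeeds, the exception is null. When it fails, the exception is set and the metadata must not be relied on: older clients pass null, and newer clients may pass placeholder metadata with -1 fields. So always check the exception first.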

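    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // key and value are byte[] payloads prepared earlier; the producer uses ByteArraySerializer for both
    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("the-topic", key, value);

    producer.send(record,
                  new Callback() {
                      public void onCompletion(RecordMetadata metadata, Exception e) {
                          if (e != null) {
                              // The send failed: do not rely on metadata here
                              e.printStackTrace();
                          } else {
                              System.out.println("The offset of the record we just sent is: " + metadata.offset());
                          }
                      }
                  });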
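The callback contract can be seen without a running broker in the JDK-only simulation below. `AsyncSendModel` and its `send` method are hypothetical stand-ins for `KafkaProducer.send(record, callback)`:

- A single daemon thread plays the role of the producer's I/O thread.
- `send` returns immediately.
- The callback later receives either `(offset, null)` or `(null, exception)`, never both.

In the model the callback runs on that I/O thread. The same is true for real Kafka callbacks, which is why they should be kept short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/** Hypothetical simulation of an asynchronous send with a completion callback (not Kafka's API). */
class AsyncSendModel {
    // One daemon thread standing in for the producer's I/O thread
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-io-thread");
        t.setDaemon(true);
        return t;
    });
    private final AtomicLong nextOffset = new AtomicLong();

    /** Returns at once; the callback later gets (offset, null) on success or (null, error) on failure. */
    CompletableFuture<Long> send(String value, BiConsumer<Long, Throwable> callback) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        ioThread.execute(() -> {
            if (value == null) {
                Exception e = new IllegalArgumentException("value must not be null");
                callback.accept(null, e);        // failure: no offset
                result.completeExceptionally(e);
            } else {
                long offset = nextOffset.getAndIncrement();
                callback.accept(offset, null);   // success: no exception
                result.complete(offset);
            }
        });
        return result;
    }
}
```

The callback fires before the returned future completes, so `join()` on that future is a simple way to wait for both in a test.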

3.3 Synchronous Send, Official Example (blocking)

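  • producer.send(record) returns a Future object; calling Future.get() waits indefinitely until the result comes back.

    // Blocks until the broker acknowledges the record; a failed send surfaces as an ExecutionException
    RecordMetadata metadata = producer.send(record).get();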
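Waiting forever with `get()` can hang a sender thread if the cluster is unreachable. Below is a minimal sketch for callers who prefer a bounded wait. The helper name `SyncSend.awaitResult` is hypothetical. It works on any `java.util.concurrent.Future`, including the `Future<RecordMetadata>` returned by `producer.send(record)`. It calls the standard `Future.get(timeout, unit)` and unwraps `ExecutionException`, so the caller sees the real send failure.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a synchronous send with an upper bound on the wait. */
class SyncSend {
    static <T> T awaitResult(Future<T> future, Duration timeout) throws Exception {
        try {
            // Blocks for at most the timeout; throws TimeoutException if no result arrived in time
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The send itself failed: rethrow the underlying cause instead of the wrapper
            Throwable cause = e.getCause();
            if (cause instanceof Exception) throw (Exception) cause;
            throw e;
        }
    }
}
```

With Kafka this would be used as `RecordMetadata md = SyncSend.awaitResult(producer.send(record), Duration.ofSeconds(10));`. A `TimeoutException` only means the wait ended; the record may still be delivered afterwards.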

3.4 Transactional Send, Official Example (atomicity and idempotence)

  • From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

  • To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

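    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, abort the transaction; the application may then retry it.
        producer.abortTransaction();
    }
    producer.close();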
  • As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.
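Duplicate-free retries rest on the producer attaching a producer ID and a per-partition sequence number to each batch. The broker checks these before appending. Transactions build on the same mechanism, since a transactional producer is always idempotent.

The sketch below is a simplified, JDK-only model of that broker-side check, together with the client properties that switch idempotence on. `IdempotenceSketch` and its methods are hypothetical. Real brokers keep more state, such as several recent batches per producer, so treat this as an illustration of the rule only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical model of idempotent appends: one expected sequence number per (producerId, partition). */
class IdempotenceSketch {
    enum Outcome { APPENDED, DUPLICATE_IGNORED, OUT_OF_ORDER }

    private final Map<String, Integer> lastSeq = new HashMap<>();

    Outcome append(long producerId, int partition, int sequence) {
        String k = producerId + "-" + partition;
        int last = lastSeq.getOrDefault(k, -1);
        if (sequence == last + 1) {          // the next expected batch: write it
            lastSeq.put(k, sequence);
            return Outcome.APPENDED;
        }
        if (sequence <= last) {              // a retry of a batch already written: do not write it again
            return Outcome.DUPLICATE_IGNORED;
        }
        return Outcome.OUT_OF_ORDER;         // a gap: an earlier batch is missing
    }

    /** Client-side settings that enable idempotence (plain string keys of the producer configs). */
    static Properties idempotentProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.idempotence", "true");
        props.put("acks", "all");            // required by idempotence, and its default once enabled
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

A retried batch carries the same sequence number as the original, so the broker recognizes it instead of appending it twice.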

3.5 Retriable Exceptions (subclasses of RetriableException)

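  • LeaderNotAvailableException: the partition's leader replica is unavailable. This is usually a transient condition caused by a leader election, and a few retries normally recover from it.
  • NotControllerException: the controller is the broker that manages partition state for the whole cluster, such as electing partition leaders and tracking replica assignments. This exception means a request reached a broker that is not (or no longer) the controller, which can also happen during a controller election.
  • NetworkException: caused by a transient network failure.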

3.6 Non-Retriable Exceptions

  • SerializationException: the record could not be serialized.
  • RecordTooLargeException: the message exceeds the maximum allowed size.

3.7 Handling Exceptions Differently

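    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.RetriableException;

    producer.send(record,
                  new Callback() {
                      public void onCompletion(RecordMetadata metadata, Exception e) {
                          if (e == null) {
                              // Normal path: the record was written
                              System.out.println("The offset of the record we just sent is: " + metadata.offset());
                          } else if (e instanceof RetriableException) {
                              // Handle the retriable exception (transient; a later attempt may succeed) ...
                          } else {
                              // Handle the non-retriable exception (resending the same record will not help) ...
                          }
                      }
                  });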
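The callback above only sorts exceptions into two groups. Below is a minimal sketch of what the retriable branch might do. It rests on two hypothetical names:

- `TransientFailure` stands in for Kafka's `RetriableException`; with the real client you would test `e instanceof RetriableException`.
- `callWithRetries` is a helper that retries only that type, with a bounded attempt count and exponential backoff. Anything else fails on the first attempt.

The producer already retries retriable errors internally according to its `retries` and `retry.backoff.ms` settings. An application-level loop like this is therefore only for failures that remain after those retries.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper: bounded retries with exponential backoff, for transient failures only. */
class RetryPolicy {
    /** Stand-in for Kafka's RetriableException (assumption, not the real class). */
    static class TransientFailure extends RuntimeException {
        TransientFailure(String msg) { super(msg); }
    }

    static <T> T callWithRetries(Callable<T> action, int maxAttempts, long initialBackoffMs) throws Exception {
        long backoff = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (TransientFailure e) {
                if (attempt >= maxAttempts) throw e;   // retries exhausted: give up
                Thread.sleep(backoff);                  // wait before the next attempt
                backoff *= 2;                           // exponential backoff
            }
            // Any other exception (the non-retriable kind) propagates on the first attempt.
        }
    }
}
```

A cap on attempts keeps a persistent fault from turning into an endless loop. The doubling backoff gives a leader election or network blip time to settle.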

3.8 Graceful Shutdown of the Producer

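  • producer.close(): finishes sending all pending messages first, then shuts down gracefully.
  • producer.close(timeout): waits at most the given timeout for pending sends, then forces the shutdown. Older clients spell this close(long, TimeUnit); since Kafka 2.0 it is close(Duration).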

4 Summary

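To show that technology is like a thin paper window, easy to see through once you poke a hole in it, I will take Kafka apart down to the last detail.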

秦凯新 (Qin Kaixin), Shenzhen, 2018
