Spring Boot2.0 整合 Kafka

Kafka 概述

Apache Kafka 是一个分布式流处理平台,用于构建实时的数据管道和流式的应用.它可让你发布和订阅流式的记录,能够储存流式的记录,而且有较好的容错性,能够在流式记录产生时就进行处理。html

Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 Kafka 的定义:一个分布式发布-订阅消息传递系统。java

Kafka 特性

  1. 高吞吐量、低延迟:kafka每秒能够处理几十万条消息,它的延迟最低只有几毫秒,每一个topic能够分多个partition, consumer group 对partition进行consume操做;
  2. 可扩展性:kafka集群支持热扩展;
  3. 持久性、可靠性:消息被持久化到本地磁盘,而且支持数据备份防止数据丢失;
  4. 容错性:容许集群中节点失败(若副本数量为n,则容许n-1个节点失败);
  5. 高并发:支持数千个客户端同时读写;
  6. 支持实时在线处理和离线处理:可使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可使用Hadoop这种批处理系统进行离线处理;

Kafka 使用场景

  1. 日志收集:一个公司能够用Kafka能够收集各类服务的log,经过kafka以统一接口服务的方式开放给各类consumer,例如Hadoop、Hbase、Solr等;
  2. 消息系统:解耦和生产者和消费者、缓存消息等;
  3. 用户活动跟踪:Kafka常常被用来记录web用户或者app用户的各类活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,而后订阅者经过订阅这些topic来作实时的监控分析,或者装载到Hadoop、数据仓库中作离线分析和挖掘;
  4. 运营指标:Kafka也常常用来记录运营监控数据。包括收集各类分布式应用的数据,生产各类操做的集中反馈,好比报警和报告;
  5. 流式处理:好比spark streaming和storm;
  6. 事件源;

Spring Boot2.0 + Kafka

1,安装配置Kafka ,Zookeepergit

安装和配置过程很简单,就不详细说了,参考官网:http://kafka.apache.org/quickstartgithub

使用命令启动Kafka: bin``/kafka-server-start``.sh config``/server``.propertiesweb

下面给出个人环境:spring

Centos 7.5,  Kafka 2.11, Zookeeper-3.4.13,  JDK1.8+

2,建立 Spring Boot 项目apache

注意版本:该项目使用Spring Boot 2.0 +,低版本可能不对json

  1. pom.xml引用
<dependency>
                   <groupId>org.springframework.boot</groupId>
                   <artifactId>spring-boot-starter</artifactId>
               </dependency>
               <dependency>
                   <groupId>org.springframework.kafka</groupId>
                   <artifactId>spring-kafka</artifactId>
               </dependency>
               <dependency>
                   <groupId>com.alibaba</groupId>
                   <artifactId>fastjson</artifactId>
                   <version>1.2.47</version>
               </dependency>
  1. 定义消息生产者
    直接使用 KafkaTemplate 发送消息 ,Spring Boot自动装配,不须要本身定义一个Kafka配置类,吐槽一下网站的文章,全都是互相抄,全都写一个 ProduceConfig Consumerconfig 类, Kafka 的参数配置 硬编码在代码中,简直没法直视。。
    定义一个泛型类KafkaSender<T> T 就是你须要发送的消息 对象,序列化使用阿里的 fastjson

消息发送后,能够在回调类里面处理本身的业务,ListenableFutureCallback 类有两个方法,分别是 onFailureononSuccess ,实际场景能够在这两个方法,处理本身的具体业务,这里不作实现。bootstrap

/**
        * 消息生产者
        *
        * @author Jarvis
        * @date 2018/8/3
        */
       @Component
       public class KafkaSender<T> {
       
           private Logger logger = LoggerFactory.getLogger(KafkaSender.class);
       
           @Autowired
           private KafkaTemplate<String, Object> kafkaTemplate;
       
           /**
            * kafka 发送消息
            *
            * @param obj 消息对象
            */
           public void send(T obj) {
               String jsonObj = JSON.toJSONString(obj);
               logger.info("------------ message = {}", jsonObj);
       
               //发送消息
               ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj);
               future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                   @Override
                   public void onFailure(Throwable throwable) {
                       logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
                   }
       
                   @Override
                   public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                       //TODO 业务处理
                       logger.info("Produce: The message was sent successfully:");
                       logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
                   }
               });
           }
       }
  1. 定义消息消费者
    使用@KafkaListener 注解监听 topics 消息,此处的topics 必须和 send 函数中的 一致

@Header(KafkaHeaders.RECEIVED_TOPI 直接获取 topic缓存

/**
     * 监听kafka.tut 的 topic
     *
     * @param record
     * @param topic  topic
     */
    @KafkaListener(id = "tut", topics = "kafka.tut")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        //判断是否NULL
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            //获取消息
            Object message = kafkaMessage.get();

            logger.info("Receive: +++++++++++++++ Topic:" + topic);
            logger.info("Receive: +++++++++++++++ Record:" + record);
            logger.info("Receive: +++++++++++++++ Message:" + message);
        }
    }
  1. 配置文件 application.yml
spring:
         application:
           name: kafka-tutorial
         kafka:
           # 指定kafka 代理地址,能够多个
           bootstrap-servers: 192.168.10.100:9092
           producer:
             retries: 0
             # 每次批量发送消息的数量
             batch-size: 16384
             # 缓存容量
             buffer-memory: 33554432
             # 指定消息key和消息体的编解码方式
             key-serializer: org.apache.kafka.common.serialization.StringSerializer
             value-serializer: org.apache.kafka.common.serialization.StringSerializer
           consumer:
             # 指定默认消费者group id
             group-id: consumer-tutorial
             auto-commit-interval: 100
             auto-offset-reset: earliest
             enable-auto-commit: true
             # 指定消息key和消息体的编解码方式
             key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
             value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
           # 指定listener 容器中的线程数,用于提升并发量
           listener:
             concurrency: 3
  1. 直接使用 @Autowired 对类 KafkaSender 自动装配,而后调用 send 方法发送消息便可,下面给出代码:
@Autowired
           private KafkaSender<User> kafkaSender;
       
           @Test
           public void kafkaSend() throws InterruptedException {
               //模拟发消息
               for (int i = 0; i < 5; i++) {
       
                   User user = new User();
                   user.setId(System.currentTimeMillis());
                   user.setMsg(UUID.randomUUID().toString());
                   user.setSendTime(new Date());
       
                   kafkaSender.send(message);
                   Thread.sleep(3000);
       
               }
           }

控制台能够看到执行成功:

在服务器执行 bin/kafka-topics.sh --list --zookeeper localhost:2181 能够看到topic

Kafka如何保证数据的不丢失

1.生产者数据的不丢失

  • 新版本的producer采用异步发送机制。KafkaProducer.send(ProducerRecord)方法仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的IO线程会不断扫描该缓存区,将知足条件的消息封装到某个batch中而后发送出去。显然,这个过程当中就有一个数据丢失的窗口:若IO线程发送以前client端挂掉了,累积在accumulator中的数据的确有可能会丢失。 kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的可以被收到。
  • 若是是同步模式:ack机制可以保证数据的不丢失,若是ack设置为0,风险很大,通常不建议设置为0
    producer.type=sync
    request.required.acks=1
  • 若是是异步模式:经过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,若是buffer满了数据尚未发送出去,若是设置的是当即清理模式,风险很大,必定要设置为阻塞模式
    producer.type=async
    request.required.acks=1
    queue.buffering.max.ms=5000
    queue.buffering.max.messages=10000
    queue.enqueue.timeout.ms = -1
    batch.num.messages=200
  • 结论:producer有丢数据的可能,可是能够经过配置保证消息的不丢失
    2.消费者数据的不丢失
  • 若是在消息处理完成前就提交了offset,那么就有可能形成数据的丢失。因为Kafka consumer默认是自动提交位移的,因此在后台提交位移前必定要保证消息被正常处理了,所以不建议采用很重的处理逻辑,若是处理耗时很长,则建议把逻辑放到另外一个线程中去作。为了不数据丢失,现给出两点建议:
    enable.auto.commit=false 关闭自动提交位移
    在消息被完整处理以后再手动提交位移
  • 若是使用了storm,要开启storm的ackfail机制;
  • 若是没有使用storm,确认数据被完成处理以后,再更新offset值。低级API中须要手动控制offset值。经过offset commit 来保证数据的不丢失,kafka本身记录了每次消费的offset数值,下次继续消费的时候,接着上次的offset进行消费便可。

源码 github:https://github.com/jarvisqi/java-tutorial/tree/master/kafka-tutorial

参考:

  1. http://kafka.apache.org/quickstart
  2. https://docs.spring.io/spring-kafka/reference/htmlsingle/#kafka
  3. http://www.javashuo.com/article/p-xkkxapke-ck.html
相关文章
相关标签/搜索