spring boot集成kafka之spring-kafka深刻探秘

前言

kafka是一个消息队列产品,基于Topic partitions的设计,能达到很是高的消息发送处理性能。Spring建立了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring-kafka还提供了不少高级功能,下面咱们就来一一探秘这些用法。java

项目地址:https://github.com/spring-projects/spring-kafkagit

简单集成

引入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

添加配置

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092github

测试发送和接收

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {

    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }
    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }
}

启动应用后,在浏览器中输入:http://localhost:8080/send/kl。就能够在控制台看到有日志输出了:input value: "kl"。基础的使用就这么简单。发送消息时注入一个KafkaTemplate,接收消息时添加一个@KafkaListener注解便可。web

Spring-kafka-test嵌入式Kafka Server

不过上面的代码可以启动成功,前提是你已经有了Kafka Server的服务环境,咱们知道Kafka是由Scala + Zookeeper构建的,能够从官网下载部署包在本地部署。可是,我想告诉你,为了简化开发环节验证Kafka相关功能,Spring-Kafka-Test已经封装了Kafka-test提供了注解式的一键开启Kafka Server的功能,使用起来也是超级简单。本文后面的全部测试用例的Kafka都是使用这种嵌入式服务提供的。spring

引入依赖

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>2.2.6.RELEASE</version>
   <scope>test</scope>
</dependency>

启动服务

下面使用Junit测试用例,直接启动一个Kafka Server服务,包含四个Broker节点。apache

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

如上:只须要一个注解@EmbeddedKafka便可,就能够启动一个功能完整的Kafka服务,是否是很酷。默认只写注解不加参数的状况下,是建立一个随机端口的Broker,在启动的日志中会输出具体的端口以及默认的一些配置项。不过这些咱们在Kafka安装包配置文件中的配置项,在注解参数中均可以配置,下面详解下@EmbeddedKafka注解中的可设置参数 :bootstrap

  • value:broker节点数量
  • count:同value做用同样,也是配置的broker的节点数量
  • controlledShutdown:控制关闭开关,主要用来在Broker意外关闭时减小此Broker上Partition的不可用时间

      Kafka是多Broker架构的高可用服务,一个Topic对应多个partition,一个Partition能够有多个副本Replication,这些Replication副本保存在多个Broker,用于高可用。可是,虽然存在多个分区副本集,当前工做副本集却只有一个,默认就是首次分配的副本集【首选副本】为Leader,负责写入和读取数据。当咱们升级Broker或者更新Broker配置时须要重启服务,这个时候须要将partition转移到可用的Broker。下面涉及到三种状况api

  1.    直接关闭Broker:当Broker关闭时,Broker集群会从新进行选主操做,选出一个新的Broker来做为Partition Leader,选举时此Broker上的Partition会短时不可用
  2.   开启controlledShutdown:当Broker关闭时,Broker自己会先尝试将Leader角色转移到其余可用的Broker上
  3.   使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手动触发PartitionLeader角色转移
  • ports:端口列表,是一个数组。对应了count参数,有几个Broker,就要对应几个端口号
  • brokerProperties:Broker参数设置,是一个数组结构,支持以下方式进行Broker参数设置:
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
  • okerPropertiesLocation:Broker参数文件设置

  功能同上面的brokerProperties,只是Kafka Broker的可设置参数达182个之多,都像上面这样配置确定不是最优方案,因此提供了加载本地配置文件的功能,如:数组

@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")

建立新的Topic

默认状况下,若是在使用KafkaTemplate发送消息时,Topic不存在,会建立一个新的Topic,默认的分区数和副本数为以下Broker参数来设定浏览器

num.partitions = 1 #默认Topic分区数
num.replica.fetchers = 1 #默认副本数

程序启动时建立Topic

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdmin admin(KafkaProperties properties){
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }
    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }
}

若是Kafka Broker支持(1.0.0或更高版本),则若是发现现有Topic的Partition 数少于设置的Partition 数,则会新增新的Partition分区。关于KafkaAdmin有几个经常使用的用法以下:

setFatalIfBrokerNotAvailable(true):默认这个值是False的,在Broker不可用时,不影响Spring 上下文的初始化。若是你以为Broker不可用影响正常业务须要显示的将这个值设置为True

setAutoCreate(false) : 默认值为True,也就是Kafka实例化后会自动建立已经实例化的NewTopic对象

initialize():当setAutoCreate为false时,须要咱们程序显示的调用admin的initialize()方法来初始化NewTopic对象

代码逻辑中建立

有时候咱们在程序启动时并不知道某个Topic须要多少Partition数合适,可是又不能一股脑的直接使用Broker的默认设置,这个时候就须要使用Kafka-Client自带的AdminClient来进行处理。上面的Spring封装的KafkaAdmin也是使用的AdminClient来处理的。如:

@Autowired
    private KafkaProperties properties;
    @Test
    public void testCreateToipc(){
        AdminClient client = AdminClient.create(properties.buildAdminProperties());
        if(client !=null){
            try {
                Collection<NewTopic> newTopics = new ArrayList<>(1);
                newTopics.add(new NewTopic("topic-kl",1,(short) 1));
                client.createTopics(newTopics);
            }catch (Throwable e){
                e.printStackTrace();
            }finally {
                client.close();
            }
        }
    }

ps:其余的方式建立Topic

上面的这些建立Topic方式前提是你的spring boot版本到2.x以上了,由于spring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中尚未这些api。下面补充一种在程序中经过Kafka_2.10建立Topic的方式

引入依赖

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>

api方式建立

@Test
    public void testCreateTopic()throws Exception{
        ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
        String topicName = "topic-kl";
        int partitions = 1;
        int replication = 1;
        AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
    }

注意下ZkClient最后一个构造入参,是一个序列化反序列化的接口实现,博主测试若是不填的话,建立的Topic在ZK上的数据是有问题的,默认的Kafka实现也很简单,就是作了字符串UTF-8编码处理。ZKStringSerializer$是Kafka中已经实现好的一个接口实例,是一个Scala的伴生对象,在Java中直接调用点MODULE$就能够获得一个实例

命令方式建立

@Test
    public void testCreateTopic(){
        String [] options= new String[]{
                "--create",
                "--zookeeper","127.0.0.1:2181",
                "--replication-factor", "3",
                "--partitions", "3",
                "--topic", "topic-kl"
        };
        TopicCommand.main(options);
    }

消息发送之KafkaTemplate探秘

获取发送结果

异步获取

template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                ......
            }

            @Override
            public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                ....
            }
        });

同步获取

ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
        try {
            SendResult<Object,Object> result = future.get();
        }catch (Throwable e){
            e.printStackTrace();
        }

kafka事务消息

默认状况下,Spring-kafka自动生成的KafkaTemplate实例,是不具备事务消息发送能力的。须要使用以下配置激活事务特性。事务激活后,全部的消息发送只能在发生事务的方法内执行了,否则就会抛一个没有事务交易的异常

spring.kafka.producer.transaction-id-prefix=kafka_tx.

当发送消息有事务要求时,好比,当全部消息发送成功才算成功,以下面的例子:假设第一条消费发送后,在发第二条消息前出现了异常,那么第一条已经发送的消息也会回滚。并且正常状况下,假设在消息一发送后休眠一段时间,在发送第二条消息,消费端也只有在事务方法执行完成后才会接收到消息

@GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        template.executeInTransaction(t ->{
            t.send("topic_input","kl");
            if("error".equals(input)){
                throw new RuntimeException("failed");
            }
            t.send("topic_input","ckl");
            return true;
        });
    }

当事务特性激活时,一样,在方法上面加@Transactional注解也会生效

@GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) {
        template.send("topic_input", "kl");
        if ("error".equals(input)) {
            throw new RuntimeException("failed");
        }
        template.send("topic_input", "ckl");
    }

Spring-Kafka的事务消息是基于Kafka提供的事务消息功能的。而Kafka Broker默认的配置针对的三个或以上Broker高可用服务而设置的。这边在测试的时候为了简单方便,使用了嵌入式服务新建了一个单Broker的Kafka服务,出现了一些问题:如

一、事务日志副本集大于Broker数量,会抛以下异常:

Number of alive brokers '1' does not meet the required replication factor '3' 
for the transactions state topic (configured via 'transaction.state.log.replication.factor').
This error can be ignored if the cluster is starting up and not all brokers are up yet.

默认Broker的配置transaction.state.log.replication.factor=3,单节点只能调整为1

二、副本数小于副本同步队列数目,会抛以下异常

Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

默认Broker的配置transaction.state.log.min.isr=2,单节点只能调整为1

ReplyingKafkaTemplate得到消息回复

ReplyingKafkaTemplate是KafkaTemplate的一个子类,除了继承父类的方法,新增了一个方法sendAndReceive,实现了消息发送\回复语义

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

也就是我发送一条消息,可以拿到消费者给我返回的结果。就像传统的RPC交互那样。当消息的发送者须要知道消息消费者的具体的消费状况,很是适合这个api。如,一条消息中发送一批数据,须要知道消费者成功处理了哪些数据。下面代码演示了怎么集成以及使用ReplyingKafkaTemplate

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate(pf, repliesContainer);
    }

    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate(pf);
    }

    @Autowired
    private ReplyingKafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
        RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        System.err.println("Return value: " + consumerRecord.value());
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}

Spring-kafka消息消费用法探秘

@KafkaListener的使用

前面在简单集成中已经演示过了@KafkaListener接收消息的能力,可是@KafkaListener的功能不止如此,其余的比较常见的,使用场景比较多的功能点以下:

  • 显示的指定消费哪些Topic和分区的消息,
  • 设置每一个Topic以及分区初始化的偏移量,
  • 设置消费线程并发度
  • 设置消息异常处理器
@KafkaListener(id = "webGroup", topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
                    @TopicPartition(topic = "topic2", partitions = "0",
                            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            },concurrency = "6",errorHandler = "myErrorHandler")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }

其余的注解参数都很好理解,errorHandler须要说明下,设置这个参数须要实现一个接口KafkaListenerErrorHandler。并且注解里的配置,是你自定义实现实例在spring上下文中的Name。好比,上面配置为errorHandler = "myErrorHandler"。则在spring上线中应该存在这样一个实例:

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
    Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        logger.info(message.getPayload().toString());
        return null;
    }
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        logger.info(message.getPayload().toString());
        return null;
    }
}

手动Ack模式

手动ACK模式,由业务逻辑控制提交偏移量。好比程序在消费时,有这种语义,特别异常状况下不确认ack,也就是不提交偏移量,那么你只能使用手动Ack模式来作了。开启手动首先须要关闭自动提交,而后设置下consumer的消费模式

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

上面的设置好后,在消费时,只须要在@KafkaListener监听方法的入参加入Acknowledgment 便可,执行到ack.acknowledge()表明提交了偏移量

@KafkaListener(id = "webGroup", topics = "topic-kl")
    public String listen(String input, Acknowledgment ack) {
        logger.info("input value: {}", input);
        if ("kl".equals(input)) {
            ack.acknowledge();
        }
        return "successful";
    }

@KafkaListener注解监听器生命周期

@KafkaListener注解的监听器的生命周期是能够控制的,默认状况下,@KafkaListener的参数autoStartup = "true"。也就是自动启动消费,可是也能够同过KafkaListenerEndpointRegistry来干预他的生命周期。KafkaListenerEndpointRegistry有三个动做方法分别如:start(),pause(),resume()/启动,中止,继续。以下代码详细演示了这种功能。

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate template;

    @GetMapping("/send/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendFoo(@PathVariable String input) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
        template.send(record);
    }

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @GetMapping("/stop/{listenerID}")
    public void stop(@PathVariable String listenerID){
        registry.getListenerContainer(listenerID).pause();
    }
    @GetMapping("/resume/{listenerID}")
    public void resume(@PathVariable String listenerID){
        registry.getListenerContainer(listenerID).resume();
    }
    @GetMapping("/start/{listenerID}")
    public void start(@PathVariable String listenerID){
        registry.getListenerContainer(listenerID).start();
    }
    @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return "successful";
    }
}

在上面的代码中,listenerID就是@KafkaListener中的id值“webGroup”。项目启动好后,分别执行以下url,就能够看到效果了。

先发送一条消息:http://localhost:8081/send/ckl。由于autoStartup = "false",因此并不会看到有消息进入监听器。

接着启动监听器:http://localhost:8081/start/webGroup。能够看到有一条消息进来了。

暂停和继续消费的效果使用相似方法就能够测试出来了。

SendTo消息转发

前面的消息发送响应应用里面已经见过@SendTo,其实除了作发送响应语义外,@SendTo注解还能够带一个参数,指定转发的Topic队列。常见的场景如,一个消息须要作多重加工,不一样的加工耗费的cup等资源不一致,那么就能够经过跨不一样Topic和部署在不一样主机上的consumer来解决了。如:

@KafkaListener(id = "webGroup", topics = "topic-kl")
    @SendTo("topic-ckl")
    public String listen(String input) {
        logger.info("input value: {}", input);
        return input + "hello!";
    }

    @KafkaListener(id = "webGroup1", topics = "topic-ckl")
    public void listen2(String input) {
        logger.info("input value: {}", input);
    }

消息重试和死信队列的应用

除了上面谈到的经过手动Ack模式来控制消息偏移量外,其实Spring-kafka内部还封装了可重试消费消息的语义,也就是能够设置为当消费数据出现异常时,重试这个消息。并且能够设置重试达到多少次后,让消息进入预约好的Topic。也就是死信队列里。下面代码演示了这种效果:

@Autowired
    private KafkaTemplate template;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        //最大重试三次
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
        return factory;
    }

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        template.send("topic-kl", input);
    }

    @KafkaListener(id = "webGroup", topics = "topic-kl")
    public String listen(String input) {
        logger.info("input value: {}", input);
        throw new RuntimeException("dlt");
    }

    @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
    public void dltListen(String input) {
        logger.info("Received from DLT: " + input);
    }

上面应用,在topic-kl监听到消息会,会触发运行时异常,而后监听器会尝试三次调用,当到达最大的重试次数后。消息就会被丢掉重试死信队列里面去。死信队列的Topic的规则是,业务Topic名字+“.DLT”。如上面业务Topic的name为“topic-kl”,那么对应的死信队列的Topic就是“topic-kl.DLT”

文末结语

最近业务上使用了kafka用到了Spring-kafka,因此系统性的探索了下Spring-kafka的各类用法,发现了不少好玩很酷的特性,好比,一个注解开启嵌入式的Kafka服务、像RPC调用同样的发送\响应语义调用、事务消息等功能。但愿此博文可以帮助那些正在使用Spring-kafka或即将使用的人少走一些弯路少踩一点坑。

相关文章
相关标签/搜索