Spring Boot Kafka概览、配置及优雅地实现发布订阅

本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.nethtml

[TOC]前端

1 前言

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完确定能够的,有任何问题能够随时交流!java

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完确定能够的,有任何问题能够随时交流!git

本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完确定能够的,有任何问题能够随时交流!github

本篇文章主要介绍Spring Kafka的经常使用配置、主题自动建立、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka作测试配置相关内容,最后经过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration方式。本文内容基于Spring Kafka2.3.3文档Spring Boot Kafka相关文档,Spring建立了一个名为Spring kafka的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,经过Spring Boot的简化配置(以spring.kafka.*做为前缀的配置参数),在Spring Boot中使用Kafka特别简单。而且Spring Boot还提供了一个嵌入式Kafka代理方便作测试。spring

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

实现下面的所涉及到的功能实现,须要有以下环境:apache

  • Java运行或开发环境(JRE/JDK)
  • Kafka安装成功

更多的配置能够参考《Kafka,ZK集群开发或部署环境搭建及实验》这一篇文章。编程

本文尽可能作到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,经过Spring Boot中的Kafka几大注解实现发布订阅功能,同时经过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文经过本身实验和整理了较久的时间,涵盖了Spring Kafka大部份内容,但愿你们耐心读下来,有什么问题随时反馈,一块儿学习。

2 Spring Kafka功能概览

Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性以下(截至2019年12月9日):json

Spring for Apache Kafka Spring Integration for Apache Kafka Version kafka-clients
2.3.x 3.2.x 2.3.1
2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x
2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0
1.3.x 2.3.x 0.11.0.x, 1.0.x
具体更多版本特色能够看官网,spring kafka当前最新为2.3.4版本。

Spring Kafka相关的注解有以下几个:bootstrap

注解类型 描述
EnableKafka 启用由AbstractListenerContainerFactory在封面(covers)下建立的Kafka监听器注解端点,用于配置类;
EnableKafkaStreams 启用默认的Kafka流组件
KafkaHandler 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解
KafkaListener 将方法标记为指定主题上Kafka消息监听器的目标的注解
KafkaListeners 聚合多个KafkaListener注解的容器注解
PartitionOffset 用于向KafkaListener添加分区/初始偏移信息
TopicPartition 用于向KafkaListener添加主题/分区信息

如使用@EnableKafka能够监听AbstractListenerContainerFactory子类目标端点,如ConcurrentKafkaListenerContainerFactoryAbstractKafkaListenerContainerFactory的子类。

public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }
@EnableKafka并非在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,所以不须要使用显式的 @EnableKafka。若是想要本身实现Kafka配置类,则须要加上 @EnableKafka,若是你不想要Kafka自动配置,好比测试中,须要作的只是移除 KafkaAutoConfiguration
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 自动建立主题

💡 要在应用启动时就建立主题,能够添加NewTopic类型的Bean。若是该主题已经存在,则忽略Bean。

2.2 发送消息

Spring的KafkaTemplate是自动配置的,你能够直接在本身的Bean中自动链接它,以下例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

KafkaTemplate包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回ListenableFuture,以此监听异步发送状态,成功仍是失败,KafkaTemplate提供以下接口:

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

sendDefault API 要求已向模板提供默认主题。部分API接受一个时间戳做为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,若是主题配置为使用CREATE_TIME,则记录用户指定的时间戳(若是未指定则生成)。若是将主题配置为使用LOG_APPEND_TIME,则忽略用户指定的时间戳,而且代理将添加本地代理时间。metricspartitionsFor方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问

要使用模板,能够配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操做:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    // KafkaTemplate构造函数中输入生产者工厂配置
    return new KafkaTemplate<Integer, String>(producerFactory());
}

而后,要使用模板,能够调用其方法之一发送消息。

当你使用包含Message<?>参数的方法时,主题、分区和键信息在消息头中提供,有以下子项:

KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

如访问头部信息中某一项信息:

public void handleMessage(Message<?> message) throws MessagingException {
    LOGGER.debug("===Received Msg Topic: {}",  message.getHeaders().get(KafkaHeaders.TOPIC));
}

可选的功能是,可使用ProducerListener配置KafkaTemplate,以得到带有发送结果(成功或失败)的异步回调,而不是等待未来完成。如下列表显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {
    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    void onError(String topic, Integer partition, K key, V value, Exception exception);
    boolean isInterestedInSuccess();
}

默认状况下,模板配置有LoggingProducerListener,它只记录错误,在发送成功时不执行任何操做。只有当isInterestedInSuccess返回true时才调用onSuccess
为了方便起见,若是你只想实现其中一个方法,那么将提供抽象ProducerListenerAdapter。对于isInterestedInSuccess,它返回false。下面演示了异步结果回调:

public void sendMessage(String msg) {
    LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
    ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("===Producing message success");  
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.info("===Producing message failed");  
        }

    });
}

若是但愿阻止式发送线程等待结果,能够调用futureget()方法。你可能但愿在等待以前调用flush(),或者为了方便起见,模板有一个带有autoFlush参数的构造函数,该构造函数在每次发送时都会致使模板flush()。不过,请注意,刷新可能会显著下降性能:

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

使用DefaultKafkaProducerFactory:

如上面使用KafkaTemplate中所示,ProducerFactory用于建立生产者。默认状况下,当不使用事务时,DefaultKafkaProducerFactory会建立一个供全部客户机使用的单例生产者,如KafkaProducer javadocs中所建议的那样。可是,若是对模板调用flush(),这可能会致使使用同一个生产者的其余线程延迟。从2.3版开始,DefaultKafkaProducerFactory有一个新属性producerPerThread。当设置为true时,工厂将为每一个线程建立(和缓存)一个单独的生产者,以免此问题。

producerPerThread为true时,当再也不须要生产者时,用户代码必须在工厂上调用 closeThreadBoundProducer()。这将实际关闭生产者并将其从 ThreadLocal中移除。调用reset()或destroy()不会清理这些生产者。

建立DefaultKafkaProducerFactory时,能够经过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例能够传递给DefaultKafkaProducerFactory构造函数(在这种状况下,全部生产者共享相同的实例)。或者,能够提供Supplier<Serializer> s(从版本2.3开始),用于为每一个生产者获取单独的Serializer实例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

使用ReplyingKafkaTemplate:

版本2.1.3引入了KafkaTemplate的一个子类来提供请求/应答语义。这个类名为ReplyingKafkaTemplate,而且有一个方法(除了超类中的那些方法以外)。下面的列表显示了方法签名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是一个ListenableFuture,它被结果异步填充(或者超时时出现异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。你可使用此Future肯定发送操做的结果。这里就不展开了。

2.3 接收消息

能够经过配置MessageListenerContainer并提供消息监听器或使用@KafkaListener注解来接收消息。

2.3.1 消息监听器

使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 做接收的单个ConsumerRecord实例
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的单个ConsumerRecord实例
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,由于监听器已得到完整的批处理。
public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。
public interface BatchAcknowledgingMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,由于监听器已得到完整的批处理。提供对使用者对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。提供对使用者对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。
2.3.1.1 消息监听器容器

提供了两个MessageListenerContainer的实现:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer从单个线程上的全部主题或分区接收全部消息(即一个分区只能分配到一个消费者,一个消费者能够被分配多个分区)。ConcurrentMessageListenerContainer委托给一个或多个KafkaMessageListenerContainer实例,以提供多线程使用,从多线程上去处理主题或分区的全部消息。

从Spring Kafka2.2.7版开始,你能够将RecordInterceptor添加到侦听器容器中;在调用侦听器以容许检查或修改记录以前,将调用它。若是拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始,CompositeRecordInterceptor可用于调用多个拦截器。

默认状况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你能够设置侦听器容器的interceptBeforeTx属性,以便在事务启动以前调用侦听器。没有为批处理侦听器提供侦听器,由于Kafka已经提供了ConsumerInterceptor

2.3.1.2 使用KafkaMessageListenerContainer

有以下构造函数可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

每一个都获取一个ConsumerFactory以及有关主题和分区的信息,以及ContainerProperties对象中的其余配置。ConcurrentMessageListenerContainer(稍后介绍)使用第二个构造函数跨使用者实例分发TopicPartitionOffsetContainerProperties具备如下构造函数:

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

第一个构造函数接受一个TopicPartitionOffset参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认状况下,正值是绝对偏移量。默认状况下,负值是相对于分区内的当前最后偏移量。提供了TopicPartitionOffset的构造函数,该构造函数接受一个附加的布尔参数。若是是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于group.id属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。

要将MessageListener分配给容器,能够在建立容器时使用ContainerProps.setMessageListener方法。下面的示例演示了如何执行此操做:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

注意当建立一个Defaultkafkafkaconsumerfactory时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例能够传递给key/value的DefaultKafkaConsumerFactory构造函数,在这种状况下,全部消费者共享相同的实例。另外一个选项是提供Supplier<Deserializer>s(从版本2.3开始),用于为每一个使用者获取单独的反序列化程序实例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关能够设置的各类属性的更多信息,请参阅Javadoc 中ContainerProperties

从版本Spring Kafka 2.1.1开始,一个名为logContainerConfig的新属性就可用了。当启用true和INFO日志记录时,每一个侦听器容器都会写入一条日志消息,总结其配置属性。

例如,要将日志级别更改成INFO,可使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)

从版本Spring Kafka 2.2开始,添加了名为missingtopicsfailal的新容器属性(默认值:true)。若是代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。若是容器配置为侦听主题模式(regex),则不适用。之前,容器线程在consumer.poll()方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象代表有问题。要恢复之前的行为,能够将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动建立不存在的topic。

2.3.1.3 使用 ConcurrentMessageListenerContainer

单个构造函数相似于第一个KafkaListenerContainer构造函数。下面的列表显示了构造函数的签名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个并发属性。例如,container.setConcurrency(3)即表示建立三个KafkaMessageListenerContainer实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。

当监听多个主题时,默认的分区分布可能不是你指望的那样。例如,若是你有三个主题,每一个主题有五个分区,而且但愿使用 concurrency=15,那么你只看到五个活动的消费者,每一个消费者从每一个主题中分配一个分区,其余十个消费者处于空闲状态。这是由于默认的Kafka PartitionAssignorRangeAssignor(参见其Javadoc)。对于这种状况,你可能须要考虑改用 RoundRobinAssignor,它将分区分布到全部使用者。而后,为每一个使用者分配一个主题或分区。若要更改 PartitionAssignor,你能够在提供给 DefaultKafkaConsumerFactory的属性中设置 partition.assignment.strategy消费者配置参数( ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用Spring Boot时,能够按以下方式分配设置策略:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

对于第二个构造函数,ConcurrentMessageListenerContainerTopicPartition实例分布在委托KafkaMessageListenerContainer实例上。

例如,若是提供了六个TopicPartition实例,并发性为3;每一个容器获得两个分区。对于五个TopicPartition实例,两个容器获得两个分区,第三个容器获得一个分区。若是并发性大于TopicPartitions的数量,则会向下调整并发性,以便每一个容器得到一个分区。调整分区的方式可使用命令行工具kafka-topics.sh查询和调整主题上的分区数。还能够添加一个NewTopic Bean,若是NewTopic设定的数目大于当前数目,spring boot的自动配置的KafkaAdmin将向上调整分区。

client.id属性(若是已设置)将附加 -n,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供惟一名称所必需的。

从版本Spring Kafka 1.3开始,MessageListenerContainer提供了对底层KafkaConsumer的度量的访问。对于ConcurrentMessageListenerContainermetrics()方法返回全部目标KafkaMessageListenerContainer实例的度量(metrics)。根据为底层KafkaConsumer提供的client-id度量被分组到Map<MetricName, ?extends Metric>

从2.3版开始,ContainerProperties提供了一个idleBetweenPolls选项,容许侦听器容器中的主循环在KafkaConsumer.poll()调用之间睡眠。从提供的选项中选择实际睡眠间隔做为最小值,而且选择max.poll.interval.ms 消费者配置和当前记录批处理时间之间的差别。

2.3.1.4 提交偏移量

提供了几个提交偏移量的选项。若是enable.auto.commit使用者属性为true,则Kafka将根据其配置自动提交偏移量。若是为false,则容器支持多个AckMode设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将enable.auto.commit设置为false,除非在配置中显式设置。之前,若是未设置属性,则使用Kafka默认值(true)。消费者 poll()方法返回一个或多个ConsumerRecords。为每一个记录调用MessageListener。如下列表描述了容器对每一个AckMode采起的操做:

  • RECORD: 当侦听器在处理记录后返回时提交偏移量。
  • BATCH: 处理完poll()返回的全部记录后提交偏移量。
  • TIME: 在处理完poll()返回的全部记录后提交偏移量,只要超过上次提交后的ackTime
  • COUNT: 在处理完poll()返回的全部记录后提交偏移量,只要上次提交后收到ackCount记录。
  • COUNT_TIME: 相似于TIMECOUNT,但若是两个条件都为true,则执行提交。
  • MANUAL: 消息侦听器负责acknowledge()Acknowledgment。以后,应用与BATCH相同的语义。
  • MANUAL_IMMEDIATE: 侦听器调用Acknowledgement.acknowledge()方法时当即提交偏移量。
MANUAL和MANUAL_IMMEDIATE 要求侦听器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参见消息侦听器。

根据syncCommits容器属性,使用消费者上的commitSync()commitAsync()方法。默认状况下,syncCommits为true;另请参阅setSyncCommitTimeout。请参阅setCommitCallback以获取异步提交的结果;默认回调是LoggingCommitCallback,它记录错误(以及调试级别的成功)。

由于侦听器容器有本身的提交偏移的机制,因此它但愿Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,不然它将无条件地将其设置为false。

Acknowledgment有如下方法:

public interface Acknowledgment {
    void acknowledge();
}

此方法使侦听器能够控制什么时候提交偏移。

从版本2.3开始,确认接口有两个附加方法nack(long sleep)nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引起IllegalStateException

nack()只能在调用侦听器的消费者线程上调用。

使用批处理侦听器时,能够在发生故障的批内指定索引。调用nack()时,将在对失败和丢弃的记录的分区执行索引和查找以前提交记录的偏移量,以便在下次poll()时从新传递这些偏移量。这是对SeekToCurrentBatchErrorHandler的改进,SeekToCurrentBatchErrorHandler只能查找整个批次以便从新交付。

注意:经过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于 consumer max.poll.interval.ms属性很是重要。
2.3.1.5 侦听器容器自动启动和手动启动

侦听器容器实现了SmartLifecycle(经过SmartLifecycle在Spring加载和初始化全部bean后,接着执行一些任务或者启动须要的异步服务),默认状况下autoStartuptrue。容器在后期启动(Integer.MAX-VALUE - 100)。实现SmartLifecycle以处理来自侦听器的数据的其余组件应该在较早的阶段启动。-100为之后的阶段留出了空间,使组件可以在容器以后自动启动。好比咱们经过@Bean将监听器容器交给Spring管理,这个时候经过SmartLifecycle自动执行了初始化的任务,可是当咱们手动经过new监听器容器实例,则后初始化则不会执行,好比KafkaMessageListenerContainer实例须要手动执行start()

autoStartup在手动执行start中设置true与false没有做用,能够参见@KafkaListener声明周期管理这一小节。

2.3.2 @KafkaListener注解

2.3.2.1 Record Listeners

@KafkaListener注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个MessagingMessageListenerAdapter中,该适配器配置有各类功能,如转换器,用于转换数据(若有必要)以匹配方法参数。经过使用属性占位符(${…}),或者可使用SpEL(#{…​})配置注释上的大多数属性。有关更多信息,请参阅Javadoc。

@KafkaListener:

  • id:listener惟一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。
  • containerFactory:上面提到了@KafkaListener区分单数据仍是多数据消费只须要配置一下注解的containerFactory属性就能够了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory,配置Bean名称
  • topics:须要监听的Topic,可监听多个,能够是表达式或者占位符关键字或者直接是主题名称,如多个主题监听:{"topic1" , "topic2"}
  • topicPattern: 此侦听器的主题模式。条目能够是“主题模式”、“属性占位符键”或“表达式”。框架将建立一个容器,该容器订阅与指定模式匹配的全部主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。
  • topicPartitions:用于使用手动主题/分区分配时
  • errorHandler:监听异常处理器,配置Bean名称,默认为空
  • groupId:消费组ID
  • idIsGroup:id是否为GroupId
  • clientIdPrefix:消费者Id前缀
  • beanRef:真实监听容器的Bean名称,须要在 Bean名称前加 "__"

@KafkaListener注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

此机制生效须要@Configuration类之一上的@EnableKafka注解和用于配置基础ConcurrentMessageListenerContainer的侦听器容器工厂。默认状况下,须要名为kafkaListenerContainerFactory的bean。如下示例演示如何使用ConcurrentMessageListenerContain:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

注意,要设置容器属性,必须在工厂上使用getContainerProperties()方法。它用做注入容器的实际属性的模板。

从版本2.1.1开始,如今能够为注解建立的消费者设置client.id属性。clientdprefix的后缀是-n,其中n是一个整数,表示使用并发时的容器号。

从2.2版开始,如今能够经过使用批注自己的属性来重写容器工厂的并发性和自动启动属性。属性能够是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操做:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

你还可使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操做:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

你能够在partitionspartitionOffsets属性中指定每一个分区,但不能同时指定二者。

使用手动AckMode时,还能够向侦听器提供Acknowledgment。下面的示例还演示了如何使用不一样的容器工厂:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

最后,能够从消息头得到有关消息的元数据。你可使用如下头名称来检索消息头内容:

KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

示例:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
2.3.2.2 批处理侦听器

从版本1.1开始,能够配置@KafkaListener方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为建立批处理侦听器,能够设置batchListener属性。下面的示例演示了如何执行此操做:

@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

如下示例显示如何接收有效载荷列表:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您能够接收消息列表Message<?>对象,其中包含每一个偏移量和每一个消息中的其余详细信息,但它必须是惟一的参数(除了使用手动提交时的Acknowledgment和/Consumer<?, ?>参数)。下面的示例演示如何执行此操做:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在这种状况下,不会对有效载荷执行转换。若是BatchMessagingMessageConverter配置了RecordMessageConverter,则还能够向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。

你还能够收到一个ConsumerRecord<?, ?>对象,但它必须是惟一的参数(当使用手动提交或Consumer<?, ?>参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操做:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

从版本2.2开始,侦听器能够接收poll()方法返回的完整的ConsumerRecords<?, ?>对象,容许侦听器访问其余方法,例如partitions()(返回列表中的TopicPartition实例)和records(TopicPartition)(获取选择性记录)。一样,这必须是惟一的参数(当使用手动提交或Consumer<?, ?>参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操做:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

2.3.3 @KafkaListener@Payload验证

从2.2版开始,如今更容易添加验证程序来验证@KafkaListener`@Payload参数。之前,你必须配置一个自定义的DefaultMessageHandlerMethodFactory`并将其添加到注册器中。如今,你能够将验证器添加到注册器自己。如下代码说明了如何执行此操做:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }
}

当你在Spring Boot使用validation starter,会自动配置LocalValidatorFactoryBean,以下例所示:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

如下示例演示如何验证:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

2.3.4 从新平衡监听者

ContainerProperties有一个名为consumerRebalanceListener的属性,该属性接受Kafka客户端的consumerRebalanceListener接口的实现。若是未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录从新平衡事件。该框架还添加了一个子接口ConsumerRawareRebalanceListener。如下列表显示了ConsumerRawareRebalanceListener接口定义:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 转发监听者消息

从2.0版开始,若是还使用@SendTo注解注释@KafkaListener,而且方法调用返回结果,则结果将转发到@SendTo指定的主题。如:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

2.3.6 @KafkaListener生命周期管理

@KafkaListener注解建立的侦听器容器不是应用程序上下文中的bean。相反,它们是用KafkaListenerEndpointRegistry类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何autoStartup设置为true的容器。全部容器工厂建立的全部容器必须处于同一phase。有关详细信息,请参阅侦听器容器自动启动。你可使用注册表以编程方式管理生命周期。启动或中止注册表将启动或中止全部已注册的容器。或者,能够经过使用单个容器的id属性来获取对该容器的引用。能够在批注上设置autoStartup ,这将覆盖容器工厂中配置的默认设置(setAutoStartup(true))。你能够从应用程序上下文中获取对bean的引用,例如自动链接,以管理其注册的容器。如下示例说明了如何执行此操做:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,能够从应用程序上下文中获取。能够经过调用注册表的getListenerContainers()方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法getAllListenerContainers(),它返回全部容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。

2.4 流处理

Spring for Apache Kafka提供了一个工厂bean来建立StreamsBuilder对象并管理其流的生命周期。只要kafka流在classpath上而且kafka流经过@EnableKafkaStreams注解开启,Spring Boot就会自动配置所需的KafkaStreamsConfiguration bean。

启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可使用spring.kafka.streams.application-id配置,若是未设置,则默认为spring.application.name。后者能够全局设置,也能够专门为流覆写。

使用专用属性可使用其余几个属性;可使用spring.Kafka.streams.properties命名空间设置其余任意Kafka属性。有关详细信息,Additional Kafka Properties

默认状况下,由它建立的StreamBuilder对象管理的流将自动启动。可使用spring.kafka.streams.auto-startup属性自定义此行为。

要使用工厂bean,只需将StreamsBuilder链接到@bean,以下例所示:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

}

默认状况下,由它建立的StreamBuilder对象管理的流将自动启动。可使用spring.kafka.streams.auto-startup属性自定义此行为。

2.5 附加配置

自动配置支持的属性显示在公用应用程序属性中。注意,在大多数状况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅Apache Kafka文档。

前面提到的几个属性应用于全部组件(生产者、消费者、管理员和流),但若是但愿使用不一样的值,则能够在组件级别指定。Apache Kafka指定重要性为HIGHMEDIUMLOW的属性。Spring Boot自动配置支持全部高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。

只有Kafka支持的属性的一个子集能够经过KafkaProperties类直接使用,若是要使用不直接支持的其余属性配置生产者或消费者,请使用如下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

上面的参数设置示例将公共prop.oneKafka属性设置为first(适用于生产者、消费者和管理员),prop.two admin属性设置为secondprop.three consumer属性设置为thirdprop.four producer属性设置为fourthprop.five streams属性设置为fifth

你还能够配置Spring Kafka JsonDeserializer,以下所示:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

相似地,能够禁用JsonSerializer在头中发送类型信息的默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。

2.6 使用Embdded Kafka作测试

Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的@EmbeddedKafka注解测试类。有关更多信息,请参阅Spring For Apache Kafka参考手册。

要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一块儿工做,须要将嵌入式代理地址(由EmbeddedKafkaBroker填充)的系统属性从新映射到Apache Kafka的Spring Boot配置属性中。有几种方法能够作到这一点:

  • 提供系统属性以将嵌入的代理地址映射到测试类中的spring.kafka.bootstrap-servers:
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • @EmbeddedKafka注解上配置属性名:
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • 在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

2.7 Spring Integration支持

Spring Integration也有Kafka的适配器,所以咱们能够很方便的采用Spring Integration去实现发布订阅,固然你也能够不使用Spring Integration。

Spring Integration是什么,具体有什么做用,能够参考另外一篇文章《Spring Integration最详解》。

3 Spring Kafka配置参数

这里对全部配置作个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,好比SSL认证能够全局配置,可是也能够在每一个子模块,如消费者、生产者、流式处理中均可以单独配置SSL(多是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其余就不展开了,用到的时候再去查找和补充。

3.1 全局配置

# 用逗号分隔的主机:端口对列表,用于创建到Kafka群集的初始链接。覆盖全局链接设置属性
spring.kafka.bootstrap-servers
# 在发出请求时传递给服务器的ID。用于服务器端日志记录
spring.kafka.client-id,默认无
# 用于配置客户端的其余属性,生产者和消费者共有的属性
spring.kafka.properties.*
# 消息发送的默认主题,默认无
spring.kafka.template.default-topic

3.2 生产者

Spring Boot中,Kafka 生产者相关配置(全部配置前缀为spring.kafka.producer.):

# 生产者要求Leader在考虑请求完成以前收到的确认数
spring.kafka.producer.acks
# 默认批量大小。较小的批处理大小将使批处理不太常见,并可能下降吞吐量(批处理大小为零将彻底禁用批处理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.buffer-memory
# 在发出请求时传递给服务器的ID。用于服务器端日志记录。
spring.kafka.producer.client-id
# 生产者生成的全部数据的压缩类型
spring.kafka.producer.compression-type
# 键的序列化程序类
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大于零时,启用失败发送的重试次数
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空时,启用对生产者的事务支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 消费者

Spring Boot中,Kafka 消费者相关配置(全部配置前缀为spring.kafka.consumer.):

# 若是“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 当Kafka中没有初始偏移或服务器上再也不存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置
# earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset
# 用逗号分隔的主机:端口对列表,用于创建到Kafka群集的初始链接。覆盖全局链接设置属性
spring.kafka.consumer.bootstrap-servers
# 在发出请求时传递给服务器的ID,用于服务器端日志记录
spring.kafka.consumer.client-id
# 消费者的偏移量是否在后台按期提交
spring.kafka.consumer.enable-auto-commit
# 若是没有足够的数据来当即知足“fetch-min-size”的要求,则服务器在取回请求以前阻塞的最大时间量
spring.kafka.consumer.fetch-max-wait
# 服务器应为获取请求返回的最小数据量。
spring.kafka.consumer.fetch-min-size
# 标识此消费者所属的默认消费者组的惟一字符串
spring.kafka.consumer.group-id
# 消费者协调员的预期心跳间隔时间。
spring.kafka.consumer.heartbeat-interval
# 用于读取以事务方式写入的消息的隔离级别。
spring.kafka.consumer.isolation-level
# 密钥的反序列化程序类
spring.kafka.consumer.key-deserializer
# 在对poll()的单个调用中返回的最大记录数。
spring.kafka.consumer.max-poll-records
# 用于配置客户端的其余特定于消费者的属性。
spring.kafka.consumer.properties.*
# 密钥存储文件中私钥的密码。
spring.kafka.consumer.ssl.key-password
# 密钥存储文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密钥存储文件的存储密码。
spring.kafka.consumer.ssl.key-store-password
# 密钥存储的类型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存储文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存储文件的存储密码。
spring.kafka.consumer.ssl.trust-store-password
# 信任存储区的类型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序类。
spring.kafka.consumer.value-deserializer

3.4 监听器

Spring Boot中,Kafka Listener相关配置(全部配置前缀为spring.kafka.listener.):

# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 若是Broker上不存在至少一个配置的主题(topic),则容器是否没法启动,
# 该设置项结合Broker设置项allow.auto.create.topics=true,若是为false,则会自动建立不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非响应消费者的检查间隔时间。若是未指定持续时间后缀,则将使用秒做为单位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

3.5 管理

spring.kafka.admin.client-id
# 若是启动时代理不可用,是否快速失败
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 受权服务(JAAS)

spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*

3.7 SSL认证

spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Stream流处理

spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Kafka订阅发布基本特性回顾

  • 同一消费组下全部消费者协同消费订阅主题的全部分区

    • 同消费组,多消费者订阅单主题单分区,则分区只会分配给其中一个消费者,除非这个消费者挂掉,才会分配给其余一个消费者消费消息,意思就是其余消费者在旁边看着吃东西
    • 同消费组,N个消费者订阅单主题N个分区,则默认每一个消费者都会被分配一个分区
    • 同消费组,N个消费者订阅单主题M个分区,当M > N时,则会有消费者多分配多于一个分区的状况;当M < N时,则会有空闲消费者,相似第一条
    • 全部上面所说的消费者实例能够是线程方式或者是进程方式存在,所说的分区分配机制叫作重平衡(rebalance)
    • 当消费者内成员个数发生变化会触发重平衡;订阅的主题个数发生变化会触发重平衡;订阅的主题分区个数发生变化会触发重平衡;
    • 总之就是一个分区只能分配到一个消费者,一个消费者能够被分配多个分区
  • 消费者offset管理机制

    • 每一个主题分区中的消息都有一个惟一偏移值,具备前后顺序,与消费者具备对应关系,消费者每消费一条消息,偏移量加1,并记录在消费者本地,并按期的将记录同步到服务端(Broker),这里的同步机制是能够设置的
    • 消息是被持久化的,当组内全部消费者从新订阅主题时,能够设置是否从头开始消费消息或者是从最后记录的偏移值位置开始消费
  • 分区和消费者个数如何设置

    • 咱们知道主题分区是分布在不一样的Broker上的,每一个分区对应一个消费者,从而具备消息处理具备很高的吞吐量
    • 分区是调优Kafka并行度的最小单元,多线程消费者链接多分区消费消息,在实现上,经过socket链接,所以也会占用文件句柄个数
    • 建立分区都是会占用必定内存的,并非分区越多越好,固然如今kafka社区在优化这一部分,让分区数达到更大,性能也不会有所影响

具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。

5 发布订阅示例

实现下面的示例须要的环境:

  • Kafka + Zookeeper单点服务器或集群已配置好(若是环境搭建不熟悉,能够去翻看前面写的关于Kafka的环境搭建和测试那一篇),或者是使用Spring-kafka-test embedded Kafka Server
  • Spring Boot开发环境(2.2.1)

    • JDK(1.8或以上)
    • STS(4.4.RELEASE)
    • MARVEN构建方式

5.1 使用Embedded Kafka Server

咱们知道Kafka是Scala+Zookeeper构建的,能够从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也很是简单。

添加依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点,Run as JUnit Test。:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

@EmbeddedKafka中能够设置相关参数:

  • value: 设置建立代理的个数
  • count: 同value
  • ports: 代理端口号列表
  • brokerPropertiesLocation:指定配置文件,如 "classpath:application.properties"
注意:EmbeddedKafka这样默认是没有建立主题的。会提示 Topic(s) [test] is/are not present and missingTopicsFatal is true错误。@EmbeddedKafka默认状况是建立一个代理,该代理具备一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。

5.2 简单的发布订阅实现(无自定义配置)

下面实现一个简单发布订阅功能,经过前端WEB调用一个API,而后在该API控制器中获得请求后生产者开始发送消息,消费者后台监听消息,若是收到消费者消息,则打印出来。

5.2.1 添加依赖及配置Kafka

添加Kafka依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>            
</dependency>

配置Kafka,这里消费者和生产者在同一应用中,咱们只须要配置Kafka Brokers的服务地址+端口:

server: 
  port: 9000
spring:
  kafka:
    bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
    listener:
        # 设置不监听主题错误,false时,若是broker设置了llow.auto.create.topics = true,生产者发送到未建立主题时,会默认自动建立主题
        # 且默认建立的主题是单副本单分区的
        missing-topics-fatal: false
    consumer:
        # 配置消费者消息offset是否自动重置(消费者重连会可以接收最开始的消息)
        auto-offset-reset: earliest

5.2.2 添加生产者

@Service
public class Producer {

    private static final Logger LOGGER = LogManager.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        LOGGER.info(String.format("===Producing message: {}", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

5.2.3 添加消费者

@Service
public class Consumer {

    private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "group_test")
    public void consume(String message) throws IOException {
        LOGGER.info(String.format("#### -> Consumed message -> %s", message));
    }
   
}

5.2.4 添加WEB控制器

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

5.2.5 测试

添加Spring Boot Application:

@SpringBootApplication
public class TestKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestKafkaApplication.class, args);
    }

}

启动Kafka Brokers后,须要手动建立主题(若是想自动建立,则须要借助KafkaAdmin,或者是Kafka Broker设置了allow.auto.create.topics=true且应用设置了listener.missing-topics-fatal=false):

# 若是对kafka-topics.sh这里不熟悉,能够去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇)
# 建立test主题
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test

打开浏览器测试:

http://localhost:9000/kafka/publish?message=hello

则应用控制台会打印hello。整个发布订阅的实现只使用了跟Kafka相关的@KafkaListener注解接收消息和KafkaTemplate模板发送消息,非常简单。

5.3 基于自定义配置发布订阅实现

上面是简单的经过Spring Boot依赖的Spring Kafka配置便可快速实现发布订阅功能,这个时候咱们是没法在程序中操做这些配置的,所以这一小节就是利用咱们以前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。

实现内容有:

  • 自定义Kafka配置参数文件(非application.properties/yml)
  • 可实现多生产者(每一个生产者为单服务单线程),多消费者(非@KafkaListener实现消息监听)
  • 支持SSL安全配置
  • 监听生产者
源码不会直接贴,只给出主体部分。

配置文件:

@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

    @Value("${m2kc.kafka.bootstrap.servers}")
    private String kafkaBootStrapServers;

    @Value("${m2kc.kafka.key.serializer.class}")
    private String kafkaKeySerializerClass;

    ......
    ......
}

生产者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
    private String mTopic = "test";
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaTemplate<String, String> mKafkaTemplate;
   
    @Autowired
    public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;     
        mKafkaTemplate = getKafkaTemplate();
    }

    public KafkaTemplate<String, String> getKafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());        
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());            
        }

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
    
    public void sendMessage(String msg) {
        LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
        ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("===Producing message success");  
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("===Producing message failed");  
            }

        });
    }
}

消费者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
    private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

    private String mTopic;
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; 

    @Autowired
    public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
        LOGGER.info("===KafkaConsumer construct");
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;
    }
    
    @PostConstruct
    public void start(){
        LOGGER.info("===KafkaConsumer start");        
    }
    
    @Override  
    public void afterPropertiesSet() throws Exception {          
        LOGGER.info("===afterPropertiesSet is called");      
        createContainer();
    }  

    private void createContainer() {
        mKafkaMessageListenerContainer =  createKafkaMessageListenerContainer();
        mKafkaMessageListenerContainer.setAutoStartup(false);;
        mKafkaMessageListenerContainer.start();
        LOGGER.info("===", mKafkaMessageListenerContainer);
    }
    
    private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
                createContainerProperties());
        LOGGER.info("===createKafkaMessageListenerContainer");
        return container;
    }
   
    private ContainerProperties createContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties(mTopic);
        containerProps.setMessageListener(createMessageListener());
        return containerProps;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
        }

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    private MessageListener<String, String> createMessageListener() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                // TODO Auto-generated method stub
                LOGGER.info("===Consuming msg: {}", data.value());
            }

        };
    }
}

继承InitializingBean只是为了初始化,也能够去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用@Scope,因此须要手动获取实例,经过context去调用getBean()。另外配置文件没有写全,这里须要注意。

5.3 基于Spring Integration发布订阅实现

Spring Integration也有对Kafka支持的适配器,采用Spring Integration,咱们也可以快速的实现发布订阅功能,且实现群组多消费者批量消费功能:

  • 实现Kafka自定义配置类
  • 采用Spring Integration
  • 发布订阅
  • 群组多消费者批量消费
  • 采用DSL特定领域语法去编写
  • 生产者发布成功与失败异常处理

咱们能够先看看总体的Kafka消息传递通道:

  • 出站通道中KafkaProducerMessageHandler用于将消息发送到主题
  • KafkaMessageDrivenChannelAdapter用于设置入站通道和消息处理

具体的Demo能够参考Github中的一个sample :

6 总结

本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其余包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何经过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握整体,结合实际,差很少基本内容都有所涉及了。

7 知识扩展

Spring Expression Language(简称SpEL),在Spring中,不一样于属性占位符${...},而SpEL表达式则要放到#{...}中(除代码块中用Expression外)。如配置文件中有topics参数spring.kafka.topics,则能够将配置文件中参数传入注解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

SpEL表达式经常使用示例:

// 字面量
#{3.1415926}    // 浮点数
#{9.87E4}       // 科学计数法表示98700
#{'Hello'}      // String 类型
#{false}        // Boolean 类型
// 引用Bean、属性和方法
#{sgtPeppers}                                   // 使用这个bean
#{sgtPeppers.artist}                            // 引用bean中的属性
#{sgtPeppers.selectArtist()}                    // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()}      // 方法返回值的操做
#{sgtPeppers.selectArtist()?.toUpperCase()}     // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase()
// 访问类做用域的方法和常量的话,使用T()这个关键的运算符
#{T(java.lang.Math)}   
#{T(java.lang.Math).PI}             // 引用PI的值
#{T(java.lang.Math).random()}       // 获取0-1的随机数
#{T(System).currentTimeMillis()}    // 获取时间到当前的毫秒数
// 替代属性占位符获取配置文件属性值
@Value("#{表达式}" 
private String variable;

8 参考资料

相关文章
相关标签/搜索