本文属于原创,转载注明出处,欢迎关注微信小程序小白AI博客
微信公众号小白AI
或者网站 https://xiaobaiai.nethtml
[TOC]前端
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完确定能够的,有任何问题能够随时交流!java
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完确定能够的,有任何问题能够随时交流!git
本篇文章内容很全,很长,很细!不要心急,慢慢看!我都写完了,相信你看完确定能够的,有任何问题能够随时交流!github
本篇文章主要介绍Spring Kafka的经常使用配置、主题自动建立、发布消息到集群、订阅消息(群组)、流处理配置以及嵌入式Kafka作测试配置相关内容,最后经过两种方式去实现消息的发布和订阅功能,其中一种是基于Spring Integration
方式。本文内容基于Spring Kafka2.3.3文档及Spring Boot Kafka相关文档,Spring建立了一个名为Spring kafka
的项目,它封装了Apache的kafka客户端部分(生产者/消费者/流处理等),以便在Spring项目中快速集成kafka,Spring-Kafka项目提供了Apache Kafka自动化配置,经过Spring Boot的简化配置(以spring.kafka.*
做为前缀的配置参数),在Spring Boot中使用Kafka特别简单。而且Spring Boot还提供了一个嵌入式Kafka代理方便作测试。spring
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup
实现下面的所涉及到的功能实现,须要有以下环境:apache
更多的配置能够参考《Kafka,ZK集群开发或部署环境搭建及实验》
这一篇文章。编程
本文尽可能作到阐述逻辑清晰,主要路线就是全局介绍Spring Kafka的主要功能及重点配置,而Spring Boot对Spring Kafka进一步简化配置,经过Spring Boot中的Kafka几大注解实现发布订阅功能,同时经过Spring Integration + 自定义Kafka配置方式实现一个较为复杂的Kafka发布订阅功能,本文经过本身实验和整理了较久的时间,涵盖了Spring Kafka大部份内容,但愿你们耐心读下来,有什么问题随时反馈,一块儿学习。
Spring Kafka、Spring Integration和Kafka客户端版本联系或者兼容性以下(截至2019年12月9日):json
Spring for Apache Kafka | Spring Integration for Apache Kafka Version | kafka-clients |
---|---|---|
2.3.x | 3.2.x | 2.3.1 |
2.2.x | 3.1.x | 2.0.1, 2.1.x, 2.2.x |
2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
具体更多版本特色能够看官网,spring kafka当前最新为2.3.4版本。
Spring Kafka相关的注解有以下几个:bootstrap
注解类型 | 描述 |
---|---|
EnableKafka | 启用由AbstractListenerContainerFactory 在封面(covers)下建立的Kafka监听器注解端点,用于配置类; |
EnableKafkaStreams | 启用默认的Kafka流组件 |
KafkaHandler | 在用KafkaListener注解的类中,将方法标记为Kafka消息监听器的目标的注解 |
KafkaListener | 将方法标记为指定主题上Kafka消息监听器的目标的注解 |
KafkaListeners | 聚合多个KafkaListener注解的容器注解 |
PartitionOffset | 用于向KafkaListener添加分区/初始偏移信息 |
TopicPartition | 用于向KafkaListener添加主题/分区信息 |
如使用@EnableKafka
能够监听AbstractListenerContainerFactory
子类目标端点,如ConcurrentKafkaListenerContainerFactory
是AbstractKafkaListenerContainerFactory
的子类。
public class ConcurrentKafkaListenerContainerFactory<K,V> extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); return factory; } // other @Bean definitions }
@EnableKafka
并非在Spring Boot中启用Kafka必须的,Spring Boot附带了Spring Kafka的自动配置,所以不须要使用显式的@EnableKafka
。若是想要本身实现Kafka配置类,则须要加上@EnableKafka
,若是你不想要Kafka自动配置,好比测试中,须要作的只是移除KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
💡 要在应用启动时就建立主题,能够添加NewTopic
类型的Bean。若是该主题已经存在,则忽略Bean。
Spring的KafkaTemplate
是自动配置的,你能够直接在本身的Bean中自动链接它,以下例所示:
@Component public class MyBean { private final KafkaTemplate kafkaTemplate; @Autowired public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... }
KafkaTemplate
包装了一个生产者,并提供了向kafka主题发送数据的方便方法。提供异步和同步(发送阻塞)方法,异步(发送非阻塞)方法返回ListenableFuture
,以此监听异步发送状态,成功仍是失败,KafkaTemplate提供以下接口:
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message); Map<MetricName, ? extends Metric> metrics(); List<PartitionInfo> partitionsFor(String topic); <T> T execute(ProducerCallback<K, V, T> callback); // Flush the producer. void flush(); interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); }
sendDefault
API 要求已向模板提供默认主题。部分API接受一个时间戳做为参数,并将该时间戳存储在记录中,如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型,若是主题配置为使用CREATE_TIME
,则记录用户指定的时间戳(若是未指定则生成)。若是将主题配置为使用LOG_APPEND_TIME
,则忽略用户指定的时间戳,而且代理将添加本地代理时间。metrics
和 partitionsFor
方法委托给底层Producer上的相同方法。execute方法提供对底层生产者的直接访问
要使用模板,能够配置一个生产者工厂并在模板的构造函数中提供它。下面的示例演示了如何执行此操做:
@Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // See https://kafka.apache.org/documentation/#producerconfigs for more properties return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { // KafkaTemplate构造函数中输入生产者工厂配置 return new KafkaTemplate<Integer, String>(producerFactory()); }
而后,要使用模板,能够调用其方法之一发送消息。
当你使用包含Message<?>
参数的方法时,主题、分区和键信息在消息头中提供,有以下子项:
KafkaHeaders.TOPIC KafkaHeaders.PARTITION_ID KafkaHeaders.MESSAGE_KEY KafkaHeaders.TIMESTAMP
如访问头部信息中某一项信息:
public void handleMessage(Message<?> message) throws MessagingException { LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC)); }
可选的功能是,可使用ProducerListener
配置KafkaTemplate
,以得到带有发送结果(成功或失败)的异步回调,而不是等待未来完成。如下列表显示了ProducerListener
接口的定义:
public interface ProducerListener<K, V> { void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata); void onError(String topic, Integer partition, K key, V value, Exception exception); boolean isInterestedInSuccess(); }
默认状况下,模板配置有LoggingProducerListener
,它只记录错误,在发送成功时不执行任何操做。只有当isInterestedInSuccess
返回true时才调用onSuccess
。
为了方便起见,若是你只想实现其中一个方法,那么将提供抽象ProducerListenerAdapter
。对于isInterestedInSuccess
,它返回false。下面演示了异步结果回调:
public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); }
若是但愿阻止式发送线程等待结果,能够调用future
的get()
方法。你可能但愿在等待以前调用flush()
,或者为了方便起见,模板有一个带有autoFlush
参数的构造函数,该构造函数在每次发送时都会致使模板flush()
。不过,请注意,刷新可能会显著下降性能:
public void sendToKafka(final MyOutputData data) { final ProducerRecord<String, String> record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); } catch (ExecutionException e) { handleFailure(data, record, e.getCause()); } catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } }
使用DefaultKafkaProducerFactory:
如上面使用KafkaTemplate
中所示,ProducerFactory
用于建立生产者。默认状况下,当不使用事务时,DefaultKafkaProducerFactory
会建立一个供全部客户机使用的单例生产者,如KafkaProducer
javadocs中所建议的那样。可是,若是对模板调用flush(),这可能会致使使用同一个生产者的其余线程延迟。从2.3版开始,DefaultKafkaProducerFactory
有一个新属性producerPerThread
。当设置为true
时,工厂将为每一个线程建立(和缓存)一个单独的生产者,以免此问题。
当producerPerThread
为true时,当再也不须要生产者时,用户代码必须在工厂上调用closeThreadBoundProducer()
。这将实际关闭生产者并将其从ThreadLocal
中移除。调用reset()或destroy()不会清理这些生产者。
建立DefaultKafkaProducerFactory
时,能够经过调用只接受属性映射的构造函数(请参阅使用KafkaTemplate中的示例)从配置中获取键和/或值序列化器类,或者序列化程序实例能够传递给DefaultKafkaProducerFactory
构造函数(在这种状况下,全部生产者共享相同的实例)。或者,能够提供Supplier<Serializer> s
(从版本2.3开始),用于为每一个生产者获取单独的Serializer
实例:
@Bean public ProducerFactory<Integer, CustomValue> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); } @Bean public KafkaTemplate<Integer, CustomValue> kafkaTemplate() { return new KafkaTemplate<Integer, CustomValue>(producerFactory()); }
使用ReplyingKafkaTemplate:
版本2.1.3
引入了KafkaTemplate
的一个子类来提供请求/应答语义。这个类名为ReplyingKafkaTemplate
,而且有一个方法(除了超类中的那些方法以外)。下面的列表显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record); RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout);
结果是一个ListenableFuture
,它被结果异步填充(或者超时时出现异常)。结果还有一个sendFuture
属性,这是调用KafkaTemplate.send()
的结果。你可使用此Future肯定发送操做的结果。这里就不展开了。
能够经过配置MessageListenerContainer
并提供消息监听器或使用@KafkaListener
注解来接收消息。
使用消息监听器容器(message listener container)时,必须提供监听器才能接收数据。目前有八个消息监听器支持的接口。下面的列表显示了这些接口:
// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 做接收的单个ConsumerRecord实例 public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的单个ConsumerRecord实例 public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } // 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的单个ConsumerRecord实例。提供对消费者对象的访问。 public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的单个ConsumerRecord实例。提供对消费者对象的访问。 public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } // 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,由于监听器已得到完整的批处理。 public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。 public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } // 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,由于监听器已得到完整的批处理。提供对使用者对象的访问。 public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } // 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操做接收的全部ConsumerRecord实例。提供对使用者对象的访问。 public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
上述消费者对象不是线程安全的。只能在调用侦听器的线程上调用其方法。
提供了两个MessageListenerContainer
的实现:
KafkaMessageListenerContainer
从单个线程上的全部主题或分区接收全部消息(即一个分区只能分配到一个消费者,一个消费者能够被分配多个分区)。ConcurrentMessageListenerContainer
委托给一个或多个KafkaMessageListenerContainer
实例,以提供多线程使用,从多线程上去处理主题或分区的全部消息。
从Spring Kafka2.2.7版开始,你能够将RecordInterceptor
添加到侦听器容器中;在调用侦听器以容许检查或修改记录以前,将调用它。若是拦截器返回null,则不调用侦听器。侦听器是批处理侦听器时不调用侦听器。从2.3版开始,CompositeRecordInterceptor
可用于调用多个拦截器。
默认状况下,使用事务时,侦听器在事务启动后调用。从2.3.4版开始,你能够设置侦听器容器的interceptBeforeTx
属性,以便在事务启动以前调用侦听器。没有为批处理侦听器提供侦听器,由于Kafka已经提供了ConsumerInterceptor
。
有以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionOffset... topicPartitions)
每一个都获取一个ConsumerFactory
以及有关主题和分区的信息,以及ContainerProperties
对象中的其余配置。ConcurrentMessageListenerContainer
(稍后介绍)使用第二个构造函数跨使用者实例分发TopicPartitionOffset
。ContainerProperties
具备如下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions) public ContainerProperties(String... topics) public ContainerProperties(Pattern topicPattern)
第一个构造函数接受一个TopicPartitionOffset
参数数组来显式地指示容器要使用哪些分区(使用消费者的 assign()方法)和可选的初始偏移量。默认状况下,正值是绝对偏移量。默认状况下,负值是相对于分区内的当前最后偏移量。提供了TopicPartitionOffset
的构造函数,该构造函数接受一个附加的布尔参数。若是是true,则初始偏移(正偏移或负偏移)相对于该消耗器的当前位置。容器启动时应用偏移量。第二个是主题数组,Kafka基于group.id
属性:在组中分布分区来分配分区。第三个使用regex表达式来选择主题。
要将MessageListener
分配给容器,能够在建立容器时使用ContainerProps.setMessageListener
方法。下面的示例演示了如何执行此操做:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() { ... }); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
注意当建立一个Defaultkafkafkaconsumerfactory
时,使用构造器,该构造器仅以其特性为基础,就意味着从配置中获取了key/value的Deserializer类别。或者,反序列化程序实例能够传递给key/value的DefaultKafkaConsumerFactory
构造函数,在这种状况下,全部消费者共享相同的实例。另外一个选项是提供Supplier<Deserializer>s
(从版本2.3开始),用于为每一个使用者获取单独的反序列化程序实例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
有关能够设置的各类属性的更多信息,请参阅Javadoc 中ContainerProperties
。
从版本Spring Kafka 2.1.1开始,一个名为logContainerConfig
的新属性就可用了。当启用true和INFO日志记录时,每一个侦听器容器都会写入一条日志消息,总结其配置属性。
例如,要将日志级别更改成INFO,可使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)
。
从版本Spring Kafka 2.2开始,添加了名为missingtopicsfailal
的新容器属性(默认值:true)。若是代理上不存在任何客户端发布或订阅涉及到的主题,这将阻止容器启动。若是容器配置为侦听主题模式(regex),则不适用。之前,容器线程在consumer.poll()
方法中循环,等待在记录许多消息时出现主题。除了日志,没有迹象代表有问题。要恢复之前的行为,能够将属性设置为false,这个时候,Broker设置项allow.auto.create.topics=true,且这个容器属性为false,则会自动建立不存在的topic。
单个构造函数相似于第一个KafkaListenerContainer
构造函数。下面的列表显示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
它还有一个并发属性。例如,container.setConcurrency(3)
即表示建立三个KafkaMessageListenerContainer
实例。对于第一个构造函数,Kafka使用它的组管理功能将分区分布到消费者之间。
当监听多个主题时,默认的分区分布可能不是你指望的那样。例如,若是你有三个主题,每一个主题有五个分区,而且但愿使用concurrency=15
,那么你只看到五个活动的消费者,每一个消费者从每一个主题中分配一个分区,其余十个消费者处于空闲状态。这是由于默认的KafkaPartitionAssignor
是RangeAssignor
(参见其Javadoc)。对于这种状况,你可能须要考虑改用RoundRobinAssignor
,它将分区分布到全部使用者。而后,为每一个使用者分配一个主题或分区。若要更改PartitionAssignor
,你能够在提供给DefaultKafkaConsumerFactory
的属性中设置partition.assignment.strategy
消费者配置参数(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。使用Spring Boot时,能够按以下方式分配设置策略:
spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RoundRobinAssignor
对于第二个构造函数,ConcurrentMessageListenerContainer
将TopicPartition
实例分布在委托KafkaMessageListenerContainer
实例上。
例如,若是提供了六个TopicPartition
实例,并发性为3;每一个容器获得两个分区。对于五个TopicPartition
实例,两个容器获得两个分区,第三个容器获得一个分区。若是并发性大于TopicPartitions
的数量,则会向下调整并发性,以便每一个容器得到一个分区。调整分区的方式可使用命令行工具kafka-topics.sh
查询和调整主题上的分区数。还能够添加一个NewTopic
Bean,若是NewTopic设定的数目大于当前数目,spring boot的自动配置的KafkaAdmin
将向上调整分区。
client.id属性(若是已设置)将附加
-n
,其中n是对应于并发的消费者实例。当启用JMX时,这是为MBeans提供惟一名称所必需的。
从版本Spring Kafka 1.3开始,MessageListenerContainer
提供了对底层KafkaConsumer
的度量的访问。对于ConcurrentMessageListenerContainer
,metrics()
方法返回全部目标KafkaMessageListenerContainer
实例的度量(metrics)。根据为底层KafkaConsumer
提供的client-id
度量被分组到Map<MetricName, ?extends Metric>
。
从2.3版开始,ContainerProperties
提供了一个idleBetweenPolls
选项,容许侦听器容器中的主循环在KafkaConsumer.poll()
调用之间睡眠。从提供的选项中选择实际睡眠间隔做为最小值,而且选择max.poll.interval.ms
消费者配置和当前记录批处理时间之间的差别。
提供了几个提交偏移量的选项。若是enable.auto.commit
使用者属性为true
,则Kafka将根据其配置自动提交偏移量。若是为false
,则容器支持多个AckMode
设置(在下一个列表中描述)。默认的确认模式是批处理。从2.3版开始,框架将enable.auto.commit
设置为false
,除非在配置中显式设置。之前,若是未设置属性,则使用Kafka默认值(true)。消费者 poll()
方法返回一个或多个ConsumerRecords
。为每一个记录调用MessageListener
。如下列表描述了容器对每一个AckMode
采起的操做:
poll()
返回的全部记录后提交偏移量。poll()
返回的全部记录后提交偏移量,只要超过上次提交后的ackTime
poll()
返回的全部记录后提交偏移量,只要上次提交后收到ackCount
记录。TIME
和COUNT
,但若是两个条件都为true,则执行提交。acknowledge()
和Acknowledgment
。以后,应用与BATCH相同的语义。Acknowledgement.acknowledge()
方法时当即提交偏移量。MANUAL和MANUAL_IMMEDIATE 要求侦听器是AcknowledgingMessageListener
或BatchAcknowledgingMessageListener
。请参见消息侦听器。
根据syncCommits
容器属性,使用消费者上的commitSync()
或commitAsync()
方法。默认状况下,syncCommits
为true;另请参阅setSyncCommitTimeout
。请参阅setCommitCallback
以获取异步提交的结果;默认回调是LoggingCommitCallback
,它记录错误(以及调试级别的成功)。
由于侦听器容器有本身的提交偏移的机制,因此它但愿Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
为false。从2.3版开始,除非在使用者工厂或容器的使用者属性重写中特别设置,不然它将无条件地将其设置为false。
Acknowledgment
有如下方法:
public interface Acknowledgment { void acknowledge(); }
此方法使侦听器能够控制什么时候提交偏移。
从版本2.3开始,确认接口有两个附加方法nack(long sleep)
和nack(int index, long sleep)
。第一个用于记录侦听器,第二个用于批处理侦听器。为侦听器类型调用错误的方法将引起IllegalStateException
。
nack()只能在调用侦听器的消费者线程上调用。
使用批处理侦听器时,能够在发生故障的批内指定索引。调用nack()
时,将在对失败和丢弃的记录的分区执行索引和查找以前提交记录的偏移量,以便在下次poll()
时从新传递这些偏移量。这是对SeekToCurrentBatchErrorHandler
的改进,SeekToCurrentBatchErrorHandler
只能查找整个批次以便从新交付。
注意:经过组管理使用分区分配时,确保sleep参数(加上处理上一次轮询记录所花费的时间)小于
consumer max.poll.interval.ms
属性很是重要。
侦听器容器实现了SmartLifecycle
(经过SmartLifecycle
在Spring加载和初始化全部bean后,接着执行一些任务或者启动须要的异步服务),默认状况下autoStartup
为true
。容器在后期启动(Integer.MAX-VALUE - 100
)。实现SmartLifecycle
以处理来自侦听器的数据的其余组件应该在较早的阶段启动。-100
为之后的阶段留出了空间,使组件可以在容器以后自动启动。好比咱们经过@Bean
将监听器容器交给Spring管理,这个时候经过SmartLifecycle
自动执行了初始化的任务,可是当咱们手动经过new监听器容器实例,则后初始化则不会执行,好比KafkaMessageListenerContainer
实例须要手动执行start()
。
autoStartup
在手动执行start中设置true与false没有做用,能够参见@KafkaListener
声明周期管理这一小节。
@KafkaListener
注解用于将bean方法指定为侦听器容器的侦听器。bean包装在一个MessagingMessageListenerAdapter
中,该适配器配置有各类功能,如转换器,用于转换数据(若有必要)以匹配方法参数。经过使用属性占位符(${…}
),或者可使用SpEL(#{…}
)配置注释上的大多数属性。有关更多信息,请参阅Javadoc。
@KafkaListener
:
id
:listener惟一id,当GroupId没有被配置的时候,默认id为自动产生,此值指定后会覆盖group id。containerFactory
:上面提到了@KafkaListener区分单数据仍是多数据消费只须要配置一下注解的containerFactory属性就能够了,这里面配置的是监听容器工厂,也就是ConcurrentKafkaListenerContainerFactory
,配置Bean名称topics
:须要监听的Topic,可监听多个,能够是表达式或者占位符关键字或者直接是主题名称,如多个主题监听:{"topic1" , "topic2"}
topicPattern
: 此侦听器的主题模式。条目能够是“主题模式”、“属性占位符键”或“表达式”。框架将建立一个容器,该容器订阅与指定模式匹配的全部主题,以获取动态分配的分区。模式匹配将针对检查时存在的主题周期性地执行。表达式必须解析为主题模式(支持字符串或模式结果类型)。这使用组管理,Kafka将为组成员分配分区。topicPartitions
:用于使用手动主题/分区分配时errorHandler
:监听异常处理器,配置Bean名称,默认为空groupId
:消费组IDidIsGroup
:id是否为GroupIdclientIdPrefix
:消费者Id前缀beanRef
:真实监听容器的Bean名称,须要在 Bean名称前加 "__"@KafkaListener
注解为简单的POJO侦听器提供了一种机制。下面的示例演示如何使用它:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }
此机制生效须要@Configuration
类之一上的@EnableKafka
注解和用于配置基础ConcurrentMessageListenerContainer
的侦听器容器工厂。默认状况下,须要名为kafkaListenerContainerFactory
的bean。如下示例演示如何使用ConcurrentMessageListenerContain
:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }
注意,要设置容器属性,必须在工厂上使用getContainerProperties()
方法。它用做注入容器的实际属性的模板。
从版本2.1.1开始,如今能够为注解建立的消费者设置client.id
属性。clientdprefix
的后缀是-n
,其中n是一个整数,表示使用并发时的容器号。
从2.2版开始,如今能够经过使用批注自己的属性来重写容器工厂的并发性和自动启动属性。属性能够是简单值、属性占位符或SpEL表达式。下面的示例演示了如何执行此操做:
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) { ... }
你还可使用显式主题和分区(以及可选的初始偏移量)配置POJO侦听器。下面的示例演示了如何执行此操做:
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
你能够在partitions
或partitionOffsets
属性中指定每一个分区,但不能同时指定二者。
使用手动AckMode
时,还能够向侦听器提供Acknowledgment
。下面的示例还演示了如何使用不一样的容器工厂:
@KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
最后,能够从消息头得到有关消息的元数据。你可使用如下头名称来检索消息头内容:
KafkaHeaders.OFFSET KafkaHeaders.RECEIVED_MESSAGE_KEY KafkaHeaders.RECEIVED_TOPIC KafkaHeaders.RECEIVED_PARTITION_ID KafkaHeaders.RECEIVED_TIMESTAMP KafkaHeaders.TIMESTAMP_TYPE
示例:
@KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { ... }
从版本1.1开始,能够配置@KafkaListener
方法来接收从消费者接收的整批消费者记录。要将侦听器容器工厂配置为建立批处理侦听器,能够设置batchListener
属性。下面的示例演示了如何执行此操做:
@Bean public KafkaListenerContainerFactory<?, ?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; }
如下示例显示如何接收有效载荷列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list) { ... }
主题、分区、偏移量等在与有效负载并行的头中可用。下面的示例演示如何使用标题:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... }
或者,您能够接收消息列表Message<?>
对象,其中包含每一个偏移量和每一个消息中的其余详细信息,但它必须是惟一的参数(除了使用手动提交时的Acknowledgment和/
或Consumer<?, ?>
参数)。下面的示例演示如何执行此操做:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") public void listen14(List<Message<?>> list) { ... } @KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") public void listen15(List<Message<?>> list, Acknowledgment ack) { ... } @KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory") public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) { ... }
在这种状况下,不会对有效载荷执行转换。若是BatchMessagingMessageConverter
配置了RecordMessageConverter
,则还能够向消息参数添加泛型类型,并转换有效负载。有关详细信息,请参阅使用批处理侦听器的负载转换。
你还能够收到一个ConsumerRecord<?, ?>
对象,但它必须是惟一的参数(当使用手动提交或Consumer<?, ?>
参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操做:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list) { ... } @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) { ... }
从版本2.2开始,侦听器能够接收poll()
方法返回的完整的ConsumerRecords<?, ?>
对象,容许侦听器访问其余方法,例如partitions()
(返回列表中的TopicPartition
实例)和records
(TopicPartition)(获取选择性记录)。一样,这必须是惟一的参数(当使用手动提交或Consumer<?, ?>
参数时,除了可选的Acknowledgment)。下面的示例演示了如何执行此操做:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory") public void pollResults(ConsumerRecords<?, ?> records) { ... }
从2.2版开始,如今更容易添加验证程序来验证@KafkaListener
`@Payload参数。之前,你必须配置一个自定义的
DefaultMessageHandlerMethodFactory`并将其添加到注册器中。如今,你能够将验证器添加到注册器自己。如下代码说明了如何执行此操做:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(new MyValidator()); } }
当你在Spring Boot使用validation starter
,会自动配置LocalValidatorFactoryBean
,以下例所示:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } }
如下示例演示如何验证:
public static class ValidatedClass { @Max(10) private int bar; public int getBar() { return this.bar; } public void setBar(int bar) { this.bar = bar; } }
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; }
ContainerProperties
有一个名为consumerRebalanceListener
的属性,该属性接受Kafka客户端的consumerRebalanceListene
r接口的实现。若是未提供此属性,则容器将配置日志侦听器,该侦听器将在信息级别记录从新平衡事件。该框架还添加了一个子接口ConsumerRawareRebalanceListener
。如下列表显示了ConsumerRawareRebalanceListener
接口定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); }
从2.0版开始,若是还使用@SendTo
注解注释@KafkaListener
,而且方法调用返回结果,则结果将转发到@SendTo
指定的主题。如:
@KafkaListener(topics = "annotated21") @SendTo("!{request.value()}") // runtime SpEL public String replyingListener(String in) { ... } @KafkaListener(topics = "${some.property:annotated22}") @SendTo("#{myBean.replyTopic}") // config time SpEL public Collection<String> replyingBatchListener(List<String> in) { ... } @KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler") @SendTo("annotated23reply") // static reply topic definition public String replyingListenerWithErrorHandler(String in) { ... } ... @KafkaListener(topics = "annotated25") @SendTo("annotated25reply1") public class MultiListenerSendTo { @KafkaHandler public String foo(String in) { ... } @KafkaHandler @SendTo("!{'annotated25reply2'}") public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { ... } }
为@KafkaListener
注解建立的侦听器容器不是应用程序上下文中的bean。相反,它们是用KafkaListenerEndpointRegistry
类型的基础设施bean注册的。这个bean由框架自动声明并管理容器的生命周期;它将自动启动任何autoStartup
设置为true
的容器。全部容器工厂建立的全部容器必须处于同一phase
。有关详细信息,请参阅侦听器容器自动启动。你可使用注册表以编程方式管理生命周期。启动或中止注册表将启动或中止全部已注册的容器。或者,能够经过使用单个容器的id属性来获取对该容器的引用。能够在批注上设置autoStartup
,这将覆盖容器工厂中配置的默认设置(setAutoStartup(true)
)。你能够从应用程序上下文中获取对bean的引用,例如自动链接,以管理其注册的容器。如下示例说明了如何执行此操做:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false") public void listen(...) { ... }
@Autowired private KafkaListenerEndpointRegistry registry; ... this.registry.getListenerContainer("myContainer").start(); ...
注册表只维护其管理的容器的生命周期;声明为bean的容器不受注册表管理,能够从应用程序上下文中获取。能够经过调用注册表的getListenerContainers()
方法来获取托管容器的集合。Spring Kafka版本2.2.5添加了一个方便方法getAllListenerContainers()
,它返回全部容器的集合,包括由注册表管理的容器和声明为bean的容器。返回的集合将包括任何已初始化的原型bean,但它不会初始化任何延迟bean声明。
Spring for Apache Kafka
提供了一个工厂bean来建立StreamsBuilder
对象并管理其流的生命周期。只要kafka流在classpath上而且kafka流经过@EnableKafkaStreams
注解开启,Spring Boot就会自动配置所需的KafkaStreamsConfiguration
bean。
启用Kafka流意味着必须设置应用程序id和引导服务器(bootstrap servers)。前者可使用spring.kafka.streams.application-id
配置,若是未设置,则默认为spring.application.name
。后者能够全局设置,也能够专门为流覆写。
使用专用属性可使用其余几个属性;可使用spring.Kafka.streams.properties
命名空间设置其余任意Kafka属性。有关详细信息,Additional Kafka Properties 。
默认状况下,由它建立的StreamBuilder
对象管理的流将自动启动。可使用spring.kafka.streams.auto-startup
属性自定义此行为。
要使用工厂bean,只需将StreamsBuilder
链接到@bean
,以下例所示:
@Configuration(proxyBeanMethods = false) @EnableKafkaStreams public static class KafkaStreamsExampleConfiguration { @Bean public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) { KStream<Integer, String> stream = streamsBuilder.stream("ks1In"); stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } }
默认状况下,由它建立的StreamBuilder
对象管理的流将自动启动。可使用spring.kafka.streams.auto-startup
属性自定义此行为。
自动配置支持的属性显示在公用应用程序属性中。注意,在大多数状况下,这些属性(连字符或驼峰样式)直接映射到Apache Kafka点式属性。有关详细信息,请参阅Apache Kafka
文档。
前面提到的几个属性应用于全部组件(生产者、消费者、管理员和流),但若是但愿使用不一样的值,则能够在组件级别指定。Apache Kafka指定重要性为HIGH
、MEDIUM
或LOW
的属性。Spring Boot自动配置支持全部高重要性属性、某些选定的中、低属性以及任何没有默认值的属性。
只有Kafka支持的属性的一个子集能够经过KafkaProperties
类直接使用,若是要使用不直接支持的其余属性配置生产者或消费者,请使用如下属性:
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth spring.kafka.streams.properties.prop.five=fifth
上面的参数设置示例将公共prop.one
Kafka属性设置为first
(适用于生产者、消费者和管理员),prop.two
admin属性设置为second
,prop.three
consumer属性设置为third
,prop.four
producer属性设置为fourth
,prop.five
streams属性设置为fifth
。
你还能够配置Spring Kafka JsonDeserializer
,以下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
相似地,能够禁用JsonSerializer
在头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以这种方式设置的属性将覆盖Spring Boot显式支持的任何配置项。
Spring for Apache Kafka提供了一种使用嵌入式Apache Kafka代理测试项目的便捷方法。要使用此功能,请使用Spring Kafka测试模块中的@EmbeddedKafka
注解测试类。有关更多信息,请参阅Spring For Apache Kafka参考手册。
要使Spring Boot自动配置与前面提到的嵌入式Apache Kafka代理一块儿工做,须要将嵌入式代理地址(由EmbeddedKafkaBroker
填充)的系统属性从新映射到Apache Kafka的Spring Boot配置属性中。有几种方法能够作到这一点:
spring.kafka.bootstrap-servers
:static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); }
@EmbeddedKafka
注解上配置属性名:@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Spring Integration也有Kafka的适配器,所以咱们能够很方便的采用Spring Integration去实现发布订阅,固然你也能够不使用Spring Integration。
Spring Integration是什么,具体有什么做用,能够参考另外一篇文章《Spring Integration最详解》。
这里对全部配置作个说明的是,spring kafka配置分全局配置和子模块配置,子模块配置会复写全局配置,好比SSL认证能够全局配置,可是也能够在每一个子模块,如消费者、生产者、流式处理中均可以单独配置SSL(多是微服务部署,消费者和生产者不在同一个应用中)。这里重点介绍生产者和消费者配置吧,其余就不展开了,用到的时候再去查找和补充。
# 用逗号分隔的主机:端口对列表,用于创建到Kafka群集的初始链接。覆盖全局链接设置属性 spring.kafka.bootstrap-servers # 在发出请求时传递给服务器的ID。用于服务器端日志记录 spring.kafka.client-id,默认无 # 用于配置客户端的其余属性,生产者和消费者共有的属性 spring.kafka.properties.* # 消息发送的默认主题,默认无 spring.kafka.template.default-topic
Spring Boot中,Kafka 生产者
相关配置(全部配置前缀为spring.kafka.producer.
):
# 生产者要求Leader在考虑请求完成以前收到的确认数 spring.kafka.producer.acks # 默认批量大小。较小的批处理大小将使批处理不太常见,并可能下降吞吐量(批处理大小为零将彻底禁用批处理) spring.kafka.producer.batch-size spring.kafka.producer.bootstrap-servers # 生产者可用于缓冲等待发送到服务器的记录的总内存大小。 spring.kafka.producer.buffer-memory # 在发出请求时传递给服务器的ID。用于服务器端日志记录。 spring.kafka.producer.client-id # 生产者生成的全部数据的压缩类型 spring.kafka.producer.compression-type # 键的序列化程序类 spring.kafka.producer.key-serializer spring.kafka.producer.properties.* # 大于零时,启用失败发送的重试次数 spring.kafka.producer.retries spring.kafka.producer.ssl.key-password spring.kafka.producer.ssl.key-store-location spring.kafka.producer.ssl.key-store-password spring.kafka.producer.ssl.key-store-type spring.kafka.producer.ssl.protocol spring.kafka.producer.ssl.trust-store-location spring.kafka.producer.ssl.trust-store-password spring.kafka.producer.ssl.trust-store-type # 非空时,启用对生产者的事务支持 spring.kafka.producer.transaction-id-prefix spring.kafka.producer.value-serializer
Spring Boot中,Kafka 消费者相关配置(全部配置前缀为spring.kafka.consumer.
):
# 若是“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms) spring.kafka.consumer.auto-commit-interval # 当Kafka中没有初始偏移或服务器上再也不存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置 # earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 spring.kafka.consumer.auto-offset-reset # 用逗号分隔的主机:端口对列表,用于创建到Kafka群集的初始链接。覆盖全局链接设置属性 spring.kafka.consumer.bootstrap-servers # 在发出请求时传递给服务器的ID,用于服务器端日志记录 spring.kafka.consumer.client-id # 消费者的偏移量是否在后台按期提交 spring.kafka.consumer.enable-auto-commit # 若是没有足够的数据来当即知足“fetch-min-size”的要求,则服务器在取回请求以前阻塞的最大时间量 spring.kafka.consumer.fetch-max-wait # 服务器应为获取请求返回的最小数据量。 spring.kafka.consumer.fetch-min-size # 标识此消费者所属的默认消费者组的惟一字符串 spring.kafka.consumer.group-id # 消费者协调员的预期心跳间隔时间。 spring.kafka.consumer.heartbeat-interval # 用于读取以事务方式写入的消息的隔离级别。 spring.kafka.consumer.isolation-level # 密钥的反序列化程序类 spring.kafka.consumer.key-deserializer # 在对poll()的单个调用中返回的最大记录数。 spring.kafka.consumer.max-poll-records # 用于配置客户端的其余特定于消费者的属性。 spring.kafka.consumer.properties.* # 密钥存储文件中私钥的密码。 spring.kafka.consumer.ssl.key-password # 密钥存储文件的位置。 spring.kafka.consumer.ssl.key-store-location # 密钥存储文件的存储密码。 spring.kafka.consumer.ssl.key-store-password # 密钥存储的类型,如JKS spring.kafka.consumer.ssl.key-store-type # 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1 spring.kafka.consumer.ssl.protocol # 信任存储文件的位置。 spring.kafka.consumer.ssl.trust-store-location # 信任存储文件的存储密码。 spring.kafka.consumer.ssl.trust-store-password # 信任存储区的类型。 spring.kafka.consumer.ssl.trust-store-type # 值的反序列化程序类。 spring.kafka.consumer.value-deserializer
Spring Boot中,Kafka Listener相关配置(全部配置前缀为spring.kafka.listener.
):
# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数 spring.kafka.listener.ack-count= spring.kafka.listener.ack-mode spring.kafka.listener.ack-time spring.kafka.listener.client-id spring.kafka.listener.concurrency spring.kafka.listener.idle-event-interval spring.kafka.listener.log-container-config # 若是Broker上不存在至少一个配置的主题(topic),则容器是否没法启动, # 该设置项结合Broker设置项allow.auto.create.topics=true,若是为false,则会自动建立不存在的topic spring.kafka.listener.missing-topics-fatal=true # 非响应消费者的检查间隔时间。若是未指定持续时间后缀,则将使用秒做为单位 spring.kafka.listener.monitor-interval spring.kafka.listener.no-poll-threshold spring.kafka.listener.poll-timeout spring.kafka.listener.type
spring.kafka.admin.client-id # 若是启动时代理不可用,是否快速失败 spring.kafka.admin.fail-fast=false spring.kafka.admin.properties.* spring.kafka.admin.ssl.key-password spring.kafka.admin.ssl.key-store-location spring.kafka.admin.ssl.key-store-password spring.kafka.admin.ssl.key-store-type spring.kafka.admin.ssl.protocol spring.kafka.admin.ssl.trust-store-location spring.kafka.admin.ssl.trust-store-password spring.kafka.admin.ssl.trust-store-type
spring.kafka.jaas.control-flag=required spring.kafka.jaas.enabled=false spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule spring.kafka.jaas.options.*
spring.kafka.ssl.key-password spring.kafka.ssl.key-store-location spring.kafka.ssl.key-store-password spring.kafka.ssl.key-store-type spring.kafka.ssl.protocol spring.kafka.ssl.trust-store-location spring.kafka.ssl.trust-store-password spring.kafka.ssl.trust-store-type
spring.kafka.streams.application-id spring.kafka.streams.auto-startup spring.kafka.streams.bootstrap-servers spring.kafka.streams.cache-max-size-buffering spring.kafka.streams.client-id spring.kafka.streams.properties.* spring.kafka.streams.replication-factor spring.kafka.streams.ssl.key-password spring.kafka.streams.ssl.key-store-location spring.kafka.streams.ssl.key-store-password spring.kafka.streams.ssl.key-store-type spring.kafka.streams.ssl.protocol spring.kafka.streams.ssl.trust-store-location spring.kafka.streams.ssl.trust-store-password spring.kafka.streams.ssl.trust-store-type spring.kafka.streams.state-dir
同一消费组下全部消费者协同消费订阅主题的全部分区
消费者offset管理机制
分区和消费者个数如何设置
具体怎么调优副本、分区、消费者等这里就不展开了,后面专门来研究这个问题。
实现下面的示例须要的环境:
Spring-kafka-test
embedded Kafka Server
Spring Boot开发环境(2.2.1)
咱们知道Kafka是Scala+Zookeeper
构建的,能够从官方网站下载部署包并在本地部署。不过,Spring Kafka Test已经封装了Kafka测试的带注解的一键式功能,以打开Kafka服务器,从而简化了验证Kafka相关功能的开发过程,使用起来也很是简单。
添加依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
启动服务,下面使用Junit测试用例直接启动Kafka服务器服务,包括四个代理节点,Run as JUnit Test
。:
@RunWith(SpringRunner.class) @SpringBootTest(classes = ApplicationTests.class) @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) public class ApplicationTests { @Test public void contextLoads()throws IOException { System.in.read(); } }
@EmbeddedKafka
中能够设置相关参数:
注意:EmbeddedKafka这样默认是没有建立主题的。会提示
Topic(s) [test] is/are not present and missingTopicsFatal is true
错误。@EmbeddedKafka默认状况是建立一个代理,该代理具备一个不带任何参数的随机端口,它将在启动日志中输出特定端口和默认配置项。
下面实现一个简单发布订阅功能,经过前端WEB调用一个API,而后在该API控制器中获得请求后生产者开始发送消息,消费者后台监听消息,若是收到消费者消息,则打印出来。
添加Kafka依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置Kafka,这里消费者和生产者在同一应用中,咱们只须要配置Kafka Brokers的服务地址+端口:
server: port: 9000 spring: kafka: bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094 listener: # 设置不监听主题错误,false时,若是broker设置了llow.auto.create.topics = true,生产者发送到未建立主题时,会默认自动建立主题 # 且默认建立的主题是单副本单分区的 missing-topics-fatal: false consumer: # 配置消费者消息offset是否自动重置(消费者重连会可以接收最开始的消息) auto-offset-reset: earliest
@Service public class Producer { private static final Logger LOGGER = LogManager.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { LOGGER.info(String.format("===Producing message: {}", message)); this.kafkaTemplate.send(TOPIC, message); } }
@Service public class Consumer { private static final Logger LOGGER = LogManager.getLogger(Consumer.class); @KafkaListener(topics = "test", groupId = "group_test") public void consume(String message) throws IOException { LOGGER.info(String.format("#### -> Consumed message -> %s", message)); } }
@RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { this.producer = producer; } @GetMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }
添加Spring Boot Application:
@SpringBootApplication public class TestKafkaApplication { public static void main(String[] args) { SpringApplication.run(TestKafkaApplication.class, args); } }
启动Kafka Brokers后,须要手动建立主题(若是想自动建立,则须要借助KafkaAdmin,或者是Kafka Broker设置了allow.auto.create.topics=true
且应用设置了listener.missing-topics-fatal=false
):
# 若是对kafka-topics.sh这里不熟悉,能够去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇) # 建立test主题 $ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test
打开浏览器测试:
http://localhost:9000/kafka/publish?message=hello
则应用控制台会打印hello
。整个发布订阅的实现只使用了跟Kafka相关的@KafkaListener
注解接收消息和KafkaTemplate
模板发送消息,非常简单。
上面是简单的经过Spring Boot依赖的Spring Kafka配置便可快速实现发布订阅功能,这个时候咱们是没法在程序中操做这些配置的,所以这一小节就是利用咱们以前《Spring Boot从零入门7_最新配置文件配置及优先级详细介绍》文章中讲述的自定义配置文件方式去实现发布订阅功能。
实现内容有:
@KafkaListener
实现消息监听)源码不会直接贴,只给出主体部分。
配置文件:
@Configuration @ConfigurationProperties(prefix = "m2kc") @PropertySource("classpath:kafka.properties") @Validated public class M2KCKafkaConfig { @Value("${m2kc.kafka.bootstrap.servers}") private String kafkaBootStrapServers; @Value("${m2kc.kafka.key.serializer.class}") private String kafkaKeySerializerClass; ...... ...... }
生产者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaProducer { private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class); private String mTopic = "test"; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaTemplate<String, String> mKafkaTemplate; @Autowired public KafkaProducer(M2KCKafkaConfig kafkaConfig) { mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; mKafkaTemplate = getKafkaTemplate(); } public KafkaTemplate<String, String> getKafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory()); return kafkaTemplate; } public ProducerFactory<String, String> producerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaProducerFactory<String, String>(properties); } public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); } }
消费者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaConsumer implements InitializingBean { private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class); private String mTopic; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; @Autowired public KafkaConsumer(M2KCKafkaConfig kafkaConfig) { LOGGER.info("===KafkaConsumer construct"); mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; } @PostConstruct public void start(){ LOGGER.info("===KafkaConsumer start"); } @Override public void afterPropertiesSet() throws Exception { LOGGER.info("===afterPropertiesSet is called"); createContainer(); } private void createContainer() { mKafkaMessageListenerContainer = createKafkaMessageListenerContainer(); mKafkaMessageListenerContainer.setAutoStartup(false);; mKafkaMessageListenerContainer.start(); LOGGER.info("===", mKafkaMessageListenerContainer); } private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() { KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(), createContainerProperties()); LOGGER.info("===createKafkaMessageListenerContainer"); return container; } private ContainerProperties createContainerProperties() { ContainerProperties containerProps = new ContainerProperties(mTopic); containerProps.setMessageListener(createMessageListener()); return containerProps; } private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaConsumerFactory<String, String>(properties); } private MessageListener<String, String> createMessageListener() { return new MessageListener<String, String>() { @Override public void onMessage(ConsumerRecord<String, String> data) { // TODO Auto-generated method stub LOGGER.info("===Consuming msg: {}", data.value()); } }; } }
继承InitializingBean
只是为了初始化,也能够去掉,将初始化写入了构造函数中。这里的消费者和生产者都使用@Scope
,因此须要手动获取实例,经过context去调用getBean()。另外配置文件没有写全,这里须要注意。
Spring Integration也有对Kafka支持的适配器,采用Spring Integration,咱们也可以快速的实现发布订阅功能,且实现群组多消费者批量消费功能:
咱们能够先看看总体的Kafka消息传递通道:
具体的Demo能够参考Github中的一个sample :
本篇文章详细介绍了Spring Kafka的发送消息和接收消息功能,其余包括Spring Kafka Stream的简单介绍、Spring Kafka参数配置,以及在Spring Boot中如何经过三种方式去实现Kafka的发布订阅功能,涉及了Kafka的多消费者多订阅者,SSL安全传输,Spring Integration Kafka等。文章很长,把握整体,结合实际,差很少基本内容都有所涉及了。
Spring Expression Language(简称SpEL),在Spring中,不一样于属性占位符${...}
,而SpEL
表达式则要放到#{...}
中(除代码块中用Expression外)。如配置文件中有topics参数spring.kafka.topics
,则能够将配置文件中参数传入注解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
。
SpEL
表达式经常使用示例:
// 字面量 #{3.1415926} // 浮点数 #{9.87E4} // 科学计数法表示98700 #{'Hello'} // String 类型 #{false} // Boolean 类型 // 引用Bean、属性和方法 #{sgtPeppers} // 使用这个bean #{sgtPeppers.artist} // 引用bean中的属性 #{sgtPeppers.selectArtist()} // 引用bean中的方法 #{sgtPeppers.selectArtist().toUpperCase()} // 方法返回值的操做 #{sgtPeppers.selectArtist()?.toUpperCase()} // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase() // 访问类做用域的方法和常量的话,使用T()这个关键的运算符 #{T(java.lang.Math)} #{T(java.lang.Math).PI} // 引用PI的值 #{T(java.lang.Math).random()} // 获取0-1的随机数 #{T(System).currentTimeMillis()} // 获取时间到当前的毫秒数 // 替代属性占位符获取配置文件属性值 @Value("#{表达式}" private String variable;