本文讲述一下如何自定义spring kafka的consumer线程池。
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
/**
 * Starts the container unless it is already running: checks the ack settings and the
 * configured listener, installs default executors where none were configured, and
 * submits the {@code ListenerConsumer} to the consumer task executor.
 *
 * @throws IllegalStateException if the configured listener is neither a
 *         {@code GenericAcknowledgingMessageListener} nor a {@code GenericMessageListener};
 *         the {@code Assert.state} checks below presumably raise the same type when
 *         they fail (confirm against Spring's {@code Assert})
 */
protected void doStart() {
    if (isRunning()) {
        return;
    }
    ContainerProperties containerProperties = getContainerProperties();
    // The ack settings are only checked when the consumer factory does not auto-commit.
    if (!this.consumerFactory.isAutoCommit()) {
        AckMode ackMode = containerProperties.getAckMode();
        if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
            Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
        }
        // An unset (0) ack time in a time-based mode is replaced by 5000
        // (presumably milliseconds -- confirm against ContainerProperties).
        if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                && containerProperties.getAckTime() == 0) {
            containerProperties.setAckTime(5000);
        }
    }
    Object messageListener = containerProperties.getMessageListener();
    Assert.state(messageListener != null, "A MessageListener is required");
    // Exactly one of the two listener fields is assigned, depending on the listener's type.
    if (messageListener instanceof GenericAcknowledgingMessageListener) {
        this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
    }
    else if (messageListener instanceof GenericMessageListener) {
        this.listener = (GenericMessageListener<?>) messageListener;
    }
    else {
        throw new IllegalStateException("messageListener must be 'MessageListener' "
                + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
    }
    // No consumer executor configured: fall back to a SimpleAsyncTaskExecutor whose
    // name prefix is "<beanName>-C-".
    if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
    // No listener executor configured: same fallback, with the prefix "<beanName>-L-".
    if (containerProperties.getListenerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-L-");
        containerProperties.setListenerTaskExecutor(listenerExecutor);
    }
    this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
    // The running flag is set before the consumer is handed to the executor.
    setRunning(true);
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
}
这里涉及到了两个线程池,一个是ConsumerTaskExecutor,一个是ListenerTaskExecutor,都在containerProperties里头配置
如果没有显式配置,则默认分别创建线程名前缀带"-C-"和"-L-"的SimpleAsyncTaskExecutor
ConsumerTaskExecutor用来去poll kafka消息
ListenerTaskExecutor用来调用@KafkaListener标注的方法
自定义executor,将其托管给spring容器的好处就是可以跟随容器的生命周期,在容器销毁之前优雅关闭线程池
可以重写spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java的initializeContainer方法,然后进行设置
public class CustomConcurrentKafkaListenerContainerFactory<K,V> extends ConcurrentKafkaListenerContainerFactory<K,V> { /** * The executor for threads that poll the consumer. */ private AsyncListenableTaskExecutor consumerTaskExecutor; /** * The executor for threads that invoke the listener. */ private AsyncListenableTaskExecutor listenerTaskExecutor; public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor, AsyncListenableTaskExecutor listenerTaskExecutor) { this.consumerTaskExecutor = consumerTaskExecutor; this.listenerTaskExecutor = listenerTaskExecutor; } @Override protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) { super.initializeContainer(instance); instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor); instance.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor); } }
应用自定义kafkaListenerContainerFactory,替换为自己扩展的ConcurrentKafkaListenerContainerFactory即可。
/**
 * Declares the two thread pools used by the Kafka listener containers and a
 * {@code kafkaListenerContainerFactory} bean that hands them to each container it creates.
 */
@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class KafkaExecutorConfig {

    /** Maximum number of threads in the consumer (poll) pool. */
    private static final int CONSUMER_MAX_POOL_SIZE = 5;

    /** Core number of threads in the listener pool. */
    private static final int LISTENER_CORE_POOL_SIZE = 5;

    /** Maximum number of threads in the listener pool. */
    private static final int LISTENER_MAX_POOL_SIZE = 10;

    /** Queue capacity shared by both pools. */
    private static final int QUEUE_CAPACITY = 100;

    /**
     * Creates the pool whose threads poll the Kafka consumer.
     *
     * @return the initialized executor, with thread name prefix {@code my-C-}
     */
    @Bean(name = "consumerTaskExecutor")
    public ThreadPoolTaskExecutor consumerTaskExecutor() {
        // The core size follows the processor count but must never exceed the max size:
        // ThreadPoolExecutor rejects core > max with an IllegalArgumentException, which
        // previously broke start-up on hosts with more than 5 processors.
        int corePoolSize = Math.min(Runtime.getRuntime().availableProcessors(), CONSUMER_MAX_POOL_SIZE);
        // NOTE(review): the tasks submitted here are presumably long-running poll loops.
        // With a non-empty queue, submissions beyond the core size wait in the queue
        // instead of getting a new thread, and CallerRunsPolicy would run a rejected task
        // on the submitting thread -- confirm the pool is sized for the total concurrency.
        return newExecutor(corePoolSize, CONSUMER_MAX_POOL_SIZE, "my-C-");
    }

    /**
     * Creates the pool whose threads invoke the listener methods.
     *
     * @return the initialized executor, with thread name prefix {@code my-L-}
     */
    @Bean(name = "listenerTaskExecutor")
    public ThreadPoolTaskExecutor listenerTaskExecutor() {
        return newExecutor(LISTENER_CORE_POOL_SIZE, LISTENER_MAX_POOL_SIZE, "my-L-");
    }

    /**
     * Creates the container factory that applies both executors to its containers.
     *
     * @param configurer the configurer applied to the new factory
     * @param kafkaConsumerFactory the consumer factory passed to the configurer
     * @return the configured factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(
                        consumerTaskExecutor(), listenerTaskExecutor());
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    /**
     * Builds and initializes an executor with the settings common to both pools:
     * a queue of {@link #QUEUE_CAPACITY}, {@code CallerRunsPolicy} on rejection, and
     * waiting for tasks to complete on shutdown.
     */
    private static ThreadPoolTaskExecutor newExecutor(int corePoolSize, int maxPoolSize,
            String threadNamePrefix) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}
这样就大功告成了。