Spring Boot 自定义kafka 消费者配置 ContainerFactory最佳实践
本篇博文主要提供一个在 SpringBoot 中自定义 kafka配置的实践,想象这样一个场景:你的系统须要监听多个不一样集群的消息,在不一样的集群中topic冲突了,因此你须要分别定义kafka消息配置。java
此篇文章会在SpringBoot 提供的默认模板上提供扩展,不会由于你自定义了消费者配置,而致使原生SpringBoot的Kakfa模板配置失效。算法
引入 MAVEN 依赖
版本须要你本身指定spring
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>xxx</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>xxx</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>xxx</version> </dependency>
引入Java配置类
/** * 手动自定义 kafka 消费者 ContainerFactory 配置demo */ @Configuration @EnableConfigurationProperties(KafkaProperties.class) public class KafkaConsumerConfig { @Autowired private KafkaProperties properties; @Value("${监听服务地址}") private List<String> myServers; @Bean("myKafkaContainerFactory") @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class) public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, consumerFactory()); return factory; } //得到建立消费者工厂 public ConsumerFactory<Object, Object> consumerFactory() { KafkaProperties myKafkaProperties = JSON.parseObject(JSON.toJSONString(this.properties), KafkaProperties.class); //对模板 properties 进行定制化 //.... //例如:定制servers myKafkaProperties.setBootstrapServers(myServers); return new DefaultKafkaConsumerFactory<>(myKafkaProperties.buildConsumerProperties()); } }
yml模板
#kafka配置,更多配置请参考:KafkaProperties spring.kafka: #公共参数,其余的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持默认值 properties: #这个参数指定producer在发送批量消息前等待的时间,当设置此参数后,即使没有达到批量消息的指定大小(batch-size),到达时间后生产者也会发送批量消息到broker。默认状况下,生产者的发送消息线程只要空闲了就会发送消息,即使只有一条消息。设置这个参数后,发送线程会等待必定的时间,这样能够批量发送消息增长吞吐量,但同时也会增长延迟。 linger.ms: 50 #默认值:0毫秒,当消息发送比较频繁时,增长一些延迟可增长吞吐量和性能。 #这个参数指定producer在一个TCP connection可同时发送多少条消息到broker而且等待broker响应,设置此参数较高的值能够提升吞吐量,但同时也会增长内存消耗。另外,若是设置太高反而会下降吞吐量,由于批量消息效率下降。设置为1,能够保证发送到broker的顺序和调用send方法顺序一致,即使出现失败重试的状况也是如此。 #注意:当前消息符合at-least-once,自kafka1.0.0之后,为保证消息有序以及exactly once,这个配置可适当调大为5。 max.in.flight.requests.per.connection: 1 #默认值:5,设置为1即表示producer在connection上发送一条消息,至少要等到这条消息被broker确认收到才继续发送下一条,所以是有序的。 #生产者的配置,可参考org.apache.kafka.clients.producer.ProducerConfig producer: #这个参数能够是任意字符串,它是broker用来识别消息是来自哪一个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。 clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源 bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开 #acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。 #acks=1:生产者会在该分区的leader写入消息并返回成功后,认为消息发送成功。若是群首写入消息失败,生产者会收到错误响应并进行重试。这种方式可以必定程度避免消息丢失,但若是leader宕机时该消息没有复制到其余副本,那么该消息仍是会丢失。另外,若是咱们使用同步方式来发送,延迟会比前一种方式大大增长(至少增长一个网络往返时间);若是使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。 #acks=all:生产者会等待全部副本成功写入该消息,这种方式是最安全的,可以保证消息不丢失,可是延迟也是最大的。 #若是是发送日志之类的,容许部分丢失,可指定acks=0,若是想不丢失消息,可配置为all,但需密切关注性能和吞吐量。 acks: all #默认值:1 #当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际状况中,这个参数须要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群从新选举leader的时间长,这样能够避免生产者过早结束重试致使失败。 #另外需注意,当开启重试时,若未设置max.in.flight.requests.per.connection=1,则可能出现发往同一个分区的两批消息的顺序出错,好比,第一批发送失败了,第二批成功了,而后第一批重试成功了,此时二者的顺序就颠倒了。 retries: 2 #发送失败时重试多少次,0=禁用重试(默认值) #默认状况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,可是压缩效果也是最好的。经过使用压缩,咱们能够节省网络带宽和Kafka存储成本。 compressionType: "none" #若是不开启压缩,可设置为none(默认值),比较大的消息可开启。 #当多条消息发送到一个分区时,Producer会进行批量发送,这个参数指定了批量消息大小的上限(以字节为单位)。当批量消息达到这个大小时,Producer会一块儿发送到broker;但即便没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。 batch-size: 16384 #默认16K,值越小延迟越低,可是吞吐量和性能会下降。0=禁用批量发送 #这个参数设置Producer暂存待发送消息的缓冲区内存的大小,若是应用调用send方法的速度大于Producer发送的速度,那么调用会阻塞必定(max.block.ms)时间后抛出异常。 buffer-memory: 33554432 #缓冲区默认大小32M #消费者的配置,可参考:org.apache.kafka.clients.consumer.ConsumerConfig consumer: #这个参数能够为任意值,用来指明消息从哪一个客户端发出,通常会在打印日志、衡量指标、分配配额时使用。 #暂不用提供clientId,2.x版本可放出来,1.x有多个topic且concurrency>1会出现JMX注册时异常 #clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源 # 签中kafka集群 bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开 #这个参数指定了当消费者第一次读取分区或者无offset时拉取那个位置的消息,能够取值为latest(从最新的消息开始消费),earliest(从最老的消息开始消费),none(若是无offset就抛出异常) autoOffsetReset: latest #默认值:latest #这个参数指定了消费者是否自动提交消费位移,默认为true。若是须要减小重复消费或者数据丢失,你能够设置为false,而后手动提交。若是为true,你可能须要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。 enable-auto-commit: false #周期性自动提交的间隔,单位毫秒 auto-commit-interval: 2000 #默认值:5000 #这个参数容许消费者指定从broker读取消息时最小的Payload的字节数。当消费者从broker读取消息时,若是数据字节数小于这个阈值,broker会等待直到有足够的数据,而后才返回给消费者。对于写入量不高的主题来讲,这个参数能够减小broker和消费者的压力,由于减小了往返的时间。而对于有大量消费者的主题来讲,则能够明显减轻broker压力。 fetchMinSize: 1 #默认值: 1 #上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。 fetchMaxWait: 500 #默认值:500毫秒 #这个参数控制一个poll()调用返回的记录数,即consumer每次批量拉多少条数据。 maxPollRecords: 500 #默认值:500 listener: #建立多少个consumer,值必须小于等于Kafk Topic的分区数。 ack-mode: MANUAL_IMMEDIATE concurrency: 1 #推荐设置为topic的分区数
配置释义
点开 KafkaProperties 这个类,能够看到这个是SpringBoot 自动配置kafka的配置类,引入这个实例,就至关于你拿到了SpringBoot kafka配置模板的参数,就是上述贴的配置,而后再此基础上从新定义你须要改变的配置,这里主要讲消费者配置。apache
代码中举了个重写监听servers的例子:json
//例如:定制servers myKafkaProperties.setBootstrapServers(myServers);
@KafkaListener 使用 containerFactory
@Slf4j @Component public class ConsumerDemo { //声明consumerID为demo,监听topicName为topic.quick.demo的Topic //这个消费者的 containerFactory 是SpringBoot 提供的 kafkaListenerContainerFactory 这个bean @KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : " + msgData); } @KafkaListener(topics = "k010", containerFactory = "myKafkaContainerFactory") public void listen(String msgData, Acknowledgment ack) { log.info("demo receive : " + msgData); //手动提交 //enable.auto.commit参数设置成false。那么就是Spring来替为咱们作人工提交,从而简化了人工提交的方式。 //因此kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。 //enable.auto.commit为true是采用kafka的默认提交模式。 ack.acknowledge(); } }
若是在@KafkaListener属性中没有指定 containerFactory 那么Spring Boot 会默认注入 name 为“kafkaListenerContainerFactory” 的 containerFactory。具体源码可跟踪:KafkaListenerAnnotationBeanPostProcessor中的常量:bootstrap
public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";