一.springboot自动配置方式整合kafka:git
springboot提供自动配置整合kafka的方式,须要作一下步骤:
1.引入kafka依赖包:github
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.7.RELEASE</version> </dependency>
2.在springboot配置中加入kafka相关配置,springboot启动时候会自动加载这些配置,完成连接kafka,建立producer,consumer等。spring
spring: kafka: # kafka服务地址 bootstrap-servers: 127.0.0.1:9092 # 消费者配置 consumer: bootstrap-servers: 127.0.0.1:9092 group-id: myGroup enable-auto-commit: true auto-offset-reset: earliest auto-commit-interval: 1000 max-poll-records: 10 # 生产者配置 producer: retries: 5 batch-size: 16384 buffer-memory: 33554432 acks: 1
3.消息发送端:bootstrap
@Component public class MqProviderImpl{ @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Override public void sendSkMessage(String message, Properties properties) { // 发送消息,注册一个回调事件 ListenableFuture<SendResult<String, String>> futureMessage = KafkaConfig.kafkaTemplateStatic.send("test_topic", message); futureMessage.addCallback(new ListenableFutureCallback<SendResult<String, String>>(){ @Override public void onSuccess(SendResult<String, String> sendResult) { log.info(" rev "+sendResult.getProducerRecord().value()); } @Override public void onFailure(Throwable ex) { log.error(" error "+ex.getMessage()); } }); }}
4.消息消费端:springboot
@KafkaListener(topics = {"test_topic"}) public void receiveSkMessageInfo(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment ack) { log.info(record.value()); }
以上实现是最简单的方式,但使用springboot自动配置的方式,全部配置项必须事先写好在在applicantion.yml的spring.kafka下面,试想在分布式的场景中,若是某一项发生变更,每一个应用下面的配置都须要修改,这就须要将这些配置使用服务治理统一管理起来,这里就须要一种自定义配置的方式来解决。app
springboot自动配置kafka是在KafkaAutoConfiguration这个类中实现的,它有一个成员KafkaProperties,这个properties中保存全部关于kafka的配置。分布式
// 自动配置是在KafkaAutoConfiguration类实现的 @Configuration @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class }) public class KafkaAutoConfiguration { private final KafkaProperties properties;
KafkaProperties类的注解能够看出,配置都是从yml里的spring.kafka配置读出来的ide
@ConfigurationProperties(prefix = "spring.kafka") public class KafkaProperties {
二.springboot手动配置方式整合kafka,使用zk作配置中心:ui
在分布式的环境下,须要使用服务治理把yml里的配置统一管理起来,这里使用zookeeper来统一管理kafka的配置。若是将原有的配置放到zk中,来实现从zk上读取配置,让springboot接收到,这里就须要从新定义kafka的配置类,不能使用原有的KafkaAutoConfiguration了。this
1.从zk上拉取配置,这里使用当当开源的Config Toolkit,还自带一个操做zk的管理界面,引入pom:
<dependency> <groupId>com.dangdang</groupId> <artifactId>config-toolkit</artifactId> <version>3.3.2-RELEASE</version> </dependency>
2.在yml中添加连接zk的配置,有这些配置才能保证应用能连接zk:
configs: # zk地址 address: 192.168.1.30:2181 # 保存应用配置的节点名 env: /projectx/modulex version: 1 # zk数据组 groupdefault: groupdefault
3.下载当当的config-toolkit,访问http://localhost:8080/,加入相关配置,github上有详细说明。
4.新建一个ZKConfiguration类,实现EnvironmentAware接口,实现EnvironmentAware接口的setEnvironment能够在项目启动时设置项目的环境变量,能够在这个类中结合config-toolkit,把zk的配置加载到项目环境变量当中:
@Component public class ZKConfiguration implements EnvironmentAware { @Autowired private Environment env; private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>(); public ZKConfiguration() { } // 加载zk的基本配置 @Bean public ZookeeperConfigProfile zookeeperConfigProfile() { ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile( Objects.requireNonNull(this.env.getProperty("configs.address")), Objects.requireNonNull(this.env.getProperty("configs.env")), this.env.getProperty("configs.version")); return configProfile; } //获得具体组里的配置 @Bean({"groupPropDefault"}) public GeneralConfigGroup generalConfigGroupDefault() { ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile(); GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault")); return group; } /** * 获取配置组 * @return */ public GeneralConfigGroup getConfigGroup(String group) { return GROUPMAP.get(group); } /** * * 项目启动时会调用这个方法,把zk里的配置组存在临时变量GROUPMAP里,之后会用到 * 因此 数据源初始化,就设置在这个方法里 * @param environment */ @Override public void setEnvironment(Environment environment) { this.env = environment; ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile(); GROUPMAP.put("groupdefault", new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault"))); }}
5.得到全部配置项后,就是让springboot去创建kafka连接了,这里至关于要从新实现KafkaAutoConfiguration
的配置。创建一个KafkaConfig配置类,这里主要是配置全部kafka须要的bean:
@ConditionalOnClass({KafkaTemplate.class}) @EnableKafka public class KafkaConfig { // 把刚刚加载zk配置的类注入进来 @Autowired private ZKConfiguration zkConfiguration; // 建立 消费者工厂 @Bean("consumerFactory") @ConditionalOnMissingBean({ConsumerFactory.class}) public ConsumerFactory<String, String> consumerFactory() { // 建立工厂须要三个参数: // 1. 消费者配置的map // 2. key的反序列化实现类 // 3. value的反序列化实现类 return new DefaultKafkaConsumerFactory<String, String>(makeKafkaConfig(), new StringDeserializer(), new StringDeserializer()); } // 建立生产者工厂 @Bean("producerFactory") @ConditionalOnMissingBean({ProducerFactory.class}) public ProducerFactory<String, String> kafkaProducerFactory() { // 生产者工厂的参数如消费者工厂 return new DefaultKafkaProducerFactory(makeKafkaConfig(), new StringSerializer(), new StringSerializer()); } // 建立 kafkaTemplate 这个bean,有了这个bean才能在实际业务中使用kafka @Bean("kafkaTemplate") @ConditionalOnMissingBean({com.seckill.boot.common.util.KafkaTemplate.class}) public KafkaTemplate<String, Protobufable> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, String> kafkaProducerFactory, @Qualifier("producerListener") ProducerListener<String, Protobufable> producerListener) { KafkaTemplate<String, Protobufable> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory); kafkaTemplate.setProducerListener(producerListener); kafkaTemplate.setDefaultTopic("groupdefault"); return kafkaTemplate; } @Bean("producerListener") @ConditionalOnMissingBean({ProducerListener.class}) public ProducerListener<String, Protobufable> kafkaProducerListener() { return new LoggingProducerListener(); } @Bean @ConditionalOnProperty( name = {"spring.kafka.producer.transaction-id-prefix"} ) @ConditionalOnMissingBean public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager(producerFactory); } // zk里拿到的配置取出来 private Map<String, Object> makeKafkaConfig() { // 得到配置的group GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup("groupdefault"); Map<String, Object> kafkaConfig = new HashMap<>(); kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get("spring.kafka.bootstrap-servers")); kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get("spring.kafka.consumer.group-id")); kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get("spring.kafka.consumer.auto-offset-reset")); kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get("spring.kafka.consumer.enable-auto-commit")); kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.auto-commit-interval")); kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.key-serializer")); kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.value-serializer")); kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-records")); kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-interval-ms")); kafkaConfig.put("ack-mode", configGroup.get("spring.kafka.listener.ack-mode")); kafkaConfig.put("concurrency", configGroup.get("spring.kafka.listener.concurrency")); kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get("spring.kafka.producer.acks")); kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get("spring.kafka.producer.batch-size")); kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get("spring.kafka.producer.buffer-memory")); kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.key-serializer")); kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.value-serializer")); kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get("spring.kafka.producer.retries")); return kafkaConfig; } }
6.将kafka须要的bean配置好后,就能在实际业务中使用KafkaTemplate操做消息了
@Component public class MqProviderImpl{ @Autowired private KafkaTemplate<String, String> kafkaTemplate;