This example uses Spring Boot 1.5.19.RELEASE.
PS: This article assumes Kafka is already installed on the server. (Please look up on your own how to install Kafka, create topics, and so on.)
First, add the dependencies to pom.xml:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
```
Then configure Kafka in application.properties:

```properties
#============== kafka ===================
# Kafka broker address; multiple addresses can be specified
kafka.bootstrap.servers=xxxxxxx:9093

#=============== producer =======================
kafka.producer.retries=2
# Batch size in bytes (not a message count)
kafka.producer.batch.size=16384
kafka.producer.buffer.memory=33554432
kafka.producer.linger=1
kafka.producer.acks=all
# Serializers for the message key and value
kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# Default consumer group id
kafka.consumer.group.id=dev-consumer-group
# earliest: with a committed offset on the partition, consume from it; without one, consume from the beginning.
# latest: with a committed offset on the partition, consume from it; without one, consume only newly produced records.
# none: throw an exception if any partition of the topic lacks a committed offset.
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=60000
kafka.consumer.auto.commit.interval=1000
# Number of threads in the listener container, to increase concurrency
kafka.consumer.concurrency=2
# Deserializers for the message key and value
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
Here kafka.bootstrap.servers is the address of your Kafka broker. If it points at a remote server, make sure the corresponding port is open to external access; multiple addresses can be separated by commas.
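For example, with several brokers (the hostnames below are placeholders, not from the original):

```properties
kafka.bootstrap.servers=broker1:9093,broker2:9093,broker3:9093
```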
Producer configuration:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    @Value("${kafka.producer.acks}")
    private String acks;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
Consumer configuration:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // propsMap.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Append the local IP (dots stripped) so each host joins its own consumer group
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + HostUtil.getLocalHost().getHostAddress().replace(".", ""));
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
```
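HostUtil is a custom helper that is not shown in the original; it only needs to return the local address. A minimal sketch built on java.net.InetAddress (the class and method names are assumed to match the usage above):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical implementation of the HostUtil helper referenced above.
public class HostUtil {

    // Resolve the address of the local machine.
    public static InetAddress getLocalHost() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("Unable to resolve local host", e);
        }
    }
}
```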
With the configuration in place, let's first send a few messages to the topic:
```java
@Resource
private KafkaTemplate<String, String> kafkaTemplate;

private Gson gson = new GsonBuilder().create();

// Send a message
public void send() {
    KafkaMessage message = new KafkaMessage();
    message.setId(System.currentTimeMillis());
    message.setMsg(UUID.randomUUID().toString());
    message.setSendTime(new Date());
    log.info("++ message = {}", gson.toJson(message));
    // Send the message to the "test" topic
    ListenableFuture<SendResult<String, String>> listenableFuture =
            kafkaTemplate.send("test", gson.toJson(message));
    sendCallBack(listenableFuture);
}

private void sendCallBack(ListenableFuture<SendResult<String, String>> listenableFuture) {
    try {
        SendResult<String, String> sendResult = listenableFuture.get(3, TimeUnit.SECONDS);
        listenableFuture.addCallback(
                successCallBack -> log.info("kafka producer sent message successfully! topic="
                        + sendResult.getRecordMetadata().topic()
                        + ", partition=" + sendResult.getRecordMetadata().partition()
                        + ", offset=" + sendResult.getRecordMetadata().offset()),
                failureCallBack -> log.error("kafka producer failed to send message! sendResult="
                        + gson.toJson(sendResult.getProducerRecord())));
    } catch (Exception e) {
        log.error("Failed to get producer result", e);
    }
}
```
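The KafkaMessage class used above is a plain POJO that the original does not show; a minimal version matching the setters called in send() (the field types are assumptions) could be:

```java
import java.util.Date;

// Hypothetical message POJO matching the setters called in send().
public class KafkaMessage {

    private Long id;       // here: the send timestamp in milliseconds
    private String msg;    // message body
    private Date sendTime; // when the message was sent

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }

    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }
}
```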
With sending done, we still need a consumer to consume the messages:
```java
@KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        log.info("----------------- record =" + record);
        log.info("------------------ message =" + message);
    }
}
```
With this, messages from the test topic can be received.
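Since the producer serializes KafkaMessage to JSON, the listener can also parse the payload back into an object. A minimal variant of the listener using the same Gson setup (a sketch, assuming the record value is the JSON string produced above):

```java
@KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory")
public void listenAsObject(ConsumerRecord<String, String> record) {
    if (record.value() != null) {
        // Deserialize the JSON payload back into the KafkaMessage POJO
        KafkaMessage message = gson.fromJson(record.value(), KafkaMessage.class);
        log.info("received id={}, msg={}", message.getId(), message.getMsg());
    }
}
```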
Note that the topic test used here has already been created in Kafka; if it has not been created, messages cannot be sent to it. How to create a topic is not covered in detail here.
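If you would rather create the topic from code than from the command line, the AdminClient API that ships with kafka-clients can do it; a minimal sketch (the broker address and the partition/replication sizing below are assumptions):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same bootstrap address as in application.properties
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxx:9093");
        try (AdminClient admin = AdminClient.create(props)) {
            // Topic "test" with 1 partition and replication factor 1 (assumed sizing)
            NewTopic topic = new NewTopic("test", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```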