在上一章中SpringBoot整合RabbitMQ,已经详细介绍了消息队列的做用,这一种咱们直接来学习SpringBoot如何整合kafka发送消息。java
kafka是用Scala和Java语言开发的,高吞吐量的分布式消息中间件。高吞吐量使它在大数据领域具备自然的优点,被普遍用来记录日志。git
注1:图中的红色箭头表示消息的流动过程,蓝色表示分区备份,绿色表示kafka集群注册到zookeeper。github
注2:在kafka0.9版本以前,消费者消费消息的位置记录在zookeeper中,在0.9版本以后,消费消息的位置记录在kafka的一个topic上。spring
kafka名词简介:apache
采用ack确认机制来保证消息的可靠性。bootstrap
kafka在发送消息后会同步到其余分区副本,等全部副本都接收到消息后,kafka才会发送ack进行确认。采用这种模式的劣势就是当其中一个副本宕机后,则消息生产者就不会收到kafka的ack。缓存
kafka采用ISR来解决这个问题。服务器
ISR:Leader维护的一个和leader保持同步的follower集合。微信
当ISR中的folower完成数据同步以后,leader就会向follower发送ack,若是follower长时间未向leader同步数据,则该follower就会被踢出ISR,该时间阀值的设置参数为replica.lag.time.max.ms
,默认时间为10s,leader发生故障后,就会从ISR中选举新的leader。架构
注:本文所讲的kafka版本为0.11,在0.9版本之前成为ISR还有一个条件,就是同步消息的条数。
ack参数配置
0:生产者不等待broker的ack。
1:leader分区接收到消息向生产者发送ack。
-1(all):ISR中的leader和follower同步成功后,向生产者发送ack。
假如leader中有10条消息,向两个follower同步数据,follower A同步了8条,follower B同步了9条。这时候leader宕机了,follower A和follower B中的消息是不一致的,剩下两个follower就会从新选举出一个leader。
为了保证数据的一致性,全部的follower会将各自的log文件高出HW的部分截掉,而后再重新的leader中同步数据。
在kafka0.11版本中引入了一个新特性:幂等性。启用幂等性后,ack默认为-1。将生产者中的enable.idompotence
设置为true,即启用了幂等性。
开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。Broker端会对<PID,Partition,SeqNumber>作缓存,当具备相同主键的消息提交时,Broker只会缓存一条。可是每次重启PID就会发生变化,所以只能保证一次会话同一分区的消息不重复。
kafka有两种分配策略,一种是RoundRobin,另外一种是Range
RoundRobin是按照消费者组以轮询的方式去给消费者分配分区的方式,前提条件是消费者组中的消费者须要订阅同一个topic。
Range是kafka默认的分配策略,它是经过当前的topic按照必定范围来分配的,假若有3个分区,消费者组有两个消费者,则消费者A去消费1和2分区,消费者B去消费3分区。
Kafka 0.9 版本以前,consumer默认将offset保存在zookeeper中,0.9 版本开始,offset保存在kafka的一个内置topic中,该topic为_consumer_offsets
。
为了实现跨分区会话的事务,须要引入一个全局惟一的Tracscation ID,并将Producer 得到的PID与之绑定。这样当Producer重启后就能够经过正在进行的Transaction ID得到原来的PID。
为了管理Transcation ID,kafka引入了一个新的组件Transcation Coordinator。Producer就是经过和Transcation Coordinator交互得到Transction ID对应的任务状态。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
spring: kafka: # kafka服务地址 bootstrap-servers: 47.104.155.182:9092 producer: # 生产者消息key序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者消息value序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 消费者组 group-id: test-consumer-group # 消费者消息value反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 消费者消息value反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
@Component @Slf4j @KafkaListener(topics = {"first-topic"},groupId = "test-consumer-group") public class Consumer { @KafkaHandler public void receive(String message){ log.info("我是消费者,我接收到的消息是:"+message); } }
@RestController public class Producer { @Autowired private KafkaTemplate kafkaTemplate; @GetMapping("send") public void send(){ String message = "你好,我是Java旅途"; // 第一个参数 topic // 第二个参数 消息 kafkaTemplate.send("first-topic",message); } }
此是spring-boot-route系列的第十四篇文章,这个系列的文章都比较简单,主要目的就是为了帮助初次接触Spring Boot 的同窗有一个系统的认识。本文已收录至个人github,欢迎各位小伙伴star
!
github:https://github.com/binzh303/s...
若是以为文章不错,欢迎关注、点赞、收藏,大家的支持是我创做的动力,感谢你们。
若是文章写的有问题,请不要吝啬,欢迎留言指出,我会及时核查修改。
若是你还想更加深刻的了解我,能够微信搜索「Java旅途」进行关注。回复「1024」便可得到学习视频及精美电子书。天天7:30准时推送技术文章,让你的上班路不在孤独,并且每个月还有送书活动,助你提高硬实力!