kafka最初是由linkedin开发的,是一个分布式,分区的,多副本的,基于Zookeeper协调的分布式日志系统,固然它也能够当作消息队列来使用。
常见的能够用于Web,nginx日志,访问日志,消息服务等等。
因此kafka的应用场景主要有:日志收集系统和消息系统。html
消费者生产者之间不想相互耦合,只要都遵循一样的接口约束就行。nginx
这里主要是为了保证数据不会丢失,许多消息队列采用"插入-获取-删除"的模式,在把一个消息从队列中年删除以前,须要系统明确指出这个消息已经被处理完毕,从而确保数据被安全地保存直到使用完毕。web
支持扩展spring
在访问量剧增的状况下,使用消息队列可以使得关键组件顶住忽然的访问压力,使得应用仍然须要继续发挥做用。apache
系统的一部分组件失效时,不会影响整个系统,即便一个处理消息的线程挂掉,加入队列中的消息也能够在系统恢复后被处理。json
Kafka保证一个Partition中的消息的有序性。bootstrap
经过一个缓冲层来帮助任务最高效率地执行,写入队列的处理尽量地传递。缓存
采用异步通讯机制,容许先把消息放入队列,但并不当即处理,而是在须要的时候再去用它们。安全
Kafka集群包括一个或者多个服务器,服务器节点称为broker。broker存储topic的数据,若是某个topic有N个partition,集群有N个broker,那么每一个broker存储该topic的一个partition,若是某个topic有N个partition,集群有N+m个broker,那么N个broker存储该topic中的一个partition,剩下的m个broker不存储该topic的partition数据。若是某个topic的broker数量比partition的数量少,那么一个broker可能会存储多个该topic的partition。
在实际生产中应该尽可能避免这种状况发生,由于很容易形成kafka集群数据不均衡。服务器
每条发布到kafka的集群消息都有一个类别,这个类别称为topic。
Topic的数据分割为一个或者多个partition,每一个partition中的数据使用过个segment文件存储。partition的数据是有序的,不一样partition间的数据丢失了数据的顺序,若是topic有多个partition,消费数据就不能保证数据的顺序,在须要严格保证消息的消息顺序的场景下,须要将partition数目须要1。
生产者
消费者
每一个Consumer属于一个特定的ComsumerGroup,可为每一个Consumer指定GroupName,不指定则为默认。
每一个Partition有多个副本,其中有且仅有一个Leader,即负责读写数据的Partition。
Follower跟随Leader,全部的写请求都经过Leader路由,数据变动会广播到全部的Follower。若是Leader失效,那么Follower中会选举出一个新的Leader。
本想继续写一写kafka的架构,高可用设计和其中的一些特性的,可是我这两天在看这些东西的时候发现这些仍是在一个demo的基础上再去学习比较好,因此这些留在下一篇写了。
安装kafka和Zookeeper,kafka运行须要Zookeeper来支持,来进行心跳等机制,因此在运行kafka以前安装好Zookeeper。网上帖子不少,就不细写了,可是我这里Zookeeper和kafka都是单实例的,并无配置集群。
IDEA用SpringInitializer创建一个大工程,而后创建KafkaConsumer和KafkaProducer两个module就好了。
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> </dependency> </dependencies>
生产者
server.port=8099 # kafka地址 spring.kafka.bootstrap-servers=127.0.0.1:9092 #写入失败的时候的重试次数 spring.kafka.producer.retries=0 # 每次批量发送消息的数量 spring.kafka.producer.batch-size=16384 # producer积累数据一次性发送,缓存大小到达这个值就发送数据 spring.kafka.producer.buffer-memory=33554432 #acks = 0 若是设置为零,则生产者将不会等待来自服务器的任何确认,该记录将当即添加到套接字缓冲区并视为已发送。在这种状况下,没法保证服务器已收到记录,而且重试配置将不会生效(由于客户端一般不会知道任何故障),为每条记录返回的偏移量始终设置为-1。 #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待全部副本服务器的彻底确认便可作出回应,在这种状况下,若是leader在确认记录后当即失败,但在将数据复制到全部的副本服务器以前,则记录将会丢失。 #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这至关于acks = -1的设置。 spring.kafka.producer.acks=1 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
消费者
server.port=8090 # kafka地址 spring.kafka.bootstrap-servers=127.0.0.1:9092 # 自动提交的时间间隔 spring.kafka.consumer.auto-commit-interval=1S # 指定消费者在读取一个没有偏移量的分区或者偏移量无效的分区的状况下如何处理。 # latest在偏移量无效的状况下,消费者将从最新的记录开始读取数据 # earliest在偏移量无效的状况下,消费者将从起始位置读取分区的记录 spring.kafka.consumer.auto-offset-reset=earliest # 是否自动提交偏移量,为了不出现重复数据和数据丢失,能够把它设置为false,而后手动提交偏移量 spring.kafka.consumer.enable-auto-commit=false # key的反序列化方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.listener.concurrency=5 spring.kafka.listener.ack-mode=manual_immediate spring.kafka.listener.missing-topics-fatal=false
我这里采用的就是简单的StringSerializer和StringDeserializer,若是是传递对象,有两种方式,一种是自定义解码和编码器,须要实现Serializer接口,另外一种就是用已有的格式来解码和编码,好比json格式来传递信息,而后用fastjson等框架来解码和编码。
另一点就是消费者的监听器必需要设置ack-mode,由于上面设置的自动提交的选项设置为了false,因此须要手动设置提交offset的模式。
@Component @Slf4j public class KafkaProducer { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; public void send(Object o){ String objStr = JSONObject.toJSONString((o)); log.info("sending info:"+objStr); ListenableFuture<SendResult<String,Object>> future= kafkaTemplate.send("test-topic-1",o); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { log.info("test-topic-1发送失败,"+throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { log.info("test-topic-1发送成功,"+stringObjectSendResult.toString()); } }); } }
而后简单写一个Controller来触发消息的发送。
@RestController public class KafkaController { @Autowired private KafkaProducer kafkaProducer; @GetMapping("/message/send") public boolean send(){ kafkaProducer.send("this is a test message"); return true; } }
@Component @Slf4j public class KafkaConsumer { @KafkaListener(topics = "test-topic-1",groupId = "test-group-1") public void topic_test(ConsumerRecord<?,?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC)String topic){ Optional message = Optional.ofNullable(record.value()); if(message.isPresent()){ Object msg = message.get(); log.info("消费了: topic:"+topic+",message:"+msg); ack.acknowledge(); } } @KafkaListener(topics = "test-topic-1",groupId = "test-group-2") public void topic_test_1(ConsumerRecord<?,?>record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){ Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("消费了: topic:"+topic+",message:"+msg); ack.acknowledge(); } } }
在启动这两个模块以前,须要确认kafka和Zookeeper都已经启动。
启动生产者,控制台有以下信息:
2020-09-13 21:53:10.892 INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1 2020-09-13 21:53:10.894 INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92 2020-09-13 21:53:10.894 INFO 17928 --- [nio-8099-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1600005190890 2020-09-13 21:53:11.125 INFO 17928 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: OtDSNkOFT4eFbSso_V8qAQ 2020-09-13 21:53:11.167 INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer : test-topic-1发送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@4] 2020-09-13 21:55:34.570 INFO 17928 --- [nio-8099-exec-3] c.e.k.producer.KafkaProducer : sending info:"this is a test message" 2020-09-13 21:55:34.579 INFO 17928 --- [ad | producer-1] c.e.k.producer.KafkaProducer : test-topic-1发送成功,SendResult [producerRecord=ProducerRecord(topic=test-topic-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=this is a test message, timestamp=null), recordMetadata=test-topic-1-0@5]
启动消费者,能够看到控制台打印了发过来的信息
2020-09-13 21:55:24.077 INFO 13296 --- [ntainer#1-4-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-2: partitions assigned: [test-topic-1-0] 2020-09-13 21:55:24.077 INFO 13296 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : test-group-1: partitions assigned: [test-topic-1-0] 2020-09-13 21:55:24.114 INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer : 消费了: topic:test-topic-1,message:this is a test message 2020-09-13 21:55:24.114 INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer : topic_test1 消费了: Topic:test-topic-1,Message:this is a test message 2020-09-13 21:55:34.579 INFO 13296 --- [ntainer#0-0-C-1] c.e.k.consumer.KafkaConsumer : 消费了: topic:test-topic-1,message:this is a test message 2020-09-13 21:55:34.580 INFO 13296 --- [ntainer#1-4-C-1] c.e.k.consumer.KafkaConsumer : topic_test1 消费了: Topic:test-topic-1,Message:this is a test message