如下的操做都是基于kafka_2.11-2.2.0java
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic spring-kafka-demo2
/bin/kafka-topics.sh --zookeeper localhost:2181 --list
/bin/kafka-topics.sh --zookeeper localhost:2181 --delete -topic spring-kafka-demo
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
老版本是指定zk的地址,相似这样:spring
kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
新版本使用bootstrap,这个是区别。bootstrap
kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test-consumer-group --describe
这个是查看组名为test-consumer-group
的消费组的状况。工具
查询的结果相似下面这样,spa
简单对每列进行说明:code
消费者的topic名称 server
分区数的名称 rem
consumer group最后一次提交的offsetkafka
最后提交的生产消息offsetit
消费offset与生产offset之间的差值
消费者的ID编号,咱们知道消费者组里面能够有最少要有一个消费者,固然也能够有多个消费者。
消费者的主机IP地址。
连接的ID编号。
关于offset补充一些知识点。
kafka有个经常使用的设置是 auto.offset.reset
,它的意义是,
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的状况下(因消费者长
时间失效,包含偏移量的记录已通过时井被删除)该做何处理。它的默认值是 latest , 意
思是说,在偏移量无效的状况下,消费者将从最新的记录开始读取数据(在消费者启动之
后生成的记录)。另外一个值是 earliest ,意思是说,在偏移量无效的状况下,消费者将从
起始位置读取分区的记录。
这个属性有如下几个值,
我要强调的是,这个设置只有在咱们的消费者(或者消费者群组)在分区内找不到有效的offset时才会生效。
举个例子,
咱们使用java kafka客户端来操做kafak。
好比咱们在消费组group1
有个消费者,消费了5条消息而后节点挂了。
而后咱们重启这个消费节点,那么我来问你,这个消费者会从哪里开始消费?
若是你回答根据auto.offset.reset
的配置来决定那就说明你没理解我上面所说的。
正确的答案是,消费者会继续从上次挂掉的offset(kafka broker保存)那里继续消费,根本不理会auto.offset.reset
。
再举个例子,
生产者在某个topic生产了一些消息,而后咱们启动一个消费组group2
,里面有一个消费者。
若是这个时候kafka没有这个topic消息的offset信息,那么auto.offset.reset
的值就决定从哪里消费。