1.在kafka中若是不设置消费的信息的话,一个消息只能被一个group.id消费一次,而新加如的group.id则会被“消费管理”记录,并指定从当前记录的消息位置开始向后消费。若是有段时间消费者关闭了,并有发送者发送消息那么下次这个消费者启动时也会接收到,可是咱们若是想要从这个topic的第一条消息消费呢?java
public class SimpleConsumerPerSonIndex2 { public static void main(String[] args) throws Exception { //Kafka consumer configuration settings String topicName = "mypartition001"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "partitiontest112"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //要发送自定义对象,须要指定对象的反序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka"); KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props); Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>(); hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0)); consumer.commitSync(hashMaps); consumer.subscribe(Arrays.asList(topicName)); while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); for (ConsumerRecord<String, Object> record : records){ System.out.println(record.toString()); } } } }
首先咱们在consumer.subscribe(Arrays.asList(topicName));订阅一个topic以前要设置从这个topic的offset为0的地方获取。
注意:这样的方法要保证这个group.id是新加入,若是是之前存在的,那么会抛异常。
2.若是之前就存在的groupid想要获取指定的topic的offset为0开始以后的消息:apache
public class SimpleConsumerPerSonIndex2 { public static void main(String[] args) throws Exception { //Kafka consumer configuration settings String topicName = "mypartition001"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "partitiontest002"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //要发送自定义对象,须要指定对象的反序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka"); KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props); // consumer.subscribe(Arrays.asList(topicName)); consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改变当前offset // consumer.seek(new TopicPartition(topicName, 0), 10);//不改变当前offset while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); for (ConsumerRecord<String, Object> record : records){ System.out.println(record.toString()); } } } }
使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));来分配topic和partition,
而consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));指定从这个topic和partition的开始位置获取。bootstrap
3.存在的groupid获取指定的topic任意的offset
上面的代码放开 consumer.seek(new TopicPartition(topicName, 0), 10);//不改变当前offset
并注释 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改变当前offset;
其中consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示从这个topic的partition中的offset为10的开始获取消息。session
须要注意的是 consumer.assign()是不会被消费者的组管理功能管理的,他相对因而一个临时的,不会改变当前group.id的offset,好比:
在使用consumer.subscribe(Arrays.asList(topicName));时offset为20,若是经过2和3,已经获取了最新的消息offset是最新的,
在下次经过 consumer.subscribe(Arrays.asList(topicName));来获取消息时offset仍是20.仍是会获取20之后的消息。
其实在二、3的结果截图中咱们也能够发现没有1中结果图的joining group的日志输出,表示没有加入到group中。3d