Kafka 通过 KafkaConsumer 构造器初始化消费者客户端的配置。
常用的重要配置,详见官网。
// 基础配置 Map<String, Object> configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my_test"); configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
Kafka 消费者提供 4 种方式订阅主题,1 种方式指定分区。
// 指定主题 public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) public void subscribe(Collection<String> topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern) // 指定分区 public void assign(Collection<TopicPartition> partitions)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
// Subscribe to the topic "test" by name.
consumer.subscribe(Collections.singletonList("test"));
// Poll for records with a 3-second timeout.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
TopicPartition tp = new TopicPartition("test", 0);
// Assign partition 0 of topic "test" to this consumer.
consumer.assign(Collections.singletonList(tp));
// Position the consumer at offset 4 of that partition.
consumer.seek(tp, 4L);
// Poll for records with a 3-second timeout.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
TopicPartition tp = new TopicPartition("test", 0);
// Assign partition 0 of topic "test" to this consumer.
consumer.assign(Collections.singletonList(tp));

// Timestamp (epoch milliseconds) to start consuming from.
Map<TopicPartition, Long> tpTime = new HashMap<>();
tpTime.put(tp, 1563027475113L);

// Look up the offset that corresponds to the timestamp.
Map<TopicPartition, OffsetAndTimestamp> tpOffsetAndTime = consumer.offsetsForTimes(tpTime);
OffsetAndTimestamp offsetAndTime = tpOffsetAndTime.get(tp);
if (offsetAndTime != null) {
    // Position the consumer at the offset found for the timestamp.
    consumer.seek(tp, offsetAndTime.offset());
} else {
    // offsetsForTimes maps the partition to null when no record exists at or
    // after the timestamp; dereferencing it would throw a NullPointerException.
    // There is nothing to replay in that case, so start from the end.
    consumer.seekToEnd(Collections.singletonList(tp));
}
// Poll for records with a 3-second timeout.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
参数说明
// Synchronous offset commit overloads.

// No arguments.
public void commitSync()

// With a timeout.
public void commitSync(Duration timeout)

// With an explicit partition-to-offset map.
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)

// With an explicit partition-to-offset map and a timeout.
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)
参数说明
// Asynchronous offset commit overloads.

// No arguments.
public void commitAsync()

// With a commit callback.
public void commitAsync(OffsetCommitCallback callback)

// With an explicit partition-to-offset map and a commit callback.
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
// 获取分配给当前消费者的分区集合 public Set<TopicPartition> assignment() // 取消订阅 public void unsubscribe() // 找到指定分区的第一个偏移量 public void seekToBeginning(Collection<TopicPartition> partitions) // 找到指定分区的最后一个偏移量 public void seekToEnd(Collection<TopicPartition> partitions) // 获取指定分区即将消费的下一个偏移量 public long position(TopicPartition partition) // 获取指定分区最后提交的偏移量 public OffsetAndMetadata committed(TopicPartition partition) // 获取指定主题的分区列表 public List<PartitionInfo> partitionsFor(String topic) // 获取全部主题的信息 public Map<String, List<PartitionInfo>> listTopics() // 暂停消费 public void pause(Collection<TopicPartition> partitions) // 恢复被暂停的消费 public void resume(Collection<TopicPartition> partitions) // 获取暂停的分区列表 public Set<TopicPartition> paused() // 获取指定分区第一个偏移量 public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) // 获取指定分区最后一个偏移量 public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) // 唤醒消费者 public void wakeup()