Kafka Configuration (Personally Tested)

// Required imports (kafka-clients on the classpath; @Test here is JUnit 4 — for JUnit 5 use org.junit.jupiter.api.Test)
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

@Test
public void myConsumer() {

   Properties props = new Properties();
   props.put("bootstrap.servers", "47.100.131.226:9092,47.100.95.99:9092,47.100.91.36:9092");
   props.put("group.id", "etlGroupTEST003");
   props.put("enable.auto.commit", "true");
   props.put("auto.commit.interval.ms", "1000");

   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("max.poll.records", "10");
   //props.put("auto.offset.reset", "latest");    // latest : 不一样group 若没有已提交的offset  则从最新的offset开始消费
   props.put("auto.offset.reset", "earliest"); // earliest : 不一样group 若没有已提交的offset  则从头开始消费

   // Create the consumer with the above configuration
   KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

   // Alternative: assign specific partitions manually instead of subscribing
   // (requires org.apache.kafka.common.TopicPartition)
   /*TopicPartition[] topicPartitions = new TopicPartition[]{
         new TopicPartition("SC.CMCC_Children_login", 0),
         new TopicPartition("SC.CMCC_Children_login", 1),
         new TopicPartition("SC.CMCC_Children_login", 2),
   };*/
   //kafkaConsumer.assign(Arrays.asList(topicPartitions));
   kafkaConsumer.subscribe(Arrays.asList("SC.CMCC_Children_ui")); // Subscribe to all partitions of this topic; each partition holds different records
   while (true) {
      // Poll for up to 1000 ms (on kafka-clients 2.0+ prefer poll(Duration.ofMillis(1000)))
      ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
      System.out.println("Records fetched: " + records.count());
      for (ConsumerRecord<String, String> record : records) {
         System.out.printf("offset = %d, partition = %d, value = %s%n", record.offset(), record.partition(), record.value());
         try {
            Thread.sleep(100); // slow down processing slightly for easier observation
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }

}
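
The test above relies on auto-commit, so offsets are committed on a timer regardless of whether the records were fully processed. If you need at-least-once delivery, a common variant is to disable auto-commit and commit offsets manually after handling each batch. The following is a minimal, untested sketch of that variant; the broker address, group id, and topic name are placeholders, not values from the setup above.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
      props.put("group.id", "manualCommitGroup");         // placeholder group id
      props.put("enable.auto.commit", "false");           // turn off auto-commit
      props.put("auto.offset.reset", "earliest");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("test-topic"));     // placeholder topic
      try {
         while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
               System.out.printf("offset = %d, partition = %d, value = %s%n",
                     record.offset(), record.partition(), record.value());
            }
            // Commit only after the whole batch has been processed
            if (!records.isEmpty()) {
               consumer.commitSync();
            }
         }
      } finally {
         consumer.close();
      }
   }
}

With commitSync() the offset advances only after processing succeeds, so a crash between poll() and the commit re-delivers the batch (at-least-once) instead of silently skipping it.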