@Test public void myConsumer(){ Properties props = new Properties(); props.put("bootstrap.servers", "47.100.131.226:9092,47.100.95.99:9092,47.100.91.36:9092"); props.put("group.id", "etlGroupTEST003"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("max.poll.records", "10"); //props.put("auto.offset.reset", "latest"); // latest : 不一样group 若没有已提交的offset 则从最新的offset开始消费 props.put("auto.offset.reset", "earliest"); // earliest : 不一样group 若没有已提交的offset 则从头开始消费 //建立消费者配置对象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String,String>(props); /*TopicPartition[] topicPartitions = new TopicPartition[]{ new TopicPartition("SC.CMCC_Children_login", 0), new TopicPartition("SC.CMCC_Children_login", 1), new TopicPartition("SC.CMCC_Children_login", 2), };*/ //kafkaConsumer.assign(Arrays.asList(topicPartitions)); kafkaConsumer.subscribe(Arrays.asList("SC.CMCC_Children_ui")); //订阅该主题下所有分区, 各分区内容不一样 while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); System.out.println("拉取记录数:"+records.count()); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, partition = %d, value = %s", record.offset(), record.partition(), record.value()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); } } }