上一篇Kakfa学习笔记(二)——体验Kafkajava
此次咱们用Java API来发送和消费消息shell
首先启动zk和三个brokerapache
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server-1.properties
> bin/kafka-server-start.sh config/server-2.properties
复制代码
建立chat主题,分区和副本都为3bootstrap
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chat
复制代码
maven引入依赖bash
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
复制代码
编写生产者代码,指定topic为chat,发送10条消息maven
public class Sender {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("chat", ""+Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
复制代码
编写消费者代码,很简单,把消费的内容打印出来post
public class Receiver {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "chat-room-1");//消费组id
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("chat"));//chat主题
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
复制代码
咱们能够前后启动一个消费者和一个生产者,能够看到消费者输出10条数据学习
offset = 8, key = 0, value = 0
offset = 9, key = 2, value = 2
offset = 10, key = 3, value = 3
offset = 11, key = 9, value = 9
offset = 4, key = 4, value = 4
offset = 5, key = 6, value = 6
offset = 8, key = 1, value = 1
offset = 9, key = 5, value = 5
offset = 10, key = 7, value = 7
offset = 11, key = 8, value = 8
复制代码
能够看到消息并非有序的,由于消息分发到三个分区,这个消息者消费了三个分区spa
由于咱们这个topic有三个分区,咱们能够先挂起三个消费者(同一个消费组),再起一个生产者,此次能够看到消息是分摊在三个消费者,不会出现重复消息code
consumer-0:
offset = 4, key = 0, value = 0
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 3
offset = 7, key = 9, value = 9
consumer-1:
offset = 4, key = 1, value = 1
offset = 5, key = 5, value = 5
offset = 6, key = 7, value = 7
offset = 7, key = 8, value = 8
consumer-2:
offset = 2, key = 4, value = 4
offset = 3, key = 6, value = 6
复制代码
若是咱们起四个消费者,看一下结果
consumer-0:
offset = 16, key = 0, value = 0
offset = 17, key = 2, value = 2
offset = 18, key = 3, value = 3
offset = 19, key = 9, value = 9
consumer-1:
consumer-2:
offset = 16, key = 1, value = 1
offset = 17, key = 5, value = 5
offset = 18, key = 7, value = 7
offset = 19, key = 8, value = 8
consumer-3
offset = 8, key = 4, value = 4
offset = 9, key = 6, value = 6
复制代码
会发现consumer-1是消费不到消息的,由于只有3个分区,因此最多只有3个消费者同时工做,多出来的一个不会消费
若是咱们想消息有序被送达并消费,在发送时能够指定partition
producer.send(new ProducerRecord<String, String>("chat",1, Integer.toString(i), Integer.toString(i)));//第二个参数指定partition=1
复制代码
咱们起两个消费者
consumer-0:
consumer-1:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9
复制代码
结果只有一个消费者消费了消息,而且能够看到,消息是有序的