Kakfa学习笔记(三)——Java API发送消费消息

上一篇Kakfa学习笔记(二)——体验Kafkajava

此次咱们用Java API来发送和消费消息shell

一对一的发送消费

首先启动zk和三个brokerapache

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server-1.properties
> bin/kafka-server-start.sh config/server-2.properties
复制代码

建立chat主题,分区和副本都为3bootstrap

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chat
复制代码

maven引入依赖bash

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
复制代码

编写生产者代码,指定topic为chat,发送10条消息maven

public class Sender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("chat", ""+Integer.toString(i), Integer.toString(i)));

        producer.close();

    }
}
复制代码

编写消费者代码,很简单,把消费的内容打印出来post

public class Receiver {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "chat-room-1");//消费组id
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("chat"));//chat主题
      
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
复制代码

咱们能够前后启动一个消费者和一个生产者,能够看到消费者输出10条数据学习

offset = 8, key = 0, value = 0
offset = 9, key = 2, value = 2
offset = 10, key = 3, value = 3
offset = 11, key = 9, value = 9
offset = 4, key = 4, value = 4
offset = 5, key = 6, value = 6
offset = 8, key = 1, value = 1
offset = 9, key = 5, value = 5
offset = 10, key = 7, value = 7
offset = 11, key = 8, value = 8
复制代码

能够看到消息并非有序的,由于消息分发到三个分区,这个消息者消费了三个分区spa

多消费者消费

由于咱们这个topic有三个分区,咱们能够先挂起三个消费者(同一个消费组),再起一个生产者,此次能够看到消息是分摊在三个消费者,不会出现重复消息code

consumer-0:
offset = 4, key = 0, value = 0
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 3
offset = 7, key = 9, value = 9

consumer-1:
offset = 4, key = 1, value = 1
offset = 5, key = 5, value = 5
offset = 6, key = 7, value = 7
offset = 7, key = 8, value = 8

consumer-2:
offset = 2, key = 4, value = 4
offset = 3, key = 6, value = 6
复制代码

若是咱们起四个消费者,看一下结果

consumer-0:
offset = 16, key = 0, value = 0
offset = 17, key = 2, value = 2
offset = 18, key = 3, value = 3
offset = 19, key = 9, value = 9

consumer-1:

consumer-2:
offset = 16, key = 1, value = 1
offset = 17, key = 5, value = 5
offset = 18, key = 7, value = 7
offset = 19, key = 8, value = 8

consumer-3
offset = 8, key = 4, value = 4
offset = 9, key = 6, value = 6
复制代码

会发现consumer-1是消费不到消息的,由于只有3个分区,因此最多只有3个消费者同时工做,多出来的一个不会消费

有序消息

若是咱们想消息有序被送达并消费,在发送时能够指定partition

producer.send(new ProducerRecord<String, String>("chat",1, Integer.toString(i), Integer.toString(i)));//第二个参数指定partition=1
复制代码

咱们起两个消费者

consumer-0:

consumer-1:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9
复制代码

结果只有一个消费者消费了消息,而且能够看到,消息是有序的

相关文章
相关标签/搜索