kafka中消费者消费消息之每一个线程维护一个KafkaConsumer实例

一、首先启动本身的kafka集群哟。java

1 启动zk:
2 bin/zkServer.sh start conf/zoo.cfg。
3 验证zk是否启动成功:
4 bin/zkServer.sh status conf/zoo.cfg。
5 启动kafka:
6 bin/kafka-server-start.sh -daemon config/server.properties。

二、生产者生产消息,模拟生产一百条数据。apache

 1 package com.bie.kafka.producer;
 2 
 3 import java.util.Properties;
 4 import java.util.concurrent.ExecutionException;
 5 
 6 import org.apache.kafka.clients.producer.KafkaProducer;
 7 import org.apache.kafka.clients.producer.Producer;
 8 import org.apache.kafka.clients.producer.ProducerRecord;
 9 import org.apache.kafka.clients.producer.RecordMetadata;
10 import org.apache.kafka.common.serialization.StringSerializer;
11 
12 /**
13  * 
14  * @Description TODO
15  * @author biehl
16  * @Date 2019年5月25日 下午2:34:46
17  *
18  */
19 public class KafKaProducerHeart {
20 
21     public static void main(String[] args) {
22         Properties props = new Properties();
23         props.put("bootstrap.servers", "slaver1:9092,slaver2:9092,slaver3:9092");
24         props.put("acks", "-1");
25         props.put("retries", 3);
26         props.put("batch.size", 323840);
27         props.put("linger.ms", 10);
28         props.put("buffer.memory", 33554432);
29         props.put("max.block.ms", 3000);
30         StringSerializer keySerializer = new StringSerializer();
31         StringSerializer valueSerializer = new StringSerializer();
32         Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);
33         String topic = "topic1";
34         String value = " biehl 💗   wj 1314 ";
35         /*
36          * for (int i = 0; i < 100; i++) { 
37          *     //topic-key-value三元组肯定消息所在位置
38          *     producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i),
39          *     value)); 
40          * }
41          */
42 
43         // 异步发送
44         /*for (int i = 0; i < 100; i++) {
45             ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value);
46             // 异步发送
47             producer.send(record, new Callback() {
48 
49                 @Override
50                 public void onCompletion(RecordMetadata recordmetadata, Exception exception) {
51                     if (exception == null) {
52                         System.out.println("消息发送成功");
53                         System.out.println(
54                                 "topic: " + recordmetadata.topic() + ", partition分区: " + recordmetadata.partition());
55                     } else {
56                         System.out.println("消息发送失败");
57                     }
58                 }
59             });
60         }*/
61         
62         // 同步发送
63         for (int i = 0; i < 100; i++) {
64             ProducerRecord<String, String> record = new ProducerRecord<>(topic, Integer.toString(i), value);
65             try {
66                 RecordMetadata recordMetadata = producer.send(record).get();
67                 System.out.println(
68                         "topic: " + recordMetadata.topic() + ", partition分区: " + recordMetadata.partition());
69             } catch (InterruptedException e) {
70                 e.printStackTrace();
71             } catch (ExecutionException e) {
72                 e.printStackTrace();
73             }
74         }
75 
76         producer.close();
77     }
78 
79 }

三、kafka中消费者消费消息之每一个线程维护一个KafkaConsumer实例:
bootstrap

ConsumerRunnable,消费线程类,执行真正的消费任务安全

 1 package com.bie.kafka.kafkaThrea;
 2 
 3 import java.time.Duration;
 4 import java.util.Arrays;
 5 import java.util.Properties;
 6 
 7 import org.apache.kafka.clients.consumer.ConsumerRecord;
 8 import org.apache.kafka.clients.consumer.ConsumerRecords;
 9 import org.apache.kafka.clients.consumer.KafkaConsumer;
10 
11 /**
12  * 
13  * @Description TODO
14  * @author biehl
15  * @Date 2019年6月1日 上午11:48:53
16  * 
17  *       一、KafkaConsumer是非线程安全的,KafkaProducer是线程安全的。
18  *       二、该案例是每一个线程维护一个KafkaConsumer实例
19  *       用户建立多个线程消费topic数据,每一个线程都会建立专属该线程的KafkaConsumer实例
20  *       三、ConsumerRunnable,消费线程类,执行真正的消费任务
21  */
22 public class ConsumerRunnable implements Runnable {
23 
24     // 每一个线程维护私有的kafkaConsumer实例
25     private final KafkaConsumer<String, String> consumer;
26 
27     /**
28      * 默认每一个消费者的配置参数初始化
29      * 
30      * @param brokerList
31      * @param groupId
32      * @param topic
33      */
34     public ConsumerRunnable(String brokerList, String groupId, String topic) {
35         // 带参数的构造方法
36         Properties props = new Properties();
37         // kafka的列表
38         props.put("bootstrap.servers", brokerList);
39         // 消费者组编号
40         props.put("group.id", groupId);
41         // 自动提交
42         props.put("enable.auto.commit", true);
43         // 提交提交每一个一秒钟
44         props.put("auto.commit.interval.ms", "1000");
45         // 反序列化key
46         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
47         // 反序列化value
48         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
49         // 将配置信息进行初始化操做
50         this.consumer = new KafkaConsumer<>(props);
51         // 定义响应的主题信息topic
52         consumer.subscribe(Arrays.asList(topic));
53     }
54 
55     /**
56      * 
57      */
58     @Override
59     public void run() {
60         // 消费者保持一直消费的状态
61         while (true) {
62             // 将获取到消费的信息
63             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
64             // 遍历出每一个消费的消息
65             for (ConsumerRecord<String, String> record : records) {
66                 // 输出打印消息
67                 System.out.println(
68                         "当前线程名称 : " + Thread.currentThread().getName() + ", 主题名称 :" + record.topic() + ", 分区名称 :"
69                                 + record.partition() + ", 位移名称 :" + record.offset() + ", value :" + record.value());
70             }
71         }
72     }
73 
74 }

消费线程管理类,建立多个线程类执行消费任务:多线程

 1 package com.bie.kafka.kafkaThrea;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 /**
 7  * 
 8  * @Description TODO
 9  * @author biehl
10  * @Date 2019年6月1日 上午11:56:42
11  * 
12  *       一、消费线程管理类,建立多个线程类执行消费任务
13  */
14 public class ConsumerGroup {
15 
16     // 消费者群组,多消费者。
17     private List<ConsumerRunnable> consumers;
18 
19     /**
20      * 
21      * @param consumerNum
22      * @param groupId
23      * @param topic
24      * @param brokerList
25      */
26     public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
27         // 初始化消费者组
28         consumers = new ArrayList<>(consumerNum);
29         // 初始化消费者,建立多少个消费者
30         for (int i = 0; i < consumerNum; i++) {
31             // 根据消费者构造方法,建立消费者实例
32             ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
33             // 将建立的消费者实例添加到消费者组中
34             consumers.add(consumerRunnable);
35         }
36     }
37 
38     /**
39      * 
40      */
41     public void execute() {
42         // 将消费者组里面的消费者遍历出来
43         for (ConsumerRunnable task : consumers) {
44             // 建立一个消费者线程,而且启动该线程
45             new Thread(task).start();
46         }
47     }
48 
49 }
 1 package com.bie.kafka.kafkaThrea;
 2 
 3 /**
 4  * 
 5  * @Description TODO
 6  * @author biehl
 7  * @Date 2019年6月1日 下午2:19:52
 8  *
 9  */
10 public class ConsumerMain {
11 
12     public static void main(String[] args) {
13         // kafka即broker列表
14         String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
15         // group组名称
16         String groupId = "group1";
17         // topic主题名称
18         String topic = "topic1";
19         // 消费者的数量
20         int consumerNum = 3;
21         // 经过构造器建立出一个对象
22         ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
23         // 执行execute的方法,建立出ConsumerRunnable消费者实例。多线程多消费者实例
24         consumerGroup.execute();
25     }
26 
27 }

效果以下所示:异步

生产者生产消息的案例:ide

 消费者消费消息的案例:this

 

待续......spa