Java编写程序将数据存入Kafka中

Kafka是一个相似于RabbitMQ的消息系统,它的主要功能是消息的发布和订阅、处理和存储。java

1.它相似于一个消息系统,读写流式的数据。apache

2.编写可扩展的流应用处理程序,用于实时事件响应的场景。bootstrap

3.安全的将流式的数据存储在一个分布式,有副本备份,容错的集群。安全

 

本篇博文主要介绍如何使用Java编写程序将数据写入到Kafka中,即Kafka生产者,并不涉及Kafka消费者。另外,像Spark,Storm等都有相应的程序从Kafka消费者中获取数据的方法,直接调用便可。服务器

 

Kafka的运行须要Zookeeper的帮助,因此,须要先安装Zookeeper。分布式

 

1.先启动Zookeeperthis

bin/zookeeper-server-start.sh config/zookeeper.properties

 再启动Kafka服务器:spa

bin/kafka-server-start.sh config/server.properties

2.建立一个Topic:code

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 显示topicorm

bin/kafka-topics.sh --list --zookeeper localhost:2181

 也能够在程序中进行topic的建立。

3.发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4.接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

 

 

 

下面,是本次的程序:

 1 import org.apache.kafka.clients.producer.*;  2 
 3 import java.util.Properties;  4 import java.util.concurrent.ExecutionException;  5 
 6 public class MyProducer extends Thread {  7     private final KafkaProducer<Integer, String> producer;  8     private final String topic;  9     private final Boolean isAsync; 10 
11     public MyProducer(String topic, Boolean isAsync) { 12         Properties prop = new Properties(); 13         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 14         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); 15         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 16         producer = new KafkaProducer<Integer, String>(prop); 17         this.topic = topic; 18         this.isAsync = isAsync; 19  } 20 
21     public void run() { 22         int messageNo = 1; 23         while (true) { 24             String messageStr = "Message_" + messageNo; 25             long startTime = System.currentTimeMillis(); 26             if (isAsync) { 27                 producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new DemoCallback(startTime, messageNo, messageStr)); 28             } else { 29                 try { 30                     producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get(); 31                     System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); 32                 } catch (InterruptedException | ExecutionException e) { 33  e.printStackTrace(); 34  } 35  } 36             ++messageNo; 37  } 38  } 39     public static void main(String[] args) { 40         boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); 41 
42         MyProducer producerThread = new MyProducer("test", isAsync); 43  producerThread.start(); 44 
45  } 46 } 47 
48 class DemoCallback implements Callback { 49     private final long startTime; 50     private final int key; 51     private final String message; 52 
53     public DemoCallback(long startTime, int key, String message) { 54         this.startTime = startTime; 55         this.key = key; 56         this.message = message; 57  } 58 
59     public void onCompletion(RecordMetadata metadata, Exception exception) { 60         long elapsedTime = System.currentTimeMillis() - startTime; 61         if (metadata != null) { 62  System.out.println( 63                     "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
64                             "), " +
65                             "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); 66         } else { 67  exception.printStackTrace(); 68  } 69  } 70 }

 

好了,完成!

相关文章
相关标签/搜索