Kafka读取本地文件作为生产者

package com.qf.utils;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;import kafka.serializer.StringEncoder;import java.io.*;import java.util.Properties;public class CollectLog {   public static void main(String[] args){     Properties properties = new Properties();     properties.setProperty("metadata.broker.list",             "mini4:9092,mini5:9092,mini6:9092");     //消息传递到broker时的序列化方式      properties.setProperty("serializer.class",StringEncoder.class.getName());      //zk的地址      properties.setProperty("zookeeper.connect",              "mini4:2181,mini5:2181,mini6:2181");      //是否反馈消息 0是不反馈消息 1是反馈消息      properties.setProperty("request.required.acks","1");      ProducerConfig producerConfig = new ProducerConfig(properties);      Producer<String,String> producer = new Producer<String,String>(producerConfig);      try {         BufferedReader bf = new BufferedReader(                 new FileReader(                         new File(                                 "D:\\qf大数据\\spark\\day13_项目\\考试i\\JsonTest.json")));         String line = null;         while((line=bf.readLine())!=null){             KeyedMessage<String,String> keyedMessage = new KeyedMessage<String,String>("JsonData3",line);           Thread.sleep(5000);            producer.send(keyedMessage);         }         bf.close();         producer.close();         System.out.println("已经发送完毕");      } catch (Exception e) {         e.printStackTrace();      }   }}
相关文章
相关标签/搜索