Kafka Java API+自定义分区

kafka的API

第一步:导入kafka的开发jar包apache

 


   

   

 


 

 

Kafka生产者bootstrap


@Testdom

   public void kafkaProducer() throws Exception {ide

      //一、准备配置文件oop

       Properties props = new Properties();server

       props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");hadoop

       props.put("acks", "all");开发

       props.put("retries", 0);kafka

       props.put("batch.size", 16384);it

       props.put("linger.ms", 1);

       props.put("buffer.memory", 33554432);

       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       //二、建立KafkaProducer

       KafkaProducer

       for (int i=0;i<100;i++){

           //三、发送数据

           kafkaProducer.send(new ProducerRecord

       }

 

      kafkaProducer.close();

   }


 

 

Kafka消费者


@Test

   public void kafkaConsum() throws Exception {

        // 一、准备配置文件

       Properties props = new Properties();

       props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

       props.put("group.id", "test");

       props.put("enable.auto.commit", "true");

       props.put("auto.commit.interval.ms", "1000");

       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 

       // 二、建立KafkaConsumer

       KafkaConsumer

       // 三、订阅数据,这里的topic能够是多个

       kafkaConsumer.subscribe(Arrays.asList("yun01"));

       // 四、获取数据

       while (true) {

           ConsumerRecords

           for (ConsumerRecord

               System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());

           }

 

       }

   }

 


 

 

 

kafka的自定义分区

第一种方式:直接指定分区


kafkaProducer.send(new ProducerRecord

 

第二种自定义分区

public class KafkaCustomPartitioner implements Partitioner {

   @Override

   public void configure(Map

   }

 

   @Override

   public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {

      List

       int partitionNum = partitions.size();

      Random random = new Random();

      int partition = random.nextInt(partitionNum);

       return partition;

   }

 

   @Override

   public void close() {

     

   }

 

}


 

 

 

主代码中添加配置

 


@Test

   public void kafkaProducer() throws Exception {

      //一、准备配置文件

       Properties props = new Properties();

       props.put("bootstrap.servers", "hadoop-001:9092,hadoop-002:9092,hadoop-003:9092");

       props.put("acks", "all");

       props.put("retries", 0);

       props.put("batch.size", 16384);

       props.put("linger.ms", 1);

       props.put("buffer.memory", 33554432);

       props.put("partitioner.class", "com.gec.kafkaclient.MyCustomerPartitons");

       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

       //二、建立KafkaProducer

       KafkaProducer

       for (int i=0;i<100;i++){

           //三、发送数据

           kafkaProducer.send(new ProducerRecord

       }

 

      kafkaProducer.close();

   }

相关文章
相关标签/搜索