开发简单的Kafka应用

  以前基于集群和单机安装过kafka,如今利用kafka提供的API构建一个简单的生产者消费者的项目示例,来跑通kafka的流程,具体过程以下:java

  首先使用eclipse for javaee创建一个maven项目,而后在pom.xml添加以下依赖配置:apache

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.2</version>
    </dependency>

  这里kafka版本是kafka_2.9.2-0.8.2.2,保存以后maven会自动下载依赖,注意要关闭windows防火墙,尽可能专用网络和外网都要关闭,不然下载的很慢,下载好以后就能够编写项目代码了,这里的pom.xml全部配置以下:vim

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4 
 5   <groupId>kafkatest</groupId>
 6   <artifactId>kafkatest</artifactId>
 7   <version>0.0.1-SNAPSHOT</version>
 8   <packaging>jar</packaging>
 9 
10   <name>kafkatest</name>
11   <url>http://maven.apache.org</url>
12 
13   <properties>
14     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15   </properties>
16 
17   <dependencies>
18     <dependency>
19       <groupId>junit</groupId>
20       <artifactId>junit</artifactId>
21       <version>3.8.1</version>
22       <scope>test</scope>
23     </dependency>
24     <dependency>
25         <groupId>org.apache.kafka</groupId>
26         <artifactId>kafka_2.9.2</artifactId>
27         <version>0.8.2.2</version>
28     </dependency>
29   </dependencies>
30 </project>

  而后,咱们创建一个简单生产者类SimpleProducer,代码以下:windows

 1 package test;
 2 
 3 import java.util.Properties;
 4 
 5 import kafka.javaapi.producer.Producer;
 6 import kafka.producer.KeyedMessage;
 7 import kafka.producer.ProducerConfig;
 8 
 9 public class SimpleProducer {
10         private static Producer<Integer,String> producer;
11         private final Properties props=new Properties();
12         public SimpleProducer(){
13                 //定义链接的broker list
14                 props.put("metadata.broker.list", "192.168.1.216:9092");
15                 //定义序列化类 Java中对象传输以前要序列化
16                 props.put("serializer.class", "kafka.serializer.StringEncoder");
17                 producer = new Producer<Integer, String>(new ProducerConfig(props));
18         }
19         public static void main(String[] args) {
20                 SimpleProducer sp=new SimpleProducer();
21                 //定义topic
22                 String topic="mytopic";
23                 
24                 //定义要发送给topic的消息
25                 String messageStr = "This is a message";
26                 
27                 //构建消息对象
28                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
29          
30                 //推送消息到broker
31                 producer.send(data);
32                 producer.close();
33         }
34 }

  类的代码很简单,我这里是kafka单机环境端口就是kafka broker端口9092,这里定义topic为mytopic固然能够本身随便定义不用考虑服务器是否建立,对于发送消息的话上面代码是简单的单条发送,若是发送数据量很大的话send方法屡次推送会耗费时间,因此建议把data数据按必定量分组放到List中,最后send一下AarrayList便可,这样速度会大幅度提升api

  接下来写一个简单的消费者类SimpleHLConsumer,代码以下:服务器

 1 package test;
 2 
 3 import java.util.HashMap;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.Properties;
 7 
 8 import kafka.consumer.Consumer;
 9 import kafka.consumer.ConsumerConfig;
10 import kafka.consumer.ConsumerIterator;
11 import kafka.consumer.KafkaStream;
12 import kafka.javaapi.consumer.ConsumerConnector;
13 
14 public class SimpleHLConsumer {
15         private final ConsumerConnector consumer;
16         private final String topic;
17 
18         public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
19                 Properties props = new Properties();
20                 //定义链接zookeeper信息
21                 props.put("zookeeper.connect", zookeeper);
22                 //定义Consumer全部的groupID
23                 props.put("group.id", groupId);
24                 props.put("zookeeper.session.timeout.ms", "500");
25                 props.put("zookeeper.sync.time.ms", "250");
26                 props.put("auto.commit.interval.ms", "1000");
27                 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
28                 this.topic = topic;
29         }
30 
31         public void testConsumer() {
32                 Map<String, Integer> topicCount = new HashMap<String, Integer>();
33                 //定义订阅topic数量
34                 topicCount.put(topic, new Integer(1));
35                 //返回的是全部topic的Map
36                 Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
37                 //取出咱们要须要的topic中的消息流
38                 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
39                 for (final KafkaStream stream : streams) {
40                         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
41                         while (consumerIte.hasNext())
42                                 System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
43                 }
44                 if (consumer != null)
45                         consumer.shutdown();
46         }
47 
48         public static void main(String[] args) {
49                 String topic = "mytopic";
50                 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.1.216:2181/kafka", "testgroup", topic);
51                 simpleHLConsumer.testConsumer();
52         }
53 
54 }

  消费者代码主要逻辑就是对生产者发送过来的数据作简单处理和输出,注意这里的地址是zookeeper的地址而且包括节点/kafka,topic名称要一致网络

  上面两个类已经能够实现消息的生产和消费了,可是如今服务器须要作必定的配置才能够,不然会抛出异常,就是在以前配置的server.properties基础之上进行修改,进入kafka安装目录下,使用命令 vim config/server.properties 打开配置文件,找到host.name这个配置,首先去掉前面的#注释,而后把默认的localhost改为IP地址192.168.1.216,由于eclipse远程运行代码时读取到localhost再执行时就是提交到本地了,因此会抛出异常,固然把代码打成jar包在服务器运行就不会出现这样的问题了,这里要注意:session

  

  修改以后保存并退出,而后确保zookeeper的正常运行eclipse

  若是以前kafka正在运行,那么就执行 bin/kafka-server-stop.sh  中止kafka服务,而后再执行maven

   nohup bin/kafka-server-start.sh config/server.properties >> /dev/null & 启动服务,若是原来就是中止的,那么直接启动便可

  启动以后先运行启动消费者,消费者处于运行等待

  

  而后启动生产者发送消息,生产者发送完成当即关闭,消费者消费输出以下:

  

  到这里,就完成了kafka从生产到消费简单示例的开发,消息队列能够跑通了

相关文章
相关标签/搜索