前言:html
上周末本身学习了一下Kafka,参考网上的文章,学习过程当中仍是比较顺利的,遇到的一些问题最终也都解决了,如今将学习的过程记录与此,供之后本身查阅,若是能帮助到其余人,天然是更好的。java
===============================================================长长的分割线====================================================================apache
正文:服务器
关于Kafka的理论介绍,网上能够搜到到不少的资料,你们能够自行搜索,我这里就不在重复赘述。app
本文中主要涉及三块内容: 第一,就是搭建Zookeeper环境;第二,搭建Kafka环境,并学习使用基本命令发送接收消息;第三,使用Java API完成操做,以便初步了解在实际项目中的使用方式。less
闲话少说,言归正传,本次的目的是利用VMware搭建一个属于本身的ZooKeeper和Kafka集群。本次咱们选择的是VMware10,具体的安装步骤你们能够到网上搜索,资源不少。maven
第一步,肯定目标:分布式
ZooKeeperOne 192.168.224.170 CentOS学习
ZooKeeperTwo 192.168.224.171 CentOSui
ZooKeeperThree 192.168.224.172 CentOS
KafkaOne 192.168.224.180 CentOS
KafkaTwo 192.168.224.181 CentOS
咱们安装的ZooKeeper是3.4.6版本,能够从这里下载zookeeper-3.4.6; Kafka安装的是0.8.1版本,能够从这里下载kafka_2.10-0.8.1.tgz; JDK安装的版本是1.7版本。
另: 我在学习的时候,搭建了两台Kafka服务器,正式环境中咱们最好是搭建2n+1台,此处仅做为学些之用,暂不计较。
第二步,搭建Zookeeper集群:
此处你们能够参照我以前写的一篇文章 ZooKeeper1 利用虚拟机搭建本身的ZooKeeper集群 ,我在搭建Kafka的环境的时候就是使用的以前搭建好的Zookeeper集群。
第三步,搭建Kafka集群:
(1). 将第一步中下载的 kafka_2.10-0.8.1.tgz 解压缩后,进入config目录,会看到以下图所示的一些配置文件,咱们准备编辑server.properties文件。
(2). 打开 server.properties 文件,须要编辑的属性以下所示:
1 broker.id=0 2 port=9092 3 host.name=192.168.224.180 4 5 log.dirs=/opt/kafka0.8.1/kafka-logs 6 7 zookeeper.connect=192.168.224.170:2181,192.168.224.171:2181,192.168.224.172:2181
注意:
a. broker.id: 每一个kafka对应一个惟一的id,自行分配便可
b. port: 默认的端口号是9092,使用默认端口便可
c. host.name: 配置的是当前机器的ip地址
d. log.dirs: 日志目录,此处自定义一个目录路径便可
e. zookeeper.connect: 将咱们在第二步搭建的Zookeeper集群的配置所有写上
(3). 上边的配置完毕后,咱们须要执行命令 vi /etc/hosts,将相关服务器的host配置以下图,若是没有执行此步,后边咱们在执行一些命令的时候,会报没法识别主机的错误。
(4). 通过上述操做,咱们已经完成了对Kafka的配置,很简单吧?!可是若是咱们执行 bin/kafka-server-start.sh config/server.properties & 这个启动命令,可能咱们会遇到以下两个问题:
a. 咱们在启动的报 Unrecognized VM option '+UseCompressedOops'.Could not create the Java virtual machine. 这个错误。
解决方式:
查看 bin/kafka-run-class.sh
找到下面这段代码,去掉-XX:+UseCompressedOops
1 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then 2 KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" 3 fi
b. 解决了第一个问题,咱们还有可能在启动的时候遇到 java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder 这个错误。
解决方式:
从网上的下载 slf4j-nop-1.6.0.jar 这个jar包,而后放到kafka安装目录下的libs目录中便可。注意,基于我目前的kafka版本,我最开始从网上下载的slf4j-nop-1.5.0.jar 这个jar包,可是启动的时候依然会报错,因此必定要注意版本号哦~
(5). 如今咱们执行 bin/kafka-server-start.sh config/server.properties & 这个启动命令,应该就能够正常的启动Kafka了。命令最后的 & 符号是为了让启动程序在后台执行。若是不加这个 & 符号,当执行完启动后,咱们一般会使用 ctrl + c 退出当前控制台,kafka此时会自动执行shutdown,因此此处最好加上 & 符号。
第三步,使用基本命令建立消息主题,发送和接收主题消息:
(1). 建立、查看消息主题
1 #链接zookeeper, 建立一个名为myfirsttopic的topic 2 bin/kafka-topics.sh --create --zookeeper 192.168.224.170:2181 --replication-factor 2 --partitions 1 --topic myfirsttopic 3 4 # 查看此topic的属性 5 bin/kafka-topics.sh --describe --zookeeper 192.168.224.170:2181 --topic myfirsttopic 6 7 # 查看已经建立的topic列表 8 bin/kafka-topics.sh --list --zookeeper 192.168.224.170:2181
上述命令执行完毕后,截图以下:
(2). 建立一个消息的生产者:
1 #启动生产者,发送消息 2 bin/kafka-console-producer.sh --broker-list 192.168.224.180:9092 --topic myfirsttopic 3 4 #启动消费者,接收消息 5 bin/kafka-console-consumer.sh --zookeeper 192.168.224.170:2181 --from-beginning --topic myfirsttopic
上述命令执行完毕后,截图以下:
(3). 按照(1)、(2)这两步,你应该能够利用Kafka感觉到了分布式消息系统。这里须要着重的再说一下我在这个过程当中发现的一个问题: 你们能够看下上图中的consumer的命令,我选择了zookeeper的其中一台192.168.224.170:2181接收消息是能够正常接收的!不要忘了,我是三台zookeeper的,因此我又尝试了向192.168.224.171:2181和192.168.224.172:2181接收myfirsttopic这个主题的消息。正常状况下,三台访问的结果应该都是能够正常的接收消息,可是当时个人状况在访问了192.168.224.171:2181这台时会报 org.apache.zookeeper.clientcnxn 这个错误!!!
我当时多试了两遍,发现个人三台zookeeper中,谁是leader(zkServer.sh status命令),concumer链接的时候就会报上面的那个异常。后来定位到了zookeeper的zoo.cfg配置文件中的maxClientCnxns属性,即客户端最大链接数,我当时使用的是默认配置是2。后来我把这个属性的值调大一些,consumer链接zookeeper leader时,就不会报这个错误了。若是你选择将这个属性注释掉(从网上查询到注释掉该属性默认值是10),也不会报这个错误了。其实网上的不少文章也只是说了此属性能够尽可能设置的大一些,没有解释其余的。
但我后来仍是仔细想了想,当我把maxClientCnxns这个属性设置为2时,若是两台kafka启动时,每一个kafka和zookeeper的节点之间创建了一个客户端链接,那么此时zookeeper的每一个节点的客户端链接数就已经达到了最大链接数2,那么我建立consumer的时候,应该是三台zookeeper链接都有问题,而不是只有leader会有问题。因此,此处须要各位有看法的再帮忙解释一下!!!
第四步,使用Java API 操做Kafka:
其实Java API提供的功能基本也是基于上边的客户端命令来实现的,万变不离其宗,我将我整理的网上的例子贴到下面,你们能够在本地Java工程中执行一下,便可了解调用方法。
(1). 个人maven工程中pom.xml的配置
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 2 <modelVersion>4.0.0</modelVersion> 3 <groupId>com.ismurf.study</groupId> 4 <artifactId>com.ismurf.study.kafka</artifactId> 5 <version>0.0.1-SNAPSHOT</version> 6 <name>Kafka_Project_0001</name> 7 <packaging>war</packaging> 8 9 <dependencies> 10 <dependency> 11 <groupId>org.apache.kafka</groupId> 12 <artifactId>kafka_2.10</artifactId> 13 <version>0.8.1.1</version> 14 </dependency> 15 </dependencies> 16 17 <build> 18 <plugins> 19 <plugin> 20 <groupId>org.apache.maven.plugins</groupId> 21 <artifactId>maven-war-plugin</artifactId> 22 <version>2.1.1</version> 23 <configuration> 24 <outputFileNameMapping>@{artifactId}@.@{extension}@</outputFileNameMapping> 25 </configuration> 26 </plugin> 27 28 <!-- Ensures we are compiling at 1.6 level --> 29 <plugin> 30 <groupId>org.apache.maven.plugins</groupId> 31 <artifactId>maven-compiler-plugin</artifactId> 32 <configuration> 33 <source>1.6</source> 34 <target>1.6</target> 35 </configuration> 36 </plugin> 37 38 <plugin> 39 <groupId>org.apache.maven.plugins</groupId> 40 <artifactId>maven-surefire-plugin</artifactId> 41 <configuration> 42 <skipTests>true</skipTests> 43 </configuration> 44 </plugin> 45 </plugins> 46 </build> 47 48 </project>
(2). 实例代码: 你们能够参考这片文章的 http://blog.csdn.net/honglei915/article/details/37563647 中的代码,粘贴到工程后便可使用,上述文章中的代码整理后目录截图以下: