Kafka是一种高吞吐量的分布式发布订阅的消息队列系统,Kafka对消息进行保存时是经过tipic进行分组的。今天咱们仅实现Kafka集群的配置。理论的抽空在聊
了解kafka的都知道。kafka是经过Zookeeper实现分布操做的。不论是broker,consumer,仍是provide信息都是存储在Zookeeper中的。当broker挂掉都是Zookeeper来进行从新分配选择的。因此实现kafka集群前咱们得先实现Zookeeper的集群配置。java
首先咱们从官网上下载Zookeeper到本地。我这里下载的是Zookeeper-3.4.6.tar.gz版本的。读者能够根据本身状况下载。下载好以后进行文件解压。而后找到conf文件中的zoo_sample.cfg文件。该文件是Zookeeper官网给咱们提供的一套样板。咱们赋值该文件到同级下并更名为zoo.cfg.以下图shell
而后咱们来看看这个配置文件里面都有些啥apache
tickTime=2000
initLimit=10
syncLimit=5
dataDir
dataLogDir
clientPort=2181
server.1=192.168.1.130:28881:38881
server.2=192.168.1.130:28882:38882
server.3=192.168.1.130:28883:38883
知道配置文件里的意思应该就知道如何修改了吧windows
dataDir+dataLogDir+clientPort
B: 标识Zookeeper集群中某一个服务的ip或者域名 192.168.1.130api
C:表示server.num这个服务于集群中leader进行信息交流的端口。在kafka中咱们leader和follower须要进行数据备份。具体服务就是经过这个地方制定的端口进行通讯的。ruby
D:表示万一leader宕机了,咱们就经过这个端口来进行再follower中选举新的leader。bash
网上的不少教程也就介绍到这里。稍微好点就提了一下建立myid文件的事,我当时就纠结在这里。由于我根本不知道穿件的myid的类型。我就随便建立txt文件。结果是错的。这里咱们建立myid我有两种方式。还有myid里面的内容就是咱们对应的配置文件中server.num中的num。服务器
第一种就是咱们经过cmd窗口到咱们要建立myid的文件夹下
执行以下命令markdown
echo 1 > myid
第二种是咱们先建立TXT文件将对应的内容写入。而后txt后缀删掉就能够了。网络
顺便提一下myid应该放在咱们conf/zoo.cfg文件中指定的dataDir 的对应的文件路径下。
所谓的集群就是讲上面的Zookeeper复制成多个,将上面提到的几个重要的属性更改掉就好了。
若是你到这一步说明你离成功已经不远了。下面咱们只须要开启服务就好了。开启服务在咱们解压的bin目录下。
上面咱们配置的Zookeeper在开启第一个时候回报错。为何呢。缘由就是咱们开启了一个服务,。可是咱们的配置文件配置的是集群的信息。这个时候就回去寻找其余服务。可是这个时候其余的服务尚未开启呢。因此这个错误是正常。等咱们集群中的全部的服务都开启了就不会报错。这里你们不要被吓到。
除此以外,还有一点就是Zookeeper的安装目录(解压目录)是绝对不能包含汉字的。我上面的截图有汉字那是我计算机上设置的。实际的路径是没有汉字的。不要被上面的图片诱导。
当全部的服务都开启了,咱们如何查看咱们的服务是否开启成功呢。这很简单。咱们从新打开一个新的cmd窗口。直接执行jps就能够看到咱们的服务了。QuorumPeerMain就是咱们的服务主类
这里个人Kafka版本选择的是0.8.1.1。建议单价不要选择过高的版本。刚出的版本可能有未知的bug。
一样这里的集群就是讲Kafka复制多个。这里我选择其中一个进行讲解。其余的都是同样的主要就是讲端口改掉就好了。
将官网下载的Kafka解压更名为kafka1(其余的更名数字递增就行。或者自定义别的名字)。找到config/server.properties文件。
一样的先来了解里面的参数含义吧
broker.id=1
port=9091
host.name=192.168.1.130
advertised.host.name=192.168.1.130
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=\logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3
zookeeper.connection.timeout.ms=1000000
broker.id + port
一样的咱们观察bin目录中咱们会发现Kafka针对Linux和windows提供了不一样的组件。windows的组件放在了windows的文件夹下了。可是我在实际操做中没法使用里面的命令。报一些错误。这里个人解决办法是将windows里的bat所有复制到外面。就是复制到bin目录下。
上图中指出来的bat本来是在windows文件中。拷贝到bin目录以后咱们须要修改一下kafka-run-class.bat文件。由于里面写的相对路径和引入的jar会致使出错。因此咱们将里面的这段代码
set ivyPath=%USERPROFILE%\.ivy2\cache
set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.5.jar
call :concat %snappy%
set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar
call :concat %library%
set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar
call :concat %compiler%
set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar
call :concat %log4j%
set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar
call :concat %slf%
set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar
call :concat %zookeeper%
set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar
call :concat %jopt%
for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do (
call :concat %%i ) for %%i in (%BASE_DIR%\core\lib\*.jar) do (
call :concat %%i ) for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
call :concat %%i )
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i )
首先第一行提示 set JMX_PORT to default value 9999 这个错误是由于我没有设置这个值。这却是小事。可是后面报说找不到或没法加载主类kafka.Kafka这就让我费解。在这里我也是卡了一天了。后来在网上找到了一个方法。我不知道这是否是Kafka的bug。反正用这个方法我是解决了这个错误了。
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp %CLASSPATH% %*
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp "%CLASSPATH%" %*
到这一步咱们离kafka的成功又不远了。咱们新开cmd窗口cd到kafka的bin目录中。
可是在执行开启以前咱们须要先执行
Set JMX_PORT=19091(每一个服务数字不能同样)
kafka-server-start.bat ..\config\server.properties
kafka-run-class.bat kafka.admin.TopicCommand %*
建立Topic
kafka-topics.bat --create --zookeeper 192.168.1.130:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic
查看Topic
kafka-topics.bat --describe --zookeeper 192.168.1.130:2181 --topic my-replicated-topic
生产topic消息
kafka-console-producer.bat --broker-list 192.168.1.130:9093 --topic my-replicated-topic
消费topic消息
kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --from-beginning --topic my-replicated-topic
资源因为大小限制暂时没法上传!后续再上传