Kafka集群配置---Windows版

Kafka是一种高吞吐量的分布式发布订阅的消息队列系统,Kafka对消息进行保存时是经过tipic进行分组的。今天咱们仅实现Kafka集群的配置。理论的抽空在聊

前言

  • 最近研究kafka,发现网上不少关于kafka的介绍都是基于Linux操做系统的。虽然这些服务最后都是配置Linux上的。可是咱们平时使用的大多都是Windows系统。因此研究非常吃力。通过借鉴不一样的网络文章终于在Windows上实现了kafka的配置。在这里如今的配置是基于Zookeeper 3.4.6 版本的。若是读者在操做但愿能和个人版本保持一致。

zookeeper

  • 了解kafka的都知道。kafka是经过Zookeeper实现分布操做的。不论是broker,consumer,仍是provide信息都是存储在Zookeeper中的。当broker挂掉都是Zookeeper来进行从新分配选择的。因此实现kafka集群前咱们得先实现Zookeeper的集群配置。java

  • 首先咱们从官网上下载Zookeeper到本地。我这里下载的是Zookeeper-3.4.6.tar.gz版本的。读者能够根据本身状况下载。下载好以后进行文件解压。而后找到conf文件中的zoo_sample.cfg文件。该文件是Zookeeper官网给咱们提供的一套样板。咱们赋值该文件到同级下并更名为zoo.cfg.以下图shell

这里写图片描述

修改配置文件

而后咱们来看看这个配置文件里面都有些啥apache

tickTime=2000
  • 服务器之间或客户端与服务端之间维持心跳的时间。就是每隔tickTime就会发送一次心跳。单位毫秒
initLimit=10
  • 这个是配置Zookeeper接收客户端初始化链接最长能忍受initLimit心跳时间间隔。
syncLimit=5
  • Leader与follow之间发送消息和应答的时间 总时间=syncLimit*tickTime
dataDir
  • zookeeper数据保持路径 默认将log日志也保存在dataDir
dataLogDir
  • zookeeper log日志保存地址 不设置默认是dataDir
clientPort=2181
  • 客户端链接的端口
server.1=192.168.1.130:28881:38881
server.2=192.168.1.130:28882:38882
server.3=192.168.1.130:28883:38883
  • 由于个人集群都是在同一台电脑上配置的,因此这里端口不能同样

知道配置文件里的意思应该就知道如何修改了吧windows

  • 对于新手咱们只须要该如下地方呢。
dataDir+dataLogDir+clientPort
  • 可是下面的server是Zookeeper配置里须要重点讲解的部分
    上面的格式咱们能够简单的总结为 server.num=B:C:D。
    num:是正整数表明的服务的惟一标识。这个要和后面说道的myid文件保持一致。

B: 标识Zookeeper集群中某一个服务的ip或者域名 192.168.1.130api

C:表示server.num这个服务于集群中leader进行信息交流的端口。在kafka中咱们leader和follower须要进行数据备份。具体服务就是经过这个地方制定的端口进行通讯的。ruby

D:表示万一leader宕机了,咱们就经过这个端口来进行再follower中选举新的leader。bash

大坑预防

  • 网上的不少教程也就介绍到这里。稍微好点就提了一下建立myid文件的事,我当时就纠结在这里。由于我根本不知道穿件的myid的类型。我就随便建立txt文件。结果是错的。这里咱们建立myid我有两种方式。还有myid里面的内容就是咱们对应的配置文件中server.num中的num。服务器

  • 第一种就是咱们经过cmd窗口到咱们要建立myid的文件夹下
    执行以下命令markdown

echo 1 > myid

这里写图片描述

  • 第二种是咱们先建立TXT文件将对应的内容写入。而后txt后缀删掉就能够了。网络

  • 顺便提一下myid应该放在咱们conf/zoo.cfg文件中指定的dataDir 的对应的文件路径下。

服务开启

  • 所谓的集群就是讲上面的Zookeeper复制成多个,将上面提到的几个重要的属性更改掉就好了。

  • 若是你到这一步说明你离成功已经不远了。下面咱们只须要开启服务就好了。开启服务在咱们解压的bin目录下。

这里写图片描述

  • 这里咱们得有些常识,已sh结尾的是Linux 系统的shell文件。在windows上没有装插件是没法使用的。咱们windows认识的就是bat文件。就是上面的cmd结尾才是咱们能够用的功能。可是咱们还须要进行一下修改。其实这里已经能够了。咱们到cmd窗口中经过该命令去执行咱们zoo.cfg文件。可是为了方便咱们这里讲zoo.cfg配置进咱们的zkServer.cmd文件中

这里写图片描述

  • 好了。配置完成。咱们只须要每次点击zkServer.cmd就开启了Zookeeper中的服务了。

友情提醒

  • 上面咱们配置的Zookeeper在开启第一个时候回报错。为何呢。缘由就是咱们开启了一个服务,。可是咱们的配置文件配置的是集群的信息。这个时候就回去寻找其余服务。可是这个时候其余的服务尚未开启呢。因此这个错误是正常。等咱们集群中的全部的服务都开启了就不会报错。这里你们不要被吓到。

  • 除此以外,还有一点就是Zookeeper的安装目录(解压目录)是绝对不能包含汉字的。我上面的截图有汉字那是我计算机上设置的。实际的路径是没有汉字的。不要被上面的图片诱导。

  • 当全部的服务都开启了,咱们如何查看咱们的服务是否开启成功呢。这很简单。咱们从新打开一个新的cmd窗口。直接执行jps就能够看到咱们的服务了。QuorumPeerMain就是咱们的服务主类

这里写图片描述

Kafka集群配置

  • 上面咱们就完成了Zookeeper的集群的配置。实际上Kafka中就自带有Zookeeper的服务。可是为了数据的高可用性。咱们最好选择本身搭建Zookeeper集群。这也是官网上的建议。
  • 这里个人Kafka版本选择的是0.8.1.1。建议单价不要选择过高的版本。刚出的版本可能有未知的bug。

  • 一样这里的集群就是讲Kafka复制多个。这里我选择其中一个进行讲解。其余的都是同样的主要就是讲端口改掉就好了。

  • 将官网下载的Kafka解压更名为kafka1(其余的更名数字递增就行。或者自定义别的名字)。找到config/server.properties文件。

server.properties修改

一样的先来了解里面的参数含义吧

broker.id=1
  • 在kafka这个集群中的惟一标识,且只能是正整数
port=9091
  • 该服务监听的端口
host.name=192.168.1.130
  • broker 绑定的主机名称(IP) 若是不设置将绑定全部的接口。
advertised.host.name=192.168.1.130
  • broker服务将通知消费者和生产者 换言之,就是消费者和生产者就是经过这个主机(IP)来进行通讯的。若是没有设置就默认采用host.name。
num.network.threads=2
  • broker处理消息的最大线程数,通常状况是CPU的核数
num.io.threads=8
  • broker处理IO的线程数 通常是num.network.threads的两倍
socket.send.buffer.bytes=1048576
  • socket发送的缓冲区。socket调优参数SO_SNDBUFF
socket.receive.buffer.bytes=1048576
  • socket接收的缓冲区 socket的调优参数SO_RCVBUF
socket.request.max.bytes=104857600
  • socket请求的最大数量,防止serverOOM。
log.dirs=\logs
  • kafka数据的存放地址,多个地址的话用逗号隔开。多个目录分布在不一样的磁盘上能够提升读写性能
num.partitions=2
  • 每一个tipic的默认分区个数,在建立topic时能够从新制定
log.retention.hours=168
  • 数据文件的保留时间 log.retention.minutes也是一个道理。
log.segment.bytes=536870912
  • topic中的最大文件的大小 -1表示没有文件大小限制 log.segment.bytes 和log.retention.minutes 任意一个
    达到要求 都会删除该文件 在建立topic时能够从新制定。若没有.则选取该默认值
log.retention.check.interval.ms=60000
  • 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable=false
  • 是否开启日志清理
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3
  • 上面咱们的Zookeeper集群
zookeeper.connection.timeout.ms=1000000
  • 进群连接时间超时

  • 一样的咱们每次赋值kafka服务咱们只需该配置文件里的下面两个属性就好了。
broker.id  +  port

服务启动前的命令准备

  • 一样的咱们观察bin目录中咱们会发现Kafka针对Linux和windows提供了不一样的组件。windows的组件放在了windows的文件夹下了。可是我在实际操做中没法使用里面的命令。报一些错误。这里个人解决办法是将windows里的bat所有复制到外面。就是复制到bin目录下。
    这里写图片描述

  • 上图中指出来的bat本来是在windows文件中。拷贝到bin目录以后咱们须要修改一下kafka-run-class.bat文件。由于里面写的相对路径和引入的jar会致使出错。因此咱们将里面的这段代码

set ivyPath=%USERPROFILE%\.ivy2\cache

set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.5.jar
     call :concat %snappy%

set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar
     call :concat %library%

set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar
     call :concat %compiler%

set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar
     call :concat %log4j%

set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar
     call :concat %slf%

set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar
     call :concat %zookeeper%

set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar
     call :concat %jopt%

for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do (
     call :concat %%i ) for %%i in (%BASE_DIR%\core\lib\*.jar) do (
     call :concat %%i ) for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
     call :concat %%i ) 
  • 替换成
for %%i in (%BASE_DIR%\libs\*.jar) do (
     call :concat %%i ) 
  • 咱们仔细观察原来的配置大概意思是引入一些jar包啥的。可是会出现有的时候咱们的文件根本没有那个jar。可是又引入了。会常常报错。因此咱们改为引入libs下的全部jar.有啥就引入啥。这样就不会报错的。

大坑预防

  • 到这里我本来天真的认为就已经完事了。可是谁知我按照网上的教程继续的时候就出现以下错误

这里写图片描述

  • 首先第一行提示 set JMX_PORT to default value 9999 这个错误是由于我没有设置这个值。这却是小事。可是后面报说找不到或没法加载主类kafka.Kafka这就让我费解。在这里我也是卡了一天了。后来在网上找到了一个方法。我不知道这是否是Kafka的bug。反正用这个方法我是解决了这个错误了。

    • 解决办法就是将kafka-run-class.bat文件中
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp %CLASSPATH% %*
  • 修改成
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp "%CLASSPATH%" %*
  • 对比咱们发现就是将classpath加上双引号。搞了半天就是系统变量路径没有找到的缘由。不过这个问题值得引发咱们的注意。咱们的kafka寄去你的搭建实在Java 的jdk基础是搭建的。因此前提咱们得将jdk等这些配置到环境变量中去。这里的配置网上搜去吧不少。

服务开启

  • 到这一步咱们离kafka的成功又不远了。咱们新开cmd窗口cd到kafka的bin目录中。

  • 可是在执行开启以前咱们须要先执行

Set JMX_PORT=19091(每一个服务数字不能同样)
  • 而后在执行
kafka-server-start.bat ..\config\server.properties

建立Topic批处理

  • 官网上是没有提供windows版本的topic处理程序的。咱们须要本身新建一个bat文件。这个bat文件的内容填写以下
kafka-run-class.bat  kafka.admin.TopicCommand  %*

这里写图片描述

消息处理

  • 有了这个批处理咱们就能够经过它实现topic的建立。生产者发送消息和消费者的接收消息

建立Topic

  • replication-factor:表示该topic须要在不一样的broker中保存
  • partitions : 对该top的分区数量
  • topic : 该top的名称。建议指定。不然采用默认
kafka-topics.bat --create --zookeeper 192.168.1.130:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic

查看Topic

kafka-topics.bat --describe --zookeeper 192.168.1.130:2181 --topic my-replicated-topic

生产topic消息

kafka-console-producer.bat --broker-list 192.168.1.130:9093 --topic my-replicated-topic

消费topic消息

kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --from-beginning --topic my-replicated-topic
  • 最后在接收发送消息是咱们须要从新建立新的cmd窗口。下面看看效果图。最终实现实时接收消息

这里写图片描述


资源因为大小限制暂时没法上传!后续再上传

相关文章
相关标签/搜索