以前写过一篇关于Kafka消息的发布-订阅,只不过是基于一台服务器,不够全面,下面我要说下Kafka集群环境的搭建和消息的发布-订阅,但愿你们喜欢。下面的集群搭建是基于单机部署的环境,因此你们先参照单机部署完成基本的环境配置吧:CentOs7 Kafka单机消息的发布-订阅java
1、环境:虚拟机CentOs7系统,完整的环境,请确认已安装JDK,Zookeeper(这里可参考以前的Zookeeper集群搭建:http://www.javashuo.com/article/p-climrngx-cm.html),可经过克隆已配置好的虚拟机环境,这次仍然沿用以前使用过的服务器,因此,使用的节点为3个。node
2、环境配置(Kafka的解压不说了)bootstrap
(1)进入config目录,修改server.properties文件内容以下:服务器
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 ############################# Socket Server Settings ############################# # The port the socket server listens on port=9092 # Hostname the broker will bind to. If not set, the server will bind to all interfaces host.name=slave01 //中间省略,默认配置便可 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=slave01:2181,slave02:2181,slave03:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000
网上的版本不尽相同,但异曲同工,主要修改的点只有几个:broker.id,zookeeper.connect,注意:broker.id和Zookeeper集群配置时myid文件中的值保持一致。同时,还要修改一处地方(这里可能被大多数人忽略了,致使以后启动报错),手动修改meta.properties文件,在配置的log.dir目录下,若不修改此处会报下面的错误:app
FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 1 in meta.properties at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630) at kafka.server.KafkaServer.startup(KafkaServer.scala:175) at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:99) at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:45)
总结一下,修改broker.id须要注意 :socket
server.prorperties文件;ide
meta.properties文件;oop
(2)拷贝修改配置测试
能够经过scp拷贝的方式将kafka安装目录放置到另外两个节点上,注意须要修改对应的值:broker.id和host.name,其余一致。url
(3)启动Kafka集群
启动以前请确保关闭防火墙(详见以前的集群配置)。
因为咱们使用的是外部的Zookeeper,而不是自身携带的Zookeeper,因此,先启动Zookeeper集群,成功后在三个节点上,均执行下面的命令:
./bin/kafka-server-start.sh config/server.properties
如无心外,都可启动成功。
(4)建立topic
在slave01节点上,执行如下命令建立一个topic:
bin/kafka-topics.sh --create --topic kafkatopictest --replication-factor 3 --partitions 2 --zookeeper slave01:2181
建立成功显示:
Created topic "kafkatopictest"
(5)发送消息至kafka
在slave02节点上执行如下命令向kafka发送消息:
[hadoop@slave02 bin]$ ./kafka-console-producer.sh --broker-list slave02:9092 --sync --topic kafkatopictest >hello world, 下班了,everyone!
(6)接收kafka发送来的消息
在slave03节点上执行如下命令接收kafka发送消息:
[hadoop@slave03 bin]$ ./kafka-console-consumer.sh --zookeeper slave01:2181 --topic kafkatopictest --from-beginning Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. hello world下班了,everyone!
(7)查看topic列表
如在slave01节点上想要查看已经建立的topic,可经过如下命令:
[hadoop@slave01 bin]$ ./kafka-topics.sh --zookeeper slave01:2181 --list kafkatopictest test
好了,到此Kafka集群搭建和测试完成了。