I've been studying big data for a while now, covering Spark, Spark Streaming, Scala, Kafka, and more. It is a lot of ground, but having worked through it, I find big data genuinely fun and widely applicable both now and going forward. Below is a walkthrough of Kafka's publish-subscribe messaging.
(I) Basic environment preparation
I installed CentOS 7 in a virtual machine (details skipped here). The required environment: JDK, Zookeeper, and Kafka.
(II) Environment setup (if you are not logged in as root, prefix the following commands with sudo)
1. JDK installation and environment configuration (preferably JDK 8 or above; skipped here)
2. Zookeeper installation and environment configuration
(1) Extract and move to another directory
# Extract Zookeeper and rename it
sudo tar -zxvf zookeeper-3.3.6.tar.gz
sudo mv zookeeper-3.3.6 zookeeper
# Move zookeeper to /usr/local/ (or wherever you prefer)
sudo mv zookeeper /usr/local
(2) Edit the Zookeeper configuration file
# Copy zoo_sample.cfg to a new file named zoo.cfg
sudo cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
# Edit the zoo.cfg file
sudo vim /usr/local/zookeeper/conf/zoo.cfg
# Mainly change these two entries: dataDir and server.1=127.0.0.1:2888:3888
# the directory where the snapshot is stored.
dataDir=/usr/local/zookeeper/data
# the port at which the clients will connect
clientPort=2181
server.1=127.0.0.1:2888:3888
The parameters in the above configuration are explained here: https://www.linuxidc.com/Linux/2017-06/144950.htm
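One detail the sample config does not create for you: the dataDir set above does not exist yet. ZooKeeper needs it for its snapshots, and if you ever grow this single node into a multi-node ensemble, each node additionally needs a myid file in dataDir whose content matches the N in its server.N line. A minimal sketch, assuming the paths used above:

# Create the snapshot directory referenced by dataDir
sudo mkdir -p /usr/local/zookeeper/data
# Only required for a multi-node ensemble: record this server's id,
# which must match the N in its server.N line
echo 1 | sudo tee /usr/local/zookeeper/data/myid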
(3) Configure the Zookeeper environment variables
sudo vim /etc/profile
# Modify as follows
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
SCALA_HOME=/usr/local/scala
ZOOKEEPER_HOME=/usr/local/zookeeper
KAFKA_HOME=/usr/local/kafka
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME SCALA_HOME ZOOKEEPER_HOME KAFKA_HOME PATH CLASSPATH
Note: after saving, you must run source /etc/profile, otherwise the changes will not take effect.
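A quick sanity check that the variables are actually live in the current shell, using only standard shell builtins:

# Reload the profile, then confirm the variables resolve
source /etc/profile
echo $ZOOKEEPER_HOME $KAFKA_HOME
# Should print /usr/local/zookeeper/bin/zkServer.sh once PATH is updated
which zkServer.sh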
(4) Start Zookeeper
# cd into the zookeeper/bin directory
./zkServer.sh start
On success you will see:
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
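"STARTED" only means the launch script returned; to confirm the server is actually serving requests, you can ask for its status, or probe the client port with ZooKeeper's built-in ruok four-letter command (assuming nc is installed):

# Ask the local server whether it is running and in which mode
./zkServer.sh status
# Or probe the client port directly; a healthy server replies "imok"
echo ruok | nc 127.0.0.1 2181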
Shutting down Zookeeper works the same way, with one small change:
# cd into the zookeeper/bin directory
./zkServer.sh stop
3. Kafka installation and environment configuration
(1) Extract and move to another directory
# Extract and rename to kafka
sudo tar -zxvf kafka_2.12-1.0.0.tgz
sudo mv kafka_2.12-1.0.0 kafka
# Move to /usr/local/
sudo mv kafka /usr/local
(2) Edit the Kafka configuration file
# Create the log storage directory
cd /usr/local/kafka
mkdir logs
# Edit the configuration file /usr/local/kafka/config/server.properties
sudo vim /usr/local/kafka/config/server.properties
# The main items to change are the following:
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka/logs/
zookeeper.connect=127.0.0.1:2181
The parameters in the above configuration are explained here: https://www.cnblogs.com/wangb0402/p/6187503.html
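Optionally, before starting the broker, you can check the two ports this stack binds, 2181 for Zookeeper and 9092 for Kafka (a quick probe with ss, which ships with CentOS 7):

# 2181 should show the Zookeeper started earlier; nothing should hold 9092 yet
ss -tln | grep -E ':(2181|9092)'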
(3) Configure the Kafka environment variables: see the Zookeeper configuration above
(4) Start Kafka
# cd into the kafka/bin directory
./kafka-server-start.sh /usr/local/kafka/config/server.properties
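Started this way, the broker occupies the terminal and dies when you close it. The launcher script also accepts a -daemon flag to run it in the background, with a companion stop script:

# Run the broker as a background daemon instead
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# Shut it down later with
./kafka-server-stop.sh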
(III) Kafka message publish-subscribe: open four terminals (one each for Zookeeper, Kafka, the producer, and the consumer)
(1) Create a Topic
# In the kafka/bin directory
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
The output looks like this:
[hadoop@bogon bin]$ ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
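To confirm the topic was registered, you can list every topic known to Zookeeper:

# List all topics registered in Zookeeper
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181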
If a Topic named test already exists, you can remove it with delete:
# Delete the unwanted topic (delete.topic.enable=true was already set in the config)
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
(2) The Producer sends messages to the Topic
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
The output looks like this:
[hadoop@bogon bin]$ ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>producer send message
>hello kafka
>hello world
>spark
>heihei
>send everything for people
>
(3) The Consumer reads messages from the Topic
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
The output is shown below. Note that the deprecation warning at the start does not affect anything; the messages appear after a short wait:
[hadoop@bogon bin]$ ./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
producer send message
hello kafka
hello world
spark
heihei
send everything for people
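As the warning says, the Zookeeper-based console consumer is deprecated in this Kafka version; the equivalent read through the new consumer connects to the broker instead:

# New-consumer equivalent: point at the broker rather than Zookeeper
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning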