不少人都认为Kafka是一个消息队列,实际上并不彻底对,在官网的标题中使用了一句话来描述Kafka:html
A distributed streaming platform(一个分布式的流平台)java
因此在本质上来说,Kafka并非一个消息队列,它其实是用来作流处理的一个平台。那么什么是分布式的流平台呢?这就须要简单说明下Kafka诞生的背景:apache
Kafka早期设计的目的是用做于 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的一个分布式消息发布和订阅系统,起初基于Scala编写(如今是Scala + Java),以后成为 Apache 基金会的一个顶级项目。bootstrap
活动流数据是全部的网站对用户的使用状况作分析的时候要用到的最常规的部分,活动数据包括页面的访问量(PV)、被查看内容方面的信息以及搜索内容。这种数据一般的处理方式是先把各类活动以日志的形式写入某种文件,而后周期性的对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等)。vim
实际上在这种大数据的流处理里面,Kafka常常会对接Spark、Flink等实时流计算引擎,这些场景要求流处理平台具备高性能、高吞吐量、低延迟等特色,而 Kafka 具备很好的吞吐量、内置分区、冗余及容错性的优势(Kafka 每秒能够处理几十万消息),所以让 Kafka 成为了一个很好的大规模消息处理应用的解决方案。bash
因为 Kafka 提供了相似 JMS 的特性,因此不少人对它的第一印象就是消息队列。可是在设计和实现上是彻底不一样的,并且它也不是 JMS 规范的实现,所以咱们须要纠正对Kafka的错误认知。服务器
接下来演示一下Kafka的安装与配置。Kafka是基于Zookeeper来实现分布式协调的,因此在安装Kafka以前须要先安装Zookeeper。Zookeeper和Kafka都依赖于JDK,我这里已经事先安装好了JDK:tcp
[root@txy-server2 ~]# java --version java 11.0.5 2019-10-15 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.5+10-LTS) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode) [root@txy-server2 ~]#
准备好JDK后,到Zookeeper的官网下载地址,复制下载连接:分布式
而后到Linux中使用wget
命令进行下载,以下:ide
[root@txy-server2 ~]# cd /usr/local/src [root@txy-server2 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
解压下载好的压缩包,并将解压后的目录移动和重命名:
[root@txy-server2 /usr/local/src]# tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz [root@txy-server2 /usr/local/src]# mv apache-zookeeper-3.6.1-bin ../zookeeper
进入到Zookeeper的配置文件目录,将zoo_sample.cfg
这个示例配置文件拷贝一份并命名为zoo.cfg
,这是Zookeeper默认的配置文件名称:
[root@txy-server2 /usr/local/src]# cd ../zookeeper/conf/ [root@txy-server2 /usr/local/zookeeper/conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [root@txy-server2 /usr/local/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
修改一下配置文件中的dataDir
配置项,指定一个磁盘空间较大的目录:
[root@txy-server2 /usr/local/zookeeper/conf]# vim zoo.cfg # 指定Zookeeper的数据存储目录,类比于MySQL的dataDir dataDir=/data/zookeeper [root@txy-server2 /usr/local/zookeeper/conf]# mkdir -p /data/zookeeper
接下来就能够进入bin
目录,使用启动脚原本启动Zookeeper了,以下示例:
[root@txy-server2 /usr/local/zookeeper/conf]# cd ../bin/ [root@txy-server2 /usr/local/zookeeper/bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@txy-server2 /usr/local/zookeeper/bin]#
启动完成后,能够经过查看是否正常监听端口号来判断是否启动成功,以下则是启动成功了:
[root@txy-server2 ~]# netstat -lntp |grep 2181 tcp6 0 0 :::2181 :::* LISTEN 7825/java [root@txy-server2 ~]#
安装完Zookeeper后,接下来就能够安装Kafka了,一样的套路首先去Kafka的官网下载地址,复制下载连接:
而后到Linux中使用wget
命令进行下载,以下:
[root@txy-server2 ~]# cd /usr/local/src [root@txy-server2 /usr/local/src]# wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
解压下载好的压缩包,并将解压后的目录移动和重命名:
[root@txy-server2 /usr/local/src]# tar -xvf kafka_2.13-2.5.0.tgz [root@txy-server2 /usr/local/src]# mv kafka_2.13-2.5.0 ../kafka
进入Kafka的配置文件目录,修改配置文件:
[root@txy-server2 /usr/local/src]# cd ../kafka/config/ [root@txy-server2 /usr/local/kafka/config]# vim server.properties # 指定监听的地址及端口号,该配置项是指定内网ip listeners=PLAINTEXT://127.0.0.1:9092 # 若是须要开放外网访问,则在该配置项指定外网ip advertised.listeners=PLAINTEXT://127.0.0.1:9092 # 指定kafka日志文件的存储目录 log.dirs=/usr/local/kafka/kafka-logs # 指定zookeeper的链接地址,多个地址用逗号分隔 zookeeper.connect=localhost:2181 [root@txy-server2 /usr/local/kafka/config]# mkdir /usr/local/kafka/kafka-logs
在完成配置文件的修改后,为了方便使用Kafka的命令脚本,咱们能够将Kafka的bin
目录配置到环境变量中:
[root@txy-server2 ~]# vim /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin [root@txy-server2 ~]# source /etc/profile # 让配置生效
这样就可使用以下命令启动Kafka了:
[root@txy-server2 ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &
执行以上命令后,启动日志会输出到控制台,能够经过日志判断是否启动成功,也能够经过查看是否监听了9092
端口来判断是否启动成功:
[root@txy-server2 ~]# netstat -lntp |grep 9092 tcp6 0 0 172.21.0.10:9092 :::* LISTEN 31943/java [root@txy-server2 ~]#
咱们可使用以下命令建立一个Topic:
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic Created topic test-topic. # 控制台输出这句表示建立完成 ... [root@txy-server2 ~]#
参数说明:
--create
:表示建立一个Topic--zookeeper
:指定zookeeper的ip及端口号--replication-factor
:指定复制因子--partitions
:指定partition的数量--topic
:指定建立的topic的名称查看已经建立的Topic信息:
[root@txy-server2 ~]# kafka-topics.sh --list --zookeeper localhost:2181 test-topic [root@txy-server2 ~]#
向test-topic
生产一些消息:
[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-topic >helloworld >第二条消息 >第三条消息
从test-topic
中消费消息:
[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:2181 --topic test-topic --from-beginning helloworld 第二条消息 消息 第四条消息 第五条消息
最后,中止Kafka的命令以下:
[root@txy-server2 ~]# kafka-server-stop.sh