kafka是一个高吞吐量的分布式消息队列,具备高性能、持久化、多副本备份、横向扩展能力,一般用于大数据及流处理平台。消息队列里都有生产者/消费者的概念,生产者往队列里写消息,而消费者则是从队列里获取消息。通常在架构设计中起到解耦、削峰、异步处理的做用。html
kafka对外使用topic的概念,生产者往topic里写消息,消费者则从topic里读消息。为了作到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,能够经过增长partition的数量来进行横向扩容。在单个parition内是保证消息有序。每新写一条消息,kafka就是在对应的文件append写,因此性能很是高。java
kafka的整体数据流是这样的:apache
大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,而后进行业务处理。bootstrap
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。能够看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的。vim
kafka须要依赖zookeeper存储一些元信息,而kafka也自带了zookeeper。其中broker、topics、partitions的一些元信息用zookeeper来存储,监控和路由啥的也都会用到zookeeper。bash
kafka名词解释:网络
在kafka中每一个消息(也叫做record记录,也被称为消息)一般是由一个key,一个value和时间戳构成。架构
kafka有四个核心的API:app
kafka就先介绍到这,网络上有不少相关的理论文章,因此这里不过多赘述了,也能够直接查看官方文档。官方文档地址以下:异步
本小节咱们来在CentOS7上安装Kafka,因为kafka是由Scala和Java语言编写的,因此前提须要准备好java运行环境,我这里java环境是1.8的,因为jdk的安装配置都比较简单,这里就不演示jdk的安装过程了,直接安装Kafka。
到官网上复制下载地址,使用wget命令进行下载并解压:
[root@study-01 ~]# cd /usr/local/src/ [root@study-01 /usr/local/src]# wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz [root@study-01 /usr/local/src]# tar -zxvf kafka_2.11-2.0.0.tgz [root@study-01 /usr/local/src]# mv kafka_2.11-2.0.0 /usr/local/kafka [root@study-01 /usr/local/src]# cd !$
没有特殊要求的话,咱们使用默认的kafka配置便可。若你但愿kafka可以被外部机器访问,则须要配置一下你机器的外网ip地址及Kafka监听端口,另外咱们可能还须要配置broker的id和kafka日志文件的存储目录,以下:
[root@study-01 /usr/local/kafka]# vim ./config/server.properties listeners=PLAINTEXT://192.168.190.129:9092 advertised.listeners=PLAINTEXT://192.168.190.129:9092 broker.id=1 # broker的id,必须是集群中惟一的 log.dirs=/tmp/kafka-logs # kafka日志文件的存储目录 [root@study-01 /usr/local/kafka]#
如今咱们就可使用kafka了,因为kafka依赖zookeeper,因此咱们在启动kafka前须要先启动kafka自带的zookeeper服务:
[root@study-01 /usr/local/kafka]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > zookeeper.out &
zookeeper服务启动成功后,启动kafka:
[root@study-01 /usr/local/kafka]# nohup ./bin/kafka-server-start.sh ./config/server.properties > kafka.out &
两个服务都启动成功后,监听的端口以下:
[root@study-01 ~]# netstat -lntp |grep java tcp6 0 0 :::38031 :::* LISTEN 3629/java tcp6 0 0 :::33620 :::* LISTEN 3945/java tcp6 0 0 :::9092 :::* LISTEN 3945/java tcp6 0 0 :::2181 :::* LISTEN 3629/java [root@study-01 ~]#
接下来咱们测试一下kafka是否正常可用,首先建立一个topic,命令以下:
[root@study-01 /usr/local/kafka]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hello Created topic "hello". [root@study-01 /usr/local/kafka]#
建立topic成功后,使用以下命令测试是否能正常获取topic列表:
[root@study-01 /usr/local/kafka]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181 hello [root@study-01 /usr/local/kafka]#
如今能够肯定topic建立成功了,而后咱们来启动producer,测试往一个topic上发送消息:
[root@study-01 /usr/local/kafka]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello>hello world >hello kafka >
接着启动consumer,测试从一个topic上消费消息:
[root@study-01 /usr/local/kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning hello world hello kafka
经过以上测试,能够看到,kafka可以正常的建立topic进行发送/接收消息,那么就表明咱们安装成功了。