Kafka 最先是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,以后成为 Apache 的顶级项目。主要特色以下:html
Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力,即便对TB 级以上数据也能保证常数时间的访问性能。即便在很是廉价的商用机器上也能作到单机支持每秒 100K 条消息的传输。java
将消息持久化到磁盘,所以可用于批量消费,例如 ETL 以及实时应用程序。经过将数据持久化到硬盘以及 replication 防止数据丢失。node
支持 Server 间的消息分区及分布式消费,同时保证每一个 partition 内的消息顺序传输。这样易于向外扩展,全部的producer、broker 和 consumer 都会有多个,均为分布式的。无需停机便可扩展机器。apache
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 本身保存 offset。bootstrap
同时支持离线数据处理和实时数据处理。后端
Kafka 集群中的一台或多台服务器统称为 Broker缓存
每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不一样 Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 便可生产或消费数据而没必要关心数据存于何处)安全
Topic 物理上的分组,一个 Topic 能够分为多个 Partition ,每一个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)bash
消息和数据的生产者,能够理解为往 Kafka 发消息的客户端服务器
消息和数据的消费者,能够理解为从 Kafka 取消息的客户端
每一个 Consumer 属于一个特定的 Consumer Group(可为每一个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group)。 这是 Kafka 用来实现一个 Topic 消息的广播(发给全部的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 能够有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到全部的 Consumer Group,但每一个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。若是要实现广播,只要每一个 Consumer 有一个独立的 Consumer Group 就能够了。若是要实现单播只要全部的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还能够将 Consumer 进行自由的分组而不须要屡次发送消息到不一样的 Topic 。
Mac 用户用 HomeBrew 来安装,安装前要先更新 brew
brew update
复制代码
接着安装 kafka
brew install kafka
复制代码
安装完成以后能够查看 kafka 的配置文件
cd /usr/local/etc/kafka
复制代码
kafka 须要用到 zookeeper,HomeBrew 安装kafka 的时候会同时安装 zookeeper。下面先启动 zookeeper:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
复制代码
接着启动 kafka
cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties
复制代码
建立 topic,设置 partition 数量为2,topic 的名字叫 test-topic,下面的例子都用这个 topic
cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic
复制代码
查看建立的 topic
cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --list --zookeeper localhost:2181
复制代码
cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-producer --broker-list localhost:9092 --topic test-topic
复制代码
cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
复制代码
cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic
复制代码
若是 kafka 启动时加载的配置文件中 server.properties 没有配置delete.topic.enable=true,那么此时的删除并非真正的删除,而是把 topic 标记为:marked for deletion
cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --zookeeper localhost:2181 --list
复制代码
登陆zookeeper客户端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli
找到topic所在的目录:ls /brokers/topics
找到要删除的topic,执行命令:rmr /brokers/topics/test-topic 便可,此时topic被完全删除
复制代码
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>
复制代码
package org.study.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class ProducerSample {
public static void main(String[] args) {
Map<String, Object> props = new HashMap<String, Object>();
props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址
props.put("bootstrap.servers", "localhost:9092");//用于创建与 kafka 集群链接的 host/port 组。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
String topic = "test-topic";
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1"));
producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2"));
producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3"));
producer.close();
}
}
复制代码
package org.study.kafka;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerSample {
public static void main(String[] args) {
String topic = "test-topic";// topic name
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//用于创建与 kafka 集群链接的 host/port 组。
props.put("group.id", "testGroup1");// Consumer Group Name
props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交
props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
复制代码
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
复制代码
kafka-server-start /usr/local/etc/kafka/server.properties
复制代码
先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
运行 Producer,发布几条消息,在 Consumer 的控制台能看到接收的消息
kafka 的集群配置通常有三种,即: single node - single broker ,single node - multiple broker ,multiple node - multiple broker
前两种实际上官网有介绍。
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
复制代码
kafka-server-start /usr/local/etc/kafka/server.properties
复制代码
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker
复制代码
kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker
复制代码
broker-list 和 topic 这两个参数是必须的,broker-list 指定要链接的 broker 的地址,格式为 node_address:port 。topic 是必须的,由于须要发送消息给订阅了该 topic 的 consumer group 。 如今能够在命令行里输入一些信息,每一行会被做为一个消息。
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker
复制代码
在不一样的终端窗口里分别启动 zookeeper、broker、producer、consumer 后,在 producer 终端里输入消息,消息就会在 consumer 终端中显示了。
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
复制代码
若是须要在单个节点(即一台机子)上面启动多个 broker(这里做为例子启动三个 broker),须要准备多个server.properties文件便可,因此须要复制 /usr/local/etc/kafka/server.properties 文件。由于须要为每一个 broker 指定单独的属性配置文件,其中 broker.id 、 port 、 log.dir 这三个属性必须是不一样的。
新建一个 kafka-example 目录和三个存放日志的目录
mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3
复制代码
复制 /usr/local/etc/kafka/server.properties 文件三份
cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties
复制代码
在 broker1 的配置文件 server-1.properties 中,相关要修改的参数为:
broker.id=1
port=9093
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1
复制代码
broker2 的配置文件 server-2.properties 中,相关要修改的参数为:
broker.id=2
port=9094
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2
复制代码
broker3 的配置文件 server-3.properties 中,相关要修改的参数为:
broker.id=3
port=9095
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3
复制代码
启动每一个 broker
cd /Users/niwei/Downloads/kafka-example
kafka-server-start server-1.properties
kafka-server-start server-2.properties
kafka-server-start server-3.properties
复制代码
建立一个名为 topic-singlenode-multiplebroker 的topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker
复制代码
若是一个 producer 须要链接多个 broker 则须要传递参数 broker-list
kafka-console-producer --broker-list localhost:9093, localhost:9094, localhost:9095 --topic topic-singlenode-multiplebroker
复制代码
kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker
复制代码
broker.id=1 #当前机器在集群中的惟一标识
port=9093 #当前 kafka 对外提供服务的端口,默认是 9092
host.name=192.168.121.101 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1 #消息存放的目录,这个目录能够配置为逗号分割的表达式
zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 #设置 zookeeper 集群的链接端口
num.network.threads=3 #这个是 borker 进行网络处理的线程数
num.io.threads=5 #这个是 borker 进行 IO 处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区的大小,数据先回存储到缓冲区了到达必定的大小后在发送能提升性能
socket.receive.buffer.bytes=102400 #接收缓冲区的大小,当数据到达必定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向 kafka 请求消息或者向 kafka 发送消息的请求的最大数,这个值不能超过 jvm 的堆栈大小
num.partitions=1 #默认的分区数,一个 topic 默认1个分区数
log.retention.hours=24 #默认消息的最大持久化时间,24小时
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka 保存消息的副本数,若是一个副本失效了,另外一个还能够继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是由于 kafka 的消息是以追加的形式落地到文件,当超过这个值的时候,kafka 会新建一个文件
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去检查上面配置的 log 失效时间(log.retention.hours=24 ),到目录查看是否有过时的消息若是有则删除
log.cleaner.enable=false #是否启用 log 压缩,通常不用启用,启用的话能够提升性能
复制代码
因为是多节点多 broker 的,因此每一个 broker 的配置文件 server.properties 都要按以上说明修改
kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker
复制代码
kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker
复制代码
Kafka 提供了很高的数据冗余弹性,对于须要数据高可靠性的场景能够增长数据冗余备份数(replication.factor),调高最小写入副本数的个数(min.insync.replicas)等等,可是这样会影响性能。反之,性能提升而可靠性则下降,用户须要自身业务特性在彼此之间作一些权衡性选择。
要保证数据写入到 Kafka 是安全的、高可靠的,须要以下的配置:
replication.factor>=3,即副本数至少是3个2<=min.insync.replicas<=replication.factor
leader 的选举条件 unclean.leader.election.enable=false
request.required.acks=-1,producer.type=sync
消息中间件从功能上看就是写入数据、读取数据两大类,优化也能够从这两方面来看。
为了优化写入速度 Kafak 采用如下技术:
磁盘大多数都仍是机械结构(SSD不在讨论的范围内),若是将消息以随机写的方式存入磁盘,就须要按柱面、磁头、扇区的方式寻址,缓慢的机械运动(相对内存)会消耗大量时间,致使磁盘的写入速度与内存写入速度差好几个数量级。为了规避随机写带来的时间消耗,Kafka 采起了顺序写的方式存储数据,以下图所示:
即使是顺序写入硬盘,硬盘的访问速度仍是不可能追上内存。因此 Kafka 的数据并非实时的写入硬盘,它充分利用了现代操做系统分页存储来利用内存提升I/O效率。Memory Mapped Files (后面简称mmap)也被翻译成内存映射文件,在64位操做系统中通常能够表示 20G 的数据文件,它的工做原理是直接利用操做系统的 Page 来实现文件到物理内存的直接映射。完成映射以后对物理内存的操做会被同步到硬盘上(由操做系统在适当的时候)。 经过 mmap 进程像读写硬盘同样读写内存,也没必要关心内存的大小,有虚拟内存为咱们兜底。使用这种方式能够获取很大的 I/O 提高,由于它省去了用户空间到内核空间复制的开销(调用文件的 read 函数会把数据先放到内核空间的内存中,而后再复制到用户空间的内存中) 但这样也有一个很明显的缺陷——不可靠,写到 mmap 中的数据并无被真正的写到硬盘,操做系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘。因此 Kafka 提供了一个参数—— producer.type 来控制是否是主动 flush,若是Kafka 写入到 mmap 以后就当即 flush 而后再返回 Producer 叫同步(sync);若是写入 mmap 以后当即返回,Producer 不调用 flush ,就叫异步(async)。
为了不无效率的字节复制,尤为是在负载比较高的状况下影响是显著的。为了不这种状况,Kafka 采用由 Producer,Broker 和 Consumer 共享的标准化二进制消息格式,这样数据块就能够在它们之间自由传输,无需转换,下降了字节复制的成本开销。
而在读取速度的优化上 Kafak 采起的主要是零拷贝
传统模式下咱们从硬盘读取一个文件是这样的
(2) 应用将数据从内核空间读到用户空间的缓存中
(3) 应用将数据写会内核空间的套接字缓存中
(4)操做系统将数据从套接字缓存写到网卡缓存中,以便将数据经网络发出
这样作明显是低效的,这里有四次拷贝,两次系统调用。 针对这种状况 Unix 操做系统提供了一个优化的路径,用于将数据从页缓存区传输到 socket。在 Linux 中,是经过 sendfile 系统调用来完成的。Java提供了访问这个系统调用的方法:FileChannel.transferTo API。这种方式只须要一次拷贝:操做系统将数据直接从页缓存发送到网络上,在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是须要的。
Kafka 速度的秘诀在于它把全部的消息都变成一个的文件。经过 mmap 提升 I/O 的速度,写入数据的时候是末尾添加因此速度最优;读取数据的时候配合sendfile 直接暴力输出。因此单纯的去测试 MQ 的速度没有任何意义,Kafka 的这种暴力的作法已经脱了 MQ 的底裤,更像是一个暴力的数据传送器。