了解 Apache Kafka是什么java
掌握Apache Kafka的基本架构node
搭建Kafka集群git
掌握操做集群的两种方式github
了解Apache Kafka高级部分的内容算法
消息系统最核心的功能有三个,分别是解耦、异步、并行
。spring
下面咱们经过用户注册的案例来讲明消息系统的做用:shell
问题:随着后端流程愈来愈多,每步流程都须要额外的耗费不少时间,从而会致使用户更长的等待延迟。apache
问题:系统并行的发起了4 个请求,4 个请求中,若是某一个环节执行1 分钟,其余环节再快,用户也须要等待1 分钟。若是其中一个环节异常以后,整个服务挂掉了。bootstrap
如何解决???vim
可见,经过消息架构解决了并行架构的问题。
Apache Kafka 是一个开源消息系统,由Scala 写成。是由Apache 软件基金会开发的一个开源消息系统项目。
Kafka 最初是由LinkedIn 开发,并于2011 年初开源。2012 年10 月从Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统1、高通量、低等待(低延时)的平台。
Kafka 是一个分布式消息系统:具备生产者、消费者的功能。它提供了相似于JMS 的特性,可是在设计实现上彻底不一样,此外它并非JMS 规范的实现。【重点】
Producer:消息的发送者
Consumer:消息的接收者
kafka cluster:kafka的集群。
Topic:就是消息类别名,一个topic中一般放置一类消息。每一个topic都有一个或者多个订阅者(消费者)。
消息的生产者将消息推送到kafka集群,消息的消费者从kafka集群中拉取消息。
说明:
broker:集群中的每个kafka实例,称之为broker;
ZooKeeper:Kafka 利用ZooKeeper 保存相应元数据信息, Kafka 元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。
ConsumerGroup:在Kafka 中每个消费者都属于一个特定消费组( ConsumerGroup ),咱们能够为每一个消费者指定一个消费组,以groupld 表明消费组名称,经过group.id 配置设置。若是不指定消费组,则该消费者属于默认消费组test-consumer-group 。
消息持久化
Kafka 基于文件系统来存储和缓存消息。
高吞吐量
Kafka 将数据写到磁盘,充分利用磁盘的顺序读写。同时, Kafka 在数据写入及数据同步采用了零拷贝( zero-copy )技术,采用sendFile()函数调用,sendFile()函数是在两个文件描述符之间直接传递数据,彻底在内核中操做,从而避免了内核缓冲区与用户缓冲区之间数据的拷贝,操做效率极高。
Kafka 还支持数据压缩及批量发送,同时Kafka 将每一个主题划分为多个分区,这一系列的优化及实现方法使得Kafka 具备很高的吞吐量。经大多数公司对Kafka 应用的验证, Kafka 支持每秒数百万级别的消息。
高扩展性
Kafka 依赖ZooKeeper来对集群进行协调管理,这样使得Kafka 更加容易进行水平扩展,生产者、消费者和代理都为分布式,可配置多个。
同时在机器扩展时无需将整个集群停机,集群可以自动感知,从新进行负责均衡及数据复制。
多客户端支持
Kafka 核心模块用Scala 语言开发,Kafka 提供了多种开发语言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。
安全机制
的Kafka 支持如下几种安全措施:
经过SSL 和SASL(Kerberos), SASL/PLA时验证机制支持生产者、消费者与broker链接时的身份认证;
支持代理与ZooKeeper 链接身份验证;
通讯时数据加密;
客户端读、写权限认证;
Kafka 支持与外部其余认证受权服务的集成;
数据备份
Kafka 能够为每一个topic指定副本数,对数据进行持久化备份,这能够必定程度上防止数据丢失,提升可用性。
轻量级
Kafka 的实例是无状态的,即broker不记录消息是否被消费,消费偏移量的管理交由消费者本身或组协调器来维护。
同时集群自己几乎不须要生产者和消费者的状态信息,这就使得Kafka很是轻量级,同时生产者和消费者客户端实现也很是轻量级。
消息压缩
Kafka 支持Gzip, Snappy 、LZ4 这3 种压缩方式,一般把多条消息放在一块儿组成MessageSet,而后再把Message Set 放到一条消息里面去,从而提升压缩比率进而提升吞吐量。
消息系统。
Kafka 做为一款优秀的消息系统,具备高吞吐量、内置的分区、备份冗余分布式等特色,为大规模消息处理提供了一种很好的解决方案。
应用监控。
利用Kafka 采集应用程序和服务器健康相关的指标,如CPU 占用率、IO 、内存、链接数、TPS 、QPS 等,而后将指标信息进行处理,从而构建一个具备监控仪表盘、曲线图等可视化监控系统。例如,不少公司采用Kafka 与ELK (Elastic Search 、Logstash 和Kibana)整合构建应用服务监控系统。
网站用户行为追踪。
为了更好地了解用户行为、操做习惯,改善用户体验,进而对产品升级改进,将用户操做轨迹、内容等信息发送到Kafka 集群上,经过Hadoop 、Spark 或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每一个用户进行个性化推荐。
流处理。
须要将己收集的流数据提供给其余流式计算框架进行处理,用Kafka 收集流数据是一个不错的选择。
持久性日志。
Kafka 能够为外部系统提供一种持久性日志的分布式系统。日志能够在多个节点间进行备份, Kafka 为故障节点数据恢复提供了一种从新同步的机制。同时, Kafka很方便与HDFS 和Flume 进行整合,这样就方便将Kafka 采集的数据持久化到其余外部系统。
准备三台虚拟机,分别是node01,node02,node03,而且修改hosts文件以下:
vim /etc/hosts
#注意: 前面的ip地址改为本身的ip地址
192.168.40.133 node01
192.168.40.134 node02
192.168.40.135 node03
#3台服务器的时间要一致
#时间更新:
yum install -y rdate
rdate -s time-b.nist.gov
因为Kafka 是用Scala 语言开发的,运行在JVM上,所以在安装Kafka 以前须要先安装JDK 。
安装过程略过,我这里使用的是jdk1.8。
Kafka 依赖ZooKeeper ,经过ZooKeeper 来对服务节点、消费者上下线管理、集群、分区元数据管理等,所以ZooKeeper 也是Kafka 得以运行的基础环境之一。
#上传zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#建立ZooKeeper的data目录
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置文件
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#设置data目录
dataDir=/export/data/zookeeper
#启动ZooKeeper
./zkServer.sh start
#检查是否启动成功
jps
#在/export/data/zookeeper目录中建立myid文件
vim /export/data/zookeeper/myid
#写入对应的节点的id,如:1,2等,保存退出
#在conf下,修改zoo.cfg文件
vim zoo.cfg
#添加以下内容
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
vim /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.9
export PATH=${ZK_HOME}/bin:$PATH
#当即生效
source /etc/profile
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/
cd /export/servers
scp -r zookeeper-3.4.9 node02:/export/servers/
scp -r zookeeper-3.4.9 node03:/export/servers/
mkdir -p /export/servers/onekey/zk
vim slave
#输入以下内容
node01
node02
node03
#保存退出
vim startzk.sh
#输入以下内容
cat /export/servers/onekey/zk/slave | while read line
do
{
echo "开始启动 --> "$line
ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"
}&
wait
done
echo "★★★启动完成★★★"
#保存退出
vim stopzk.sh
#输入以下内容
cat /export/servers/onekey/zk/slave | while read line
do
{
echo "开始中止 --> "$line
ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"
}&
wait
done
echo "★★★中止完成★★★"
#保存退出
#设置可执行权限
chmod +x startzk.sh stopzk.sh
#添加到环境变量中
export ZK_ONEKEY=/export/servers/onekey
export PATH=${ZK_ONEKEY}/zk:$PATH
发现三台机器都有“QuorumPeerMain”进程,说明机器已经启动成功了。
检查集群是否正常:
zkServer.sh status
发现,集群运行一切正常。
第一步:上传Kafka安装包而且解压
rz 上传kafka_2.11-1.1.0.tgz到 /export/software/
cd /export/software/
tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/
cd /export/servers
mv kafka_2.11-1.1.0/ kafka
第二步:配置环境变量
vim /etc/profile
#输入以下内容
export KAFKA_HOME=/export/servers/kafka
export PATH=${KAFKA_HOME}/bin:$PATH
#保存退出
source /etc/profile
第三步:修改配置文件
cd /export/servers/kafka
cd config
vim server.properties
# The id of the broker. This must be set to a unique integer for each broker.
# 必需要只要一个brokerid,而且它必须是惟一的。
broker.id=0
# A comma separated list of directories under which to store log files
# 日志数据文件存储的路径 (如不存在,须要手动建立该目录, mkdir -p /export/data/kafka/)
log.dirs=/export/data/kafka
# ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服务便可
zookeeper.connect=node01:2181
# 保存退出
第四步:启动kafka服务
# 以守护进程的方式启动kafka
kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
第五步:检测kafka是否启动
若是进程中有名为kafka的进程,就说明kafka已经启动了。
因为kafka是将元数据保存在ZooKeeper中的,因此,能够经过查看ZooKeeper中的信息进行验证kafka是否安装成功。
Kafka Manager 由 yahoo 公司开发,该工具能够方便查看集群 主题分布状况,同时支持对 多个集群的管理、分区平衡以及建立主题等操做。
源码托管于github:https://github.com/yahoo/kafka-manager
第一步:上传Kafka-manager安装包而且解压
rz上传kafka-manager-1.3.3.17.tar.gz到 /export/software/
cd /export/software
tar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/
cd /export/servers/kafka-manager-1.3.3.17/conf
第二步:修改配置文件
#修改配置文件
vim application.conf
#新增项,http访问服务的端口
http.port=19000
#修改为本身的zk机器地址和端口
kafka-manager.zkhosts="node01:2181"
#保存退出
第三步:启动服务
cd /export/servers/kafka-manager-1.3.3.17/bin
#启动服务
./kafka-manager -Dconfig.file=../conf/application.conf
#制做启动脚本
vim /etc/profile
export KAFKA_MANAGE_HOME=/export/servers/kafka-manager-1.3.3.17
export PATH=${KAFKA_MANAGE_HOME}/bin:$PATH
source /etc/profile
cd /export/servers/onekey/
mkdir kafka-manager
cd kafka-manager
vim start-kafka-manager.sh
nohup kafka-manager -Dconfig.file=${KAFKA_MANAGE_HOME}/conf/application.conf >/dev/null 2>&1 &
chmod +x start-kafka-manager.sh
vim /etc/profile
export PATH=${ZK_ONEKEY}/kafka-manager:$PATH
source /etc/profile
第四步:检查是否启动成功
打开浏览器,输入地址:http://node01:19000/,便可看到kafka-manage管理界面。
进入管理界面,是没有显示Cluster信息的,须要添加后才能操做。
添加 Cluster:
输入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(这里最高只能选择1.0.0)。
点击Save按钮保存。
添加成功。
查看kafka的信息
查看Broker信息
查看Topic列表
查看单个topic信息以及操做
优化副本选举
查看消费者信息
kafka集群的搭建是很是简单的,只须要将上面的单机版的kafka分发的其余机器,而且将ZooKeeper信息修改为集群的配置以及设置不一样的broker值便可。
第一步:将kafka分发到node0二、node03
cd /export/servers/
scp -r kafka node02:/export/servers/
scp -r kafka node03:/export/servers/
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/
# 分别到node0二、node03机器上执行
source /etc/profile
第二步:修改node0一、node0二、node03上的kafka配置文件
node01:
cd /export/servers/kafka/config
vim server.properties
zookeeper.connect=node01:2181,node02:2181,node03:2181
node02:
cd /export/servers/kafka/config
vim server.properties
broker.id=1
zookeeper.connect=node01:2181,node02:2181,node03:2181
node03:
cd /export/servers/kafka/config
vim server.properties
broker.id=2
zookeeper.connect=node01:2181,node02:2181,node03:2181
第三步:编写一键启动、中止脚本。注意:该脚本依赖于环境变量中的KAFKA_HOME。
mkdir -p /export/servers/onekey/kafka
vim slave
#输入以下内容
node01
node02
node03
#保存退出
vim start-kafka.sh
#输入以下内容
cat /export/servers/onekey/kafka/slave | while read line
do
{
echo "开始启动 --> "$line
ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
echo "★★★启动完成★★★"
#保存退出
chmod +x start-kafka.sh
vim stop-kafka.sh
#输入以下内容
cat /export/servers/onekey/kafka/slave | while read line
do
{
echo "开始中止 --> "$line
ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"
}&
wait
done
echo "★★★中止完成★★★"
#保存退出
chmod +x stop-kafka.sh
#加入到环境变量中
export PATH=${ZK_ONEKEY}/kafka:$PATH
source /etc/profile
第四步:经过kafka-manager管理工具查看集群信息。
因而可知,kafka集群已经启动完成。
对kafka的操做有2种方式,一种是经过命令行方式,一种是经过API方式。
Kafka在bin目录下提供了shell脚本文件,能够对Kafka进行操做,分别是:
经过命令行的方式,咱们将体验下kafka,以便咱们对kafka有进一步的认知。
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
#执行结果:
Created topic "my-kafka-topic".
参数说明:
zookeeper:参数是必传参数,用于配置 Kafka 集群与 ZooKeeper 链接地址。至少写一个。
partitions:参数用于设置主题分区数,该配置为必传参数。
replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
topic:指定topic的名称。
kafka-topics.sh --list --zookeeper node01:2181
__consumer_offsets
my-kafka-topic
能够查看列表。
若是须要查看topic的详细信息,须要使用describe命令。
kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
#若不指定topic,则查看全部topic的信息
kafka-topics.sh --describe --zookeeper node01:2181
经过kafka-topics.sh执行删除动做,须要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。
不然执行该脚本并未真正删除主题 ,将该topic标记为删除状态 。
kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
# 执行以下
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
Topic my-kafka-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 若是将delete.topic.enable=true
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2
Topic my-kafka-topic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 说明:虽然设置后,删除时依然提示没有设置为true,实际上已经删除了。
kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic
能够看到,已经向topic发送了消息。
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
# 经过以上命令,能够看到消费者能够接收生产者发送的消息
# 若是须要从头开始接收数据,须要添加--from-beginning参数
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
除了经过命令行的方式操做kafka外,还能够经过Java api的方式操做,这种方式将更加的经常使用。
导入依赖:
因为主题的元数据信息是注册在 ZooKeeper 相 应节点之中,因此对主题的操做实质是对 ZooKeeper 中记录主题元数据信息相关路径的操做。 Kafka将对 ZooKeeper 的相关操做封装成一 个 ZkUtils 类 , 井封装了一个AdrninUtils 类调用 ZkClient 类的相关方法以实现对 Kafka 元数据 的操做,包括对主题、代理、消费者等相关元数据的操做。对主题操做的相关 API调用较简单, 相应操做都是经过调用 AdminUtils类的相应方法来完成的。
package cn. .kafka;
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import java.util.Properties;
public class TestKafkaTopic {
测试结果:
测试结果:
package cn. .kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
public class TestProducer {
package cn. .kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
异步方式:两个 send 方法都返回一个 Future<RecordMetadata>对象,即只负责将消息 发送到消息缓冲区,并不等待 Sender线程处理结果,若但愿了解异步方式消息发送成功与否 ,能够在回调函数中进行相应处理, 当消息被 Sender线程处理后会回调 Callback。
同步方式:经过调用 send方法返回的 Future对象的 get()方法以阻塞式获取执行结果, 即等待 Sender线程处理的最终结果。
默认采用的异步方式。
package cn. .kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
public class TestAsyncProducer {
测试的结果:
发送消息 --> 0 发送消息 --> 1 发送消息 --> 2 发送消息 --> 3 发送消息 --> 4 发送消息 --> 5 发送消息 --> 6 发送消息 --> 7 发送消息 --> 8 发送消息 --> 9 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 消息的callbakc --> my-kafka-topic-0@-1 Process finished with exit code 0
package cn. .kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class TestSyncProducer {
测试:
消息的callbakc --> my-kafka-topic-0@3128 发送消息 --> 0 消息的callbakc --> my-kafka-topic-0@3129 发送消息 --> 1 消息的callbakc --> my-kafka-topic-0@3130 发送消息 --> 2 消息的callbakc --> my-kafka-topic-0@3131 发送消息 --> 3 消息的callbakc --> my-kafka-topic-0@3132 发送消息 --> 4 消息的callbakc --> my-kafka-topic-0@3133 发送消息 --> 5 消息的callbakc --> my-kafka-topic-0@3134 发送消息 --> 6 消息的callbakc --> my-kafka-topic-0@3135 发送消息 --> 7 消息的callbakc --> my-kafka-topic-0@3136 发送消息 --> 8 消息的callbakc --> my-kafka-topic-0@3137 发送消息 --> 9 Process finished with exit code 0
在 Kafka 中每个消费者都属于一个特定消费组( ConsumerGroup),咱们能够为每一个消费者指定一个消费组,以 groupld 表明消费组名称,经过 group.id 配置设置 。 若是不指定消费组,则 该消费者属于默 认消费组 test-consumer-group。
须要重点说明的是:
同一个主题的一条消息只能同一个消费者组下的某一个消费者消费。
不一样消费组的消费者可同时消费该消息。
消费者1: my-client-1
package cn. .kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.springframework.kafka.listener.MessageListener;
import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
消费者2:my-client-2
package cn. .kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer2 {
测试:
分别启动消费者1和消费者2,而后在控制台输入消息,观察消费者1和消费者2的控制台打印状况。
消费者1:my-client-11在my-group-test-3组中
package cn. .kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import org.springframework.kafka.listener.MessageListener;
import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
消费者2:my-client-22在my-group-test-4组中
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer2 {
测试:
因而可知,2个消费者获取到的消息是同样的。
消费组是 Kafka 用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不一样的消费组,消息单播则只 需让各消费者属于同 一个消费组 。
建立topic时,若是不指定分区,那么,topic的数据只是在一个broker中存储。
问题:若是一台机器存储不下怎么办?
建立topic时,能够指定分区,将一个topic的数据分散存储到多个broker,实现分区存储,从而解决一个机器存储不下的问题,也就实现了存储的横向扩展。
第一种:经过脚本建立topic时指定
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic my-kafka-topic-3
第二种:在经过java api建立topic时指定
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.createTopic(zkUtils, topicName, 3, 1, new Properties(), AdminUtils.createTopic$default$6());
说明:
同一个partition的数据能够被不一样的消费组获取。
同一个组内的消费者不能同时消费同一个partition。
同一个组内的消费者能够消费不一样的partition,也就是说,同一个组内的消费者数 小于等于 partition数,不能大于。
若是同组内的消费者数大于partition数,那么必定会有消费者是空闲的。
建立topic时,若是不指定副本,那么一个partition只会在一个Broker中存储
问题:若是该机器宕机,那么topic的数据将丢失。
建立topic时,若是设置副本,那么kafka将在其余分区中保存该分区的数据,已确保数据的可靠性。
在多个副本中,kafka会选取一个做为leader提供服务。其它的做为Follower节点存在。
第一种:经过脚本建立topic时指定
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic my-kafka-topic-4
第二种:在经过java api建立topic时指定
//参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
AdminUtils.createTopic(zkUtils, topicName, 3, 3, new Properties(), AdminUtils.createTopic$default$6());
消息的生产者将消息发送到leader,Follower会进行同步消息,若是消息还未同步完成,leader宕机,消息会丢失吗?
其实, Kafka为生产者提供3种消息确认机制(acks),分别是,0,-1,1,默认为:1。
( 1) 当 acks=0时,生产者不用等待代理返回确认信息,而连续发送消息。显然这种 方式加快了消息投递的速度,然而没法保证消息是否己被代理接受 ,有可能存在丢失数据 的风险。
(2)当 acsk=1时,生产者须要等待 Leader 己成功将消息写入日志文件中。这种方式 在必定程度上下降了数据丢失的可能性,但仍没法保证数据必定不会丢失。若是在 Leader副本 成功存储数据后, Follower 副本尚未来得及进行同步,而此时 Leader 着机了,那么此时虽 然数据己进行了存储,因为原来的 Leader 己不可用而会从集群中下线,同时存活的代理又不再会有从原来的 Leader副本存储的数据,此时数据就会丢失。
(3)当 acks=-1 时, Leader副本和全部 ISR列表中的副本都完成数据存储时才会向生产者 发送确认信息,这种策略保证只要 Leader 副本和 Follower 副本中至少有一个节点存活,数据就 不会丢失。为了保证数据不丢失,须要保证同步的副本至少大于1,经过参数 min.insync.replicas 设置,当同步副本数不足此配置值时,生产者会抛出异常,但这种方式同时也影响了生产者发 送消息的速度以及吞吐量。
在kafka0.8.2版本以后,消费者将消费记录(offset)值,保存在名为__consumer_offsets的topic中。
因此,只要可以正确记录offset值,就能保证消息不丢失,可是有可能消息会重复消费。
在Kafka 文件存储中,同一个topic 下有多个不一样partition,每一个partition 为一个目录,partiton命名规则为topic 名称+有序序号,第一个partiton 序号从0 开始,序号最大值为partitions数量减1。
每一个partion(目录)至关于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每一个段segment file 消息数量不必定相等,这种特性方便old segment file 快速被删除。默认保留7 天的数据。
Segment file 组成:由3大部分组成,分别为index file 、data file、timeindex file,此3个文件一一对应,分别表示为segment 索引文件、数据文件、时间日志。
当log文件等于1G时,新的会写入到下一个segment中。
Segment 文件命名规则:partion 全局的第一个segment 从0 开始,后续每一个segment文件名为上一个segment 文件最后一条消息的offset 值。数值最大为64 位long 大小,19 位数字字符长度,没有数字用0 填充。
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message 的物理偏移地址。
上述图中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message 的物理偏移地址。其中以索引文件中元数据3,497 为例,依次在数据文件中表示第3 个message(在全局partiton表示第368772 个message)、以及该消息的物理偏移地址为497。
若是须要读取offset=368776 的message,如何查找?
00000000000000000000.index 表示最开始的文件,起始偏移量(offset)为000000000000000368769.index 的消息量起始偏移量为368770 = 368769 + 100000000000000737337.index 的起始偏移量为737338=737337 + 1其余后续文件依次类推。以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就能够快速定位到具体文件。当offset=368776 时定位到00000000000000368769.index 和对应log 文件。
当offset=368776 时, 依次定位到00000000000000368769.index 的元数据物理位置和00000000000000368769.log 的物理偏移地址,而后再经过00000000000000368769.log 顺序查找直到offset=368776 为止。
kafka为何要将数据进行分段存储,好处是什么?
读写数据速度快
小文件必定比大文件快
对于旧数据,清除起来方便
kafka在数据生产的时候,有一个数据分发策略。默认的状况使用DefaultPartitioner.class类。
分发策略有三种:
若是是用户指定了partition,生产就不会调用DefaultPartitioner.partition()方法
当用户指定key,使用hash算法。Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
若是key一直不变,同一个key算出来的hash值是个固定值。
若是是固定值,这种hash取模就没有意义。
当用既没有指定partition也没有key,就采用轮询算法。
源码:
public class KafkaProducer<K, V> implements Producer<K, V> {
。。。。。。。
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
DefaultPartitioner.java文件:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 轮询算法
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition hash算法
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}