1、Kafka 简介前言:在以前的文章里面已经了解到了「消息队列」是怎么样的一种存在(传送门),Kafka 做为当下流行的一种中间件,咱们如今开始学习它!
html
Kafka 是一个消息系统,本来开发自 LinkedIn,用做 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。如今它已被多家不一样类型的公司 做为多种类型的数据管道和消息系统使用。shell
活动流数据是几乎全部站点在对其网站使用状况作报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索状况等内容。这种数据一般的处理方式是先把各类活动以日志的形式写入某种文件,而后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。数据库
近年来,活动和运营数据处理已经成为了网站软件产品特性中一个相当重要的组成部分,这就须要一套稍微更加复杂的基础设施对其提供支持。bootstrap
Kafka 是一种分布式的,基于发布 / 订阅的消息系统。主要设计目标以下:数组
以时间复杂度为 O(1) 的方式提供消息持久化能力,即便对 TB 级以上数据也能保证常数时间复杂度的访问性能。缓存
高吞吐率。即便在很是廉价的商用机器上也能作到单机支持每秒 100K 条以上消息的传输。安全
支持 Kafka Server 间的消息分区,及分布式消费,同时保证每一个 Partition 内的消息顺序传输。服务器
同时支持离线数据处理和实时数据处理。网络
Scale out:支持在线水平扩展。session
对于 Kafka 来讲客户端有两种基本类型:生产者(Producer)和消费者(Consumer)。除此以外,还有用来作数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,它们只不过是在上层作了封装。
这很容易理解,生产者(也称为发布者)建立消息,而消费者(也称为订阅者)负责消费or读取消息。
在 Kafka 中,消息以主题(Topic)来分类,每个主题都对应一个「消息队列」,这有点儿相似于数据库中的表。可是若是咱们把全部同类的消息都塞入到一个“中心”队列中,势必缺乏可伸缩性,不管是生产者/消费者数目的增长,仍是消息数量的增长,均可能耗尽系统的性能或存储。
咱们使用一个生活中的例子来讲明:如今 A 城市生产的某商品须要运输到 B 城市,走的是公路,那么单通道的高速公路不管是在「A 城市商品增多」仍是「如今 C 城市也要往 B 城市运输东西」这样的状况下都会出现「吞吐量不足」的问题。因此咱们如今引入分区(Partition)的概念,相似“容许多修几条道”的方式对咱们的主题完成了水平扩展。
一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒能够处理成千上万的分区和百万量级的消息。(如今动不动就百万量级..我特意去查了一把,好像确实集群的状况下吞吐量挺高的..摁..)
若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;固然一个分区能够被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时能够将其分区从新分配到其余 Broker 来负责。下图是一个样例:
Kafka 的一个关键性质是日志保留(retention),咱们能够配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。咱们也能够针对某个主题单独设置消息过时策略,这样对于不一样应用能够实现个性化。
随着业务发展,咱们每每须要多集群,一般处于下面几个缘由:
基于数据的隔离;
基于安全的隔离;
多数据中心(容灾)
当构建多个数据中心时,每每须要实现消息互通。举个例子,假如用户修改了我的资料,那么后续的请求不管被哪一个数据中心处理,这个更新须要反映出来。又或者,多个数据中心的数据须要汇总到一个总控中心来作数据分析。
上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可使用 Kafka 提供的 MirrorMaker 工具。本质上来讲,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列链接起来而已。它从一个集群中消费消息,而后往另外一个集群生产消息。
2、Kafka 的设计与实现上面咱们知道了 Kafka 中的一些基本概念,但做为一个成熟的「消息队列」中间件,其中有许多有意思的设计值得咱们思考,下面咱们简单列举一些。
是的,您首先应该知道 Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,通常的人认为 “磁盘是缓慢的”,因此对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快不少也慢不少,这取决于它们如何被使用;一个好的磁盘结构设计可使之跟网络速度同样快。
现代的操做系统针对磁盘的读写已经作了一些优化方案来加快磁盘的访问速度。好比,预读会提早将一个比较大的磁盘快读入内存。后写会将不少小的逻辑写操做合并起来组合成一个大的物理写操做。而且,操做系统还会将主内存剩余的全部空闲内存空间都用做磁盘缓存,全部的磁盘读写操做都会通过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。综合这几点优化特色,若是是针对磁盘的顺序访问,某些状况下它可能比随机的内存访问都要快,甚至能够和网络的速度相差无几。
上述的 Topic 实际上是逻辑上的概念,面相消费者和生产者,物理上存储的实际上是 Partition,每个 Partition 最终对应一个目录,里面存储全部的消息和索引文件。默认状况下,每个 Topic 在建立时若是不指定 Partition 数量时只会建立 1 个 Partition。好比,我建立了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认建立一个 test-0 的文件夹,这里的命名规则是:<topic_name>-<partition_id>
。
任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操做让 Kafka 的效率很是高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。
每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪个 Partition。若是 Partition 规则设置的合理,全部消息能够均匀分布到不一样的 Partition中。
假设咱们如今 Kafka 集群只有一个 Broker,咱们建立 2 个 Topic 名称分别为:「topic1」和「topic2」,Partition 数量分别为 一、2,那么咱们的根目录下就会建立以下三个文件夹:
| --topic1-0
| --topic2-0
| --topic2-1
在 Kafka 的文件存储中,同一个 Topic 下有多个不一样的 Partition,每一个 Partition 都为一个目录,而每个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们老是成对出现,后缀 ".index" 和 ".log" 分表表示 Segment 索引文件和数据文件。
如今假设咱们设置每一个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生相似以下的一些文件:
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.log
| --topic2-0
| --topic2-1
Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每一个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用0填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:
其中以索引文件中元数据 <3, 497>
为例,依次在数据文件中表示第 3 个 message(在全局 Partition 表示第 368769 + 3 = 368772 个 message)以及该消息的物理偏移地址为 497。
注意该 index 文件并非从0开始,也不是每次递增1的,这是由于 Kafka 采起稀疏索引存储的方式,每隔必定字节的数据创建一条索引,它减小了索引文件大小,使得可以把 index 映射到内存,下降了查询时的磁盘 IO 开销,同时也并无给查询带来太多的时间消耗。
由于其文件名为上一个 Segment 最后一条消息的 offset ,因此当须要查找一个指定 offset 的 message 时,经过在全部 segment 的文件名中进行二分查找就能找到它归属的 segment ,再在其 index 文件中找到其对应到文件上的物理位置,就能拿出该 message 。
因为消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过时的 Segment 文件),这种顺序磁盘 IO 存储设计师 Kafka 高性能很重要的缘由。
Kafka 是如何准确的知道 message 的偏移的呢?这是由于在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 message 都包含了如下三个属性:
offset:表示 message 在当前 Partition 中的偏移量,是一个逻辑上的值,惟一肯定了 Partition 中的一条 message,能够简单的认为是一个 id;
MessageSize:表示 message 内容 data 的大小;
data:message 的具体内容
当咱们发送消息以前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息能够么?咱们关注的是消息延迟仍是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另外一个服务来读取消息并根据规则引擎来检查交易是否经过,将结果经过 Kafka 返回。对于这样的业务,消息既不能丢失也不能重复,因为交易量大所以吞吐量须要尽量大,延迟能够稍微高一点。
再举个例子,假如咱们须要收集用户在网页上的点击数据,对于这样的场景,少许消息丢失或者重复是能够容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。
不一样的业务须要使用不一样的写入方式和配置。具体的方式咱们在这里不作讨论,如今先看下生产者写消息的基本流程:
图片来源:http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/12/kafka-producer.html
流程以下:
首先,咱们须要建立一个ProducerRecord,这个对象须要包含消息的主题(topic)和值(value),能够选择性指定一个键值(key)或者分区(partition)。
发送消息时,生产者会对键值和值序列化成字节数组,而后发送到分配器(partitioner)。
若是咱们指定了分区,那么分配器返回该分区便可;不然,分配器将会基于键值来选择一个分区并返回。
选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另外一个线程负责发送这些批量消息到对应的Kafka broker。
当broker接收到消息后,若是成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,不然返回异常。
生产者接收到结果后,对于异常可能会进行重试。
假设这么个场景:咱们从Kafka中读取消息,而且进行检查,最后产生结果数据。咱们能够建立一个消费者实例去作这件事情,但若是生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增加,消息堆积愈来愈严重。对于这种场景,咱们须要增长多个消费者来进行水平扩展。
Kafka消费者是消费组的一部分,当多个消费者造成一个消费组来消费主题时,每一个消费者会收到不一样分区的消息。假设有一个T1主题,该主题有4个分区;同时咱们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,以下所示:
若是咱们增长新的消费者C2到消费组G1,那么每一个消费者将会分别收到两个分区的消息,以下所示:
若是增长到4个消费者,那么每一个消费者将会分别收到一个分区的消息,以下所示:
但若是咱们继续增长消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总而言之,咱们能够经过增长消费组的消费者来进行水平扩展提高消费能力。这也是为何建议建立主题时使用比较多的分区数,这样能够在消费负载高的状况下增长消费者来提高性能。另外,消费者的数量不该该比分区数多,由于多出来的消费者是空闲的,没有任何帮助。
Kafka一个很重要的特性就是,只需写入一次消息,能够支持任意多的应用读取这个消息。换句话说,每一个应用均可以读到全量的消息。为了使得每一个应用都能读到全量消息,应用须要有不一样的消费组。对于上面的例子,假如咱们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组G1和消费组G2都能收到T1主题的全量消息,在逻辑意义上来讲它们属于不一样的应用。
最后,总结起来就是:若是应用须要读取全量消息,那么请为该应用设置一个消费组;若是该应用消费能力不足,那么能够考虑在这个消费组里增长消费者。
能够看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区以前是由其余消费者负责的;另外,当消费者离开消费组(好比重启、宕机等)时,它所消费的分区会分配给其余分区。这种现象称为重平衡(rebalance)。重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过也须要注意到,在重平衡期间,全部消费者都不能消费消息,所以会形成整个消费组短暂的不可用。并且,将分区进行重平衡也会致使原来的消费者状态过时,从而致使消费者须要从新更新状态,这段期间也会下降消费性能。后面咱们会讨论如何安全的进行重平衡以及如何尽量避免。
消费者经过按期发送心跳(hearbeat)到一个做为组协调者(group coordinator)的 broker 来保持在消费组内存活。这个 broker 不是固定的,每一个消费组均可能不一样。当消费者拉取消息或者提交时,便会发送心跳。
若是消费者超过必定时间没有发送心跳,那么它的会话(session)就会过时,组协调者会认为该消费者已经宕机,而后触发重平衡。能够看到,从消费者宕机到会话过时是有必定时间的,这段时间内该消费者的分区都不能进行消息消费;一般状况下,咱们能够进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者能够当即进行重平衡而不须要等待会话过时。
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置能够避免活锁(livelock)。活锁,是指应用没有故障可是因为某些缘由不能进一步消费。
上面提到,Kafka 中一个 topic 中的消息是被打散分配在多个 Partition(分区) 中存储的, Consumer Group 在消费时须要从不一样的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?
答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而无论全局的状况。
下一个问题是:Partition 中的消息能够被(不一样的 Consumer Group)屡次消费,那 Partition中被消费的消息是什么时候删除的?Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?
不管消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 均可以消费,2 天后,消息自动被删除。
Partition 会为每一个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。以下图:
消费者应该向 Broker 要数据(pull)仍是 Broker 向消费者推送数据(push)?做为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 broker push 消息并由 Consumer 从 broker pull 消息。一些 logging-centric system,好比 Facebook 的Scribe和 Cloudera 的Flume,采用 push 模式。事实上,push 模式和 pull 模式各有优劣。
push 模式很难适应消费速率不一样的消费者,由于消息发送速率是由 broker 决定的。push 模式的目标是尽量以最快速度传递消息,可是这样很容易形成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则能够根据 Consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适。pull 模式可简化 broker 的设计,Consumer 可自主控制消费消息的速率,同时 Consumer 能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的传输语义。
当咱们讨论可靠性的时候,咱们总会提到保证*这个词语。可靠性保证是基础,咱们基于这些基础之上构建咱们的应用。好比关系型数据库的可靠性保证是ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
Kafka 中的可靠性保证有以下四点:
对于一个分区来讲,它的消息是有序的。若是一个生产者向一个分区先写入消息A,而后写入消息B,那么消费者会先读取消息A再读取消息B。
当消息写入全部in-sync状态的副本后,消息才会认为已提交(committed)。这里的写入有可能只是写入到文件系统的缓存,不必定刷新到磁盘。生产者能够等待不一样时机的确认,好比等待分区主副本写入即返回,后者等待全部in-sync状态副本写入才返回。
一旦消息已提交,那么只要有一个副本存活,数据不会丢失。
消费者只能读取到已提交的消息。
使用这些基础保证,咱们构建一个可靠的系统,这时候须要考虑一个问题:究竟咱们的应用须要多大程度的可靠性?可靠性不是无偿的,它与系统可用性、吞吐量、延迟和硬件价格息息相关,得此失彼。所以,咱们每每须要作权衡,一味的追求可靠性并不实际。
3、动手搭一个 Kafka想了解更多戳这里:http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/21/kafka-data-delivery.html
经过上面的描述,咱们已经大体了解到了「Kafka」是何方神圣了,如今咱们开始尝试本身动手本地搭一个来实际体验一把。
这里以 Mac OS 为例,在安装了 Homebrew 的状况下执行下列代码:
brew install kafka
因为 Kafka 依赖了 Zookeeper,因此在下载的时候会自动下载。
咱们在启动以前首先须要修改 Kafka 的监听地址和端口为 localhost:9092
:
vi /usr/local/etc/kafka/server.properties
而后修改为下图的样子:
依次启动 Zookeeper 和 Kafka:
brew services start zookeeper
brew services start kafka
而后执行下列语句来建立一个名字为 "test" 的 Topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
咱们能够经过下列的命令查看咱们的 Topic 列表:
kafka-topics --list --zookeeper localhost:2181
而后咱们新建一个控制台,运行下列命令建立一个消费者关注刚才建立的 Topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
用控制台往刚才建立的 Topic 中添加消息,并观察刚才建立的消费者窗口:
kafka-console-producer --broker-list localhost:9092 --topic test
能经过消费者窗口观察到正确的消息:
https://www.infoq.cn/article/kafka-analysis-part-1 - Kafka 设计解析(一):Kafka 背景及架构介绍
http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/06/kafka-Meet-Kafka.html - Kafka系列(一)初识Kafka
https://lotabout.me/2018/kafka-introduction/ - Kafka 入门介绍
https://www.zhihu.com/question/28925721 - Kafka 中的 Topic 为何要进行分区? - 知乎
https://blog.joway.io/posts/kafka-design-practice/ - Kafka 的设计与实践思考
http://www.dengshenyu.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/2017/11/21/kafka-data-delivery.html - Kafka系列(六)可靠的数据传输
转自公众号:我没有三颗心脏 ,做者我没有三颗心脏