Kafka 权威指南 笔记

看了下 Kafka 作了一些随笔的笔记。先看了第1、3、4、5、十一章,后续章节还会慢慢补上。java

 

第一章   初识 :生产者和消费者正则表达式

 

生产者:算法

一个消息会被发布到一个特定主题上。生产者默认吧消息均衡分不到主题的全部分区上,并不关心特定消息会被写到那个分区。sql

某些状况下,生产者会把消息直接写到指定分区。是经过消息键和分区器来实现,分区器为键为键生成一个散列值,并将其映射到指定的分区上。这样可保证同一个键的消息会被写到同一个分区上。数据库

 

消费者:编程

消费者订阅一个或多个主题,并按照消息生成的顺序读取他们。bootstrap

消费者经过检查消息的偏移量来区分已经读取过的消息。设计模式

偏移量 是另外一种元数据,一个不断递增的整数值,在建立消息时,Kafka 把它添加到消息里。数组

在给定分区里,消息偏移量都是惟一的,消费者把每一个分区最后读取的消息偏移量保存在 Zookeeper 或 kafka上,若是消费者关闭或重启,它的读取状态不会丢失。[ 这也是为何 消费者宕机后,可以从宕机前的位置继续读取数据 ]缓存

 

多消费者 ,消费者是消费者群组的一部分,多个消费者共通读取一个主题。群组保证每一个分区智能被一个消费者使用。消费者与分区之间的映射一般被称为消费者对分区的全部权关系

 

 

 

 

broker 和集群

一个独立的 kafka 服务器被称为 broker。

broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求做出相应,返回已经提交到磁盘上的消息。

根据特定硬件及其性能特性,单个 broker 能够轻松处理数千个分区以及每秒百万级的消息量。

 

broker 是集群的组成部分。每一个集群都有一个 broker 充当集群控制器的角色(自动从集群的活跃成员中选举出来)。

保留消息(在必定期限内)是kafka 一个重要特性。kafka默认消息保留策略 是 :默认 7天,或 默认 1G 。当消息达到这些上限时,旧消息就会过时并被删除。主题能够配置本身的保留策略,能够将消息保留到再也不使用他们为止。

 

多集群

三点缘由: 最好使用多个集群

数据类型分离

安全需求隔离

多数据中心(灾难恢复)

若是使用多个数据中心,就须要在它们之间复制消息。这样应用程序能够访问到多个站点的用户活动信息。 kafka 的消息复制机制智能在单个集群里进行,不能再多个集群之间进行。

Kafka 提供了一个叫作 MirrorMaker的工具,能够实现集群间的消息复制。

 

 

为何选择 Kafka

多个生产者

Kafka 能够无缝支持多个生产者,无论客户端在使用单个主题仍是多个主题。

 

多个消费者

Kafka 支持多个消费者从一个单独的消息流上读取数据,且消息之间互不影响。(这与其余队列系统不一样,其余队列系统的消息一旦被一个客户端读取,其余客户端就没法读取它。另外,多个消费者能够组成一个群组,他们共享一个消息流,并保证整个群组对每一个给定的消息只处理一次。)

 

基于磁盘的数据存储

Kafka 容许消费者非实时地读取消息,归功于 kafka的数据保留特性。

消费者能够在进行应用程序维护时离线一小段时间,无需担忧消息丢失或者拥塞在生产者端。

 

伸缩性

Kafka 从一开始被设计成一个具备灵活伸缩性的系统。开发阶段能够先使用单个 broker,再扩展到包含3个broker的小型开发集群...,一个包含多个broker的集群,即便个别broker 失效,仍然能够持续为客户提供服务。

 

高性能

经过横向扩展生产者、消费者 broker, kafka 能够处理巨大的消息流。在处理大量数据的同时,它能保证亚秒级的消息延迟。

 

 

数据生态系统

Kafka 为数据生态系统带来了循环系统,它在基础设施的各个组件之间传递消息,为全部客户端提供一直的接口。

使用场景:

活动跟踪

传递消息

 

默认配置

broker 配置

Kafka 发送包里自带配置样本能够安装单机服务,并不能知足大多数安装场景的要求。

 

主题配置

主题吞吐量 / 消费者吞吐量 = 分区个数

num.partitions : 默认把分区大小限制在 25 GB 之内,能够获得比较理想的效果。

log.retention.ms : 数据被保留时间。默认为 168小时(一周),还有 log.retention.hours、log.retention.minutes

log.retention.bytes : 经过保留消息字节数来判断消息是否过时。例:一个包含8个分区的主题,而且 log.retention.bytes 设置为 1GB , 那么这个主题最多能够保留8GB的数据。

log.segment.bytes : 当日志片断大小达到 log.segment.bytes 指定的上线(默认 1G)时,当前日志片断就会被关闭,一个新的日志片断被打开。

log.segment.ms : 控制日志片断关闭时间

message.max.bytes : broker 经过设置message.max.bytes 参数来限制单个消息的大小。默认值是 1 000 000 ,也就是 1M。若是生产者发送消息超过这个值,消息不会被接收,broker 还会返回 错误信息。

 

硬件性能

(磁盘吞吐量 容量、内存、网络、CPU )

 

 

Kafka 集群

简单kafka集群图:

 

 

 

须要多少 broker ?

一、须要多少磁盘空间来保留数据,以及单个 broker 有多少空间可用。

若是 整个集群须要保留 10TB 数据,每一个broker 能够存储 2TB,至少须要 5 个 broker。若是启用了数据复制,至少还须要一倍的空间(取决于复制系数是多少)。

二、集群处理请求的能力。与网络接口处理客户端流量能力有关。

 

broker 配置

要把一个 broker 加入集群,须要配置两个参数。

一、全部 broker 都必须配置相同的 zookeeper.connect ,指定了用于保存元数据的 Zookeeper 群组和路径。

二、每一个 broker 都必须为 broker.id 参数设置惟一的值。若是 两个 broker 使用相同的broker.id,那么第二个 broker 就没法启动。

 

生产环境注意

垃圾回收器选项

Java 7 有 G1 垃圾回收器,.

MaxGcPauseMillis: 指定每次垃圾回收默认的停顿时间。值不固定,默认 200ms。

InitiatingHeapOccupancyPercent: 指定了G1启动新一轮垃圾回收以前可使用的堆内存百分比,默认 45。即 堆内存使用率到达 45% 前,G1不会启动垃圾回收。这个百分比包括 新生代 和 老年代的内存。

(例:若是一台服务器有 64GB内存,而且使用 5GB 堆内存来运行 Kafka,

参考配置:MaxGcPauseMillis : 20ms;

InitiatingHeapOccupancyPercent : 35 ;

kafka 启动脚本没有启用 G1回收器,使用了 Paraller New 和 CMS 垃圾回收器。

 

数据中心布局

把集群的 broker 安装在不一样机架上,不能让他们共享可能出现单点故障的基础设施。

 

共享zookeeper

Kafka 使用 zookeeper 来保存 broker、主题和分区的元数据信息。

Kafka 消费者 和 zookeeper

kafka 0.9.0.0 版本前,除 broker 外,消费者会使用 zookeeper 来保存一些信息,

kafka 0.9.0.0 版本后,kafka 引入了一个新的消费者接口,容许 broker 直接维护这些信息。

kafka 对 zookeeper 的延迟和超时比较敏感。

 

第三章: 生产者

 

 

 

概览

Kafka 发送消息主要步骤,从建立一个 ProducerRecord 对象开始,对象须要包含目标主题和要发送的内容。还能够指定键或分区。

发送 ProducerRecord 对象时,生产者先把键和值对象序列化成字节数组,这样才可在网络上传输。

数据传给分区器。如指定了分区,分区器直接把指定分区返回。如未指定,分区器会根据 ProducerRecord 对象的键来选择一个分区。

分区后,生产者知道往哪一个分区发送信息,这条信息被添加到一个记录批次里,这个批次全部消息会被发送到相同的主题和分区上。有独立线程负责把这些记录批次发送到相应 broker 上。

服务器收到消息会返回一个相应。若是消息成功写入 Kafka ,就返回一个包含了 主题、分区信息、分区里偏移量 的 RecordMetaData 对象。写入失败,返回错误。生产者收到错误后会尝试从新发送,几回后还失败,就返回错误信息。

 

建立生产者

kafka写消息,先要建立生产者对象,并设置一些属性。有三个必选属性:

bootstrap.servers broker 地址清单,格式为 host:port 。清单中不用包含全部 broker地址,生产者会从给定的broker里查找到其余broker信息。建议至少两个,一旦其中一个宕机,生产者仍能链接到集群上。

key.serializer  broker 接收的消息都是字节数组,生产者须要知道如何把这些java对象转换成字节数组。key.serializer 必须被设置为一个实现了 Serializer 接口的类。Kafka客户端默认提供了 ByteArraySerializer、StringSerializer、IntegerSerializer。

value.serializer :与 key.serializer 同样,value.serializer 指定类会将值序列化。

实例化生产者对象后,发送消息主要有三种方式:

发送并忘记 消息发给服务器,不关心是否正常到达。kafka高可用,生产者会自动尝试重发。有时会丢失一些消息。

同步发送 send() 方法发送消息,返回 Futrue 对象,调用 get() 方法等待。

kafkaProducer 通常会发生两类错误。

1:可重试错误,可经过重发消息解决。例:链接错误、无主(no leader)错误。kafka 能够配置成自动重试,若是屡次重试后仍没法解决,程序会受到一个重试异常。

二、另类错误,没法经过重试解决。例:“消息太大” 异常,不进行重试,一直抛出异常

异步发送 send() 方法发送消息,指定一个回调函数,服务器在返回响应时调用该函数。

 

生产者配置

acks  指定了必需要有多少个分区副本收到消息,生产者才会认为消息写入时成功的。

acks = 0 :生产者在成功写入消息前不会等待任何来自服务器的响应。

能够以网络可以支持的最大速度发送消息,从而达到很高的吞吐量。

acks = 1 : 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应。

吞吐量取决于使用的是同步发送仍是异步发送。

acks = all :只有当全部参与复制的节点所有收到消息时,生产者才会收到一个来自服务器的成功响应。模式最安全。延迟最高。

buffer.memory  设置生产者内存缓冲区大小。缓冲要发送到服务器的消息。(0.9.0.0 版本替换为 max.block.ms)

compression.type :默认状况,消息发送时不会被压缩。该参数能够设置为 snappy、gzip、lz4。指定了消息发送给 broker以前使用哪一种压缩算法。

snappy 压缩占用较少CPU,提供较好性能和压缩比。比较关注 性能和网络带宽可用。

gzip 占用较多CPU,若是网络带宽比较有限 可用。

retries :生产者能够重发消息的次数。默认:生产者每次重试之间等待 100ms( 能够经过 retry.backoff.ms 参数来改变这个时间间隔 ),建议设置 总的重试时间比kafka集群从崩溃中恢复的时间长。通常状况,生产者会自动进行重试,代码中可不处理 可重试错误,只处理 不可重试错误 或 重试次数超出上限的状况。

batch.size :当有多个消息须要被发送到同一个分区时,生产者会把他们放在同一个批次里。该参数指定了一个批次可使用的内存大小,按照字节数计算。

批次填满即发送,半满或一条消息也可能被发送。批次大小设置很大,不会形成延迟,只会占用更多内存。设置过小,生产者须要更频繁地发送消息,会增长一些额外的开销。

linger.ms :指定了生产者在发送批次以前等待更多消息加入批次的时间。

linger.ms 设置成比 0 大的数,虽然会增长延迟,但也会提高吞吐量。

client.id :能够是任意字符串,服务器用它来识别消息来源,还能够用在日志和配额指标里。

max.in.flight.requests.per.connection :指定了生产者在收到服务器响应以前能够发送多少个消息。值越高,占用越多内存,提高吞吐量。设为 1 ,能够保证消息是按照发送的顺序写入服务器的,即便发送了重试。

timeout.ms 、 request.timeout.ms 、metadata.fetch.timeout.ms 

request.timeout.ms : 生产者在发送数据时等待服务器返回响应的时间。

metadata.fetch.timeout.ms :生产者在获取元数据时等待服务器返回响应的时间。若是等待响应超时,生产者要么重试发送数据,要么返回一个错误。

timeout.ms : 指定了broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配。若是指定时间内没有收到同步副本的确认,那么broker 就会返回一个错误。

max.block.ms :指定了调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当 生产者发送缓冲区满 或 没有可用元数据时,方法会阻塞。阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

max.request.size :用于控制生产者发送的请求大小。指 单个消息最大值 单个请求全部消息总大小。

注: broker 对可接收的消息最大值也有本身限制(message.max.bytes),两边最好匹配。避免生产者发送的消息被 broker 拒绝。

receive.buffer.bytes 和 send.buffer.bytes :分别指定 TCP socket 接收和发送数据包的缓冲区大小。值为 -1,就使用操做系统的默认值。若是 生产者或消费者 与 broker 处于不一样的数据中心,那么能够适当增大这些值,由于跨数据中心的网络通常都有比较高的延迟和比较低的带宽。

 

注: 顺序保证kafka 能够保证同一个分区里的消息是有序的。

例:若是把 retries 设为 非零整数,把 max.in.flight.requests.per.connection 设为比 1 大的数。

若是第一批次消息写入失败,第二批次成功,broker重写第一批次。如此时第一批次写入成功,那么两个批次的顺序就反过来了。

若是 retries 为非零整数,把 max.in.flight.requests.per.connection 设为 1,这样生产者发送第一批消息时,就不会有其余消息发送给 broker。这样会严重哦影响生产者的吞吐量,只有在对消息顺序有严格要求状况才这么作。

 

序列化器

已有序列化器和反序列化器 不如 JSON、Avro、Thrift、Protobuf

Apache Avro 序列化: 一种与编程语言无关的序列化格式。

注:写入数据和读取数据的 schema 必须是相互兼容的。

 

分区

ProducerRecord 对象包含目标主题、键、值。Kafka 消息是一个个键值对,ProducerRecord 对象 的键能够设置为默认的 null。

键有两个用途:能够做为消息的附加信息,也能够决定消息该被写到主题的那个分区。拥有相同键的消息将被写入用一个分区。

 

 

第四章: 消费者

消费者和消费者群组

kafka 消费者 从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每一个消费者接收主题一部分分区的消息。

例: kafka 有 4个分区,若是只要 1 个消费者- 一个消费者收到4个分区的消息。

若是有两个消费者,每一个消费者收到两个分区的消息。

若是有4个消费者,每一个消费者收到1个分区的消息。

若是有 5 个消费者,有4个消费者能够收到消息,多出来的消费者会闲置,不会收到消息。

 

多个消费者群组订阅相同的主题,群组互不影响。

 

消费者群组和分区再均衡

群组的消费者共同读取主题的分区。一个新的消费者 加入群组时,他读取的是本来由其余消费者读取的消息。当某一消费者被关闭或发生崩溃,它就离开群组,本来由它读取的分区将由群组里的其余消费者读取。

在主题发送变化时,好比管理员添加了新的分区,会发生分区重分配。

分区的全部权从一个消费者转移到另外一个消费者,这样的行为被称为 再均衡

再均衡期间,消费者没法读取消息,形成整个群组一小段时间的不可用。

当分区被从新分配给另外一个消费者时,消费者当前的读取状态会丢失,他还须要去刷新缓存,在它从新恢复状态以前,会拖慢应用程序。

 

消费者经过向被指派为 群组协调器 broker(不一样群组能够有不一样的协调器) 发送心跳 来维持和群组的从属关系 对分区的全部权关系。消费者会在轮询消息 或提交偏移量时发送心跳。

若是消费者中止发送心跳时间过长,会话过时,群组协调器认为它死亡,就会触发一次再均衡。

 

若是一个消费者崩溃,中止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。这几秒内,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会当即出发一次再均衡,尽可能下降处理停顿。

 

建立 Kafka 消费者

3个必要属性: bootstrap.servers 、key.deserializer、value.deserializer

bootstrap.servers : kafka 集群的链接字符串。(与KfakaProducer 中用途同样)

key.deserializer 和 value.deserializer 与生产者的 serializer 定义也相似

group.id 非必须。 指定了kafkaConsumer 属于哪个消费者群组。

 

订阅主题

subscribe() 方法

consumer.subscribe(Collections.singletonList("customerCountries"));

也能够在 subscribe() 方法时传入一个正则表达式。正则表达式能够匹配多个主题,若是有人建立了新主题,且主题名字与正则表达式匹配,会当即触发一次再均衡,消费者就能够读取新添加的主题。

例:订阅全部与 test 相关的主题,能够 consumer.subscribe(" test.* ");

 

轮询

消息轮询是消费者 API 核心,经过一个简单的轮询服务向服务器请求数据。

一旦消费者订阅了主题,轮询会处理全部的细节,包括群组协调、分区再均衡、发送心跳和获取数据。

轮询不止是获取数据 。在第一次调用新消费者的 poll() 方法时,它会查找 GroupCoordinator,而后加入群组,接收分配的分区。

线程安全 -- 同一群组里,没法让 一个线程运行多个消费者,也没法让多个线程安全的共享一个消费者。若是要在同一个消费者群组里运行多个消费者,须要让每一个消费者运行在本身的线程里,最好把消费者逻辑封装在本身的对象里,而后使用Java 的 ExecutorService 启动多个线程,使每一个消费者运行在本身的线程上。(参考 : https://www.confluent.io/blog/ )

 

消费者的配置

几个配置属性: bootstrap.servers 、group.id 、key.deserializer 、value.deserializer 。

几个重要属性:

fetch.min.bytes :消费者从服务器获取记录的最小字节数

fetch.max.wait.ms : broker 的等待时间,默认 500ms。

max.partition.fetch.bytes : 指定了服务器从每一个分区里返回给消费者的最大字节数。默认 1MB

session.timeout.ms :指定了消费者在被认为死亡以前能够与服务器断开链接的时间,默认 3s

auto.offset.reset : 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的状况下(因消费者长时间失效,包含偏移量的记录已通过时并被删除)该作何处理。默认是 latest (在偏移量无效的状况下,消费者将从最新的记录开始读取数据)。另外一个值时 earliest,意为:在偏移量无效的状况下,消费者将从起始位置读取分区的记录。

enable.auto.commit :指定了 消费者是否自动提交偏移量,默认为 true

partition.assignment.strategy : 分区策略

client.id :能够是任意字符,broker 用它标识从客户端发送过来的消息,常被用在日志、度量指标和配额里。

max.poll.records :用于控制单次调用 call() 方法可以返回的记录数量。

receive.buffer.bytes 和 send.buffer.bytes :socket 在读写数据时用到的TCP 缓冲区也能够设置大小。若是值为 -1,即为操做系统的默认值。

提交和偏移量

每次调用 poll() 方法,它老是返回由生产者写入kafka 但尚未被消费者读取过的记录。

咱们把更新分区当前位置的操做叫作提交

消费者如何提交偏移量?

消费者往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每一个分区的偏移量。若是消费者一直处于运行状态,那么偏移量就没用。若是消费者发生崩溃或新消费者加入群组,触发在均衡。再均衡后消费者分到新分区,为了能继续以前工做,消费者须要读取每一个分区最后一次提交的偏移量,而后从偏移量指定的地方继续处理。

若是提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

若是提交的偏移量大于客户端处理的最后一个消息的偏移量,那么助于两个偏移量之间的消息会丢失。

自动提交

最简单。enable.auto.commit = true,消费者自动把poll()方法接收到的最大偏移量提交上去。提交间隔由 auto.commit.interval.ms 控制,默认 5s。

这种方法没法避免由于再均衡致使的 重复处理消息。

提交当前偏移量

auto.commit.offset = false,让应用程序决定什么时候提交偏移量。使用 commitSync() 提交偏移量。

注: commitSync() 将会提交由 poll() 返回的最新偏移量,在处理完全部记录后要确保调用了 commitSync(),不然会有丢失消息的风险。若是发生再均衡,最近一批消息到发生再均衡之间的全部消息都将被重复处理。

异步提交

因为手动提交的不足:【在broke 对提交请求做出回应前,应用程序会一直阻塞,限制应用程序吞吐量。可经过下降提交频率来提高吞吐量。若是发生再均衡,会增长重复消息的数量。】

使用异步提交,只管发送提交请求,无需等待broker 响应。

重试异步提交 使用一个单调递增的序列号维护异步提交顺序,每次提交偏移量后或在回调里提交偏移量时递增序列号。重试前对比回调序列号和即将提交的偏移量是否相等,相等 - 没有新提交,可安全的进行重试,若是序列号比较大,说明有新提交已经发送,应该中止提交。

同步和异步组合提交

通常状况下,偶尔提交失败,不影响后续提交的成功。在关闭消费者或再均衡前的最后一次提交,须要确保可以提交成功。故:消费者关闭前会组合使用 commitAsync() 和 commitSync() 。

提交特定的偏移量

提交偏移量的频率和处理消息批次的频率是同样的。

消费者 API 容许调用 commitSync() 和 commitAsync() 方法时传入但愿提交的分区和偏移量的 map。

 

再均衡监听器

 

从特定偏移量处开始处理记录

从分区起始位置开始读取消息: seekToBeginning(Collection<TopicPartition> tp)

从分区末尾位置开始读取消息: seekToEnd(Collection<TopicPartition> tp)

经过把偏移量和记录保存到同一个外部系统来实现单次语义,结合使用 ConsumerRebalanceListener 和 seek () 方法来确保可以及时保存偏移量,并保证消费者老是可以从正确的位置开始读取消息。

 

如何退出

肯定要退出循环,经过另外一个线程调用 consumer.wakeup() 方法。

若是循环运行在主线程,能够在 ShutdownHook 中调用该方法。

反序列化器

生产者须要用 序列化器 把对象转换成字节数组再发送给kafka。

消费者须要用 反序列化器 把从kafka接收到的字节数组转换成 java对象。

生成消息使用的序列化器 读取消息使用的反序列化器应该是一一对应的。

使用 Avro 和 schema 注册表进行序列化和反序列化的优点: AvroSerializer 能够保证写入主题的数据与主题的 schema 是兼容的。

注: 不建议使用自定义序列化器和自定义反序列化器。它使生产者和消费者牢牢耦合在一块儿。

独立消费者

一个消费者从一个主题的全部分区或某个特定分区读取数据。不须要消费者群组和再均衡,不须要订阅主题,取之 为本身分配分区。

一个消费者能够订阅主题(并加入消费者群组),或者为本身分配分区,但不能同时作这两件事。

 

第五章:深刻Kafka

话题 : Kafka 如何进行复制;

Kafka 如何处理来自生产者和消费者的请求;

Kafka 的存储细节,好比文件格式 和 索引;

集群成员关系

kafka 使用 zookeeper 来维护 集群成员的信息。每一个broker 都有一个惟一的标识符,标识符可在配置文件中指定,也能够自动生成。broker 启动时,经过建立临时节点把本身的id注册到 Zookeeper。kafka 组件订阅 Zookeeper 的 /broker/ids 路径(broker 在 zookeeper 上的注册路径),当有 broker 加入集群或退出集群时,这些组件就能够得到通知。

如要启动另外一个具备相同 id 的 broker,会获得错误。由于 zookeeper 里已经有一个具备相同ID 的broker。

broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从 zookeeper 上断开链接,broker 启动时建立的临时节点会自动从 zookeeper 上移除。监听 broker 列表的 kafka 组件会被告知该 broker 已移除。

关闭 broker 时,对应的节点也会消失,在彻底关闭一个 broker以后,若是使用相同的 id 启动另外一个全新的broker ,它会当即加入群组,并拥有与旧broker 相同的分区和主题。

控制器

控制器就是一个broker ,具备普通broker 功能外,还负责分区首领的选举。

集群里第一个启动的broker 经过在 zookeeper 里建立一个临时节点 / controller 让本身成为控制器。其余 broker 启动时也尝试建立这个节点,但会受到“节点已存在” 的异常。其余控制器节点上建立 zookeeper watch 对象,这样它们就能够受到这个节点的变动通知。这种方式能够确保集群里一次只有一个控制器存在。

若是控制器被关闭或者与zookeeper断开链接,zookeeper上临时节点消失。集群上其余 broker经过 watch对象获得通知,会尝试让本身成为新的控制器。第一个在 zookeeper里成功建立控制器节点的broker 会成为新的 控制器,其余节点收到“节点已存在”的异常,而后再新控制器节点上再次建立 watch 对象。

总之: Kafka 使用 zookeeper 的临时节点选举控制器,在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用 epoch 来避免 “脑裂”。“脑裂” 指连个节点同时认为本身是当前的控制器。

 

复制

复制功能 kafka 架构核心。

kafka :一个分布式的 、 可分区的 、可复制的提交日志服务。

复制 关键 ,由于它能够在个别节点失效时仍能保证kafka 的可用性和持久性。

kafka 使用主题来组织数据,每一个主题被分为若干个分区,每一个分区有多个副本。副本被保存在 broker 上,每一个 broker 能够保存成百上千个属于不一样主题和分区的副本。

副本类型:

首领副本 每一个分区都有一个首领副本。为保证数据一致性,全部生产者请求和消费者请求都会通过这个副本。

跟随者副本 :首领外副本都是跟随者副本。不处理来自客户端的请求,惟一任务是 从首领哪里复制消息,保持与首领一直的状态。若是首领崩溃,其中一个跟随者会被提高为新首领。

为了与首领保持同步,跟随者向首领发送获取数据的请求,请求信息里包含了跟随者想要获取消息的偏移量,偏移量老是有序的。

首领经过查看每一个跟随者请求的最新偏移量,知道跟随者复制的进度。若是跟随者 10s 内没有请求任何消息,或 在请求消息,但 10s 内没有请求最新的数据,那么它就会被认为是不一样步的。若是副本与首领不一样步,首领失效时,它就不可能成为新首领。

持续请求获得的最新消息副本被称为同步的副本。

除当前首领外,每一个分区都有一个 首选首领 - 建立主题时选定的首领。默认 kafka的 auto.leader.rebalance.enable = true 。它会检查首选首领 是否是当前首领,若是不是,而且该副本是同步的,那么就会触发首领选举,让首选首领称为当前首领。

处理请求

broker 大部分工做是处理客户端、分区副本 和控制器发送给分区首领的请求。

全部的请求消息都包含一个标准消息头:

Request type ( 即 API key)

Request version ( broker 能够处理不一样版本的客户端请求,并根据客户端版本做出不一样的响应。 )

Correlation ID 一个具备惟一性的数字,用于标识请求消息,同时也出如今响应消息和错误日志里。

Client ID 用于标识发送请求的客户端。

几种常见的请求类型

生产请求 生产者发送的请求,包含客户端要写入 broker 的消息。

包含首领副本的broker 在收到生产请求时,会对请求作一些验证。

一、发送数据的用户是否有主题写入权限

二、请求里包含的 acks 值是否有效(只容许出现 0、一、all )

三、若是 acks = all ,是否有足够多的同步副本保证消息已经安全被写入。

获取请求  在消费者和跟随者副本须要从 broker 读取消息时发送的请求。

首领收到请求时,先检查请求是否有效。

例:指定偏移量在分区上是否存在?若是客户端请求是已被删除的数据,或者请求的偏移量不存在,那么broker 将返回一个错误。如存在,broker 将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。

kafka 使用 零复制 技术向客户端发送消息。 kafka 直接把消息从文件里发送到网络通道,不须要通过任何中间缓冲区。避免了字节复制,也不用管理内存缓冲区,从而得到更好的性能。

 

生产请求和获取请求都必须发送给分区的首领副本。

元数据请求 :用来获取 客户端该往哪里发送请求。元数据请求包含了客户端感兴趣的主题列表。服务端的响应消息里指明了这些主题所包含的分区、每一个分区都有哪些副本,以及哪一个副本是首领。元数据请求能够发送给任意一个 broker,由于全部 broker 都缓存了这些信息。

 

其余请求

物理存储

kafka 基本存储单元是分区。分区没法在多个 broker 间进行再细分,也没法在同一个broker的多个磁盘上再进行细分。

分区大小受到单个挂载点可用空间的限制。

分区分配  建立主题时,kafka 会决定如何在 broker 间分配分区。目标 -

broker 间平均地分布分区副本。

确保每一个分区的每一个副本分布在不一样的broker 上。

若是为 broker 指定了机架信息,那么尽量把每一个分区的副本分配到不一样机架的 broker 上。目的是 为了保证一个机架的不可用不会致使总体的分区不可用。

注意:磁盘空间。在为 broker 分配分区时并无考虑可用空间和工做负载问题,但在将分区分配到磁盘上时会考虑分区数量,不过不考虑分区大小。

文件管理:保留数据 kafka的一个基本特性。把分区分红若干个片断。默认状况下,每一个片断包含 1GB 或一周的数据,以较小的那个为准。在broker 往分区写入数据时,若是达到片断上限,就关闭当前文件,并打开一个新文件。

当前正在写入的文件 活跃片断。该片断永远不会被删除。

文件格式 咱们把 kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是同样的。

除了键、值、偏移量。消息里还包含了消息大小、校验、消息格式版本号、压缩算法、时间戳。

若是生产者发送的是压缩过的消息,那么同一批次的消息会被压缩在一块儿,被当作 “包装消息” 进行发送。

索引  消费者能够从kafka 的任意可用偏移量位置开始读取消息。为了帮助 broker 更快地定位到指定的偏移量,kafka 为每一个分区维护了一个索引。索引把偏移量映射到片断文件和偏移量在文件里的位置。

清理 :通常状况下,kafka 会根据设置的时间保留数据,把超过期效的旧数据删除掉。

清理的工做原理 :每一个日志片断能够分为两个部分 -

干净的部分 - 消息以前被清理过,每一个键只有一个对应的值,这个值是上一次清理时保留下来的。

污浊的部分 - 消息是在上一次清理以后写入的。

被删除的事件  为了完全把一个键从系统里删除,应用程序必须发送一个包含该键且值为 null的消息。清理线程发现该消息,会先进性常规清理,只保留值为null的消息。该消息(被称为 墓碑消息)会被保留一段时间,时间长短可配置。消费者往数据库里复制 kafka 的数据时,看到墓碑消息,就知道应该要把相关用户信息从数据库里删除。

什么时候会清理出题  delete 策略不会删除当前活跃的片断同样,compact 策略也不会对当前片断进行清理。只有旧片断里的消息才会被清理。

 

 

 

第十一章:流式处理

什么是流式处理

数据流 是无边界数据集的抽象表示。无边界意味着无限和持续增加。

例:信用卡交易、股票交易、包裹递送 等等

除没边界外,事件流模型的其余属性:

事件流是有序的

不可变的数据记录

事件流是可重播的

流式处理 指实时地处理一个或多个事件流。

对比:

请求 响应:延迟最小的一种范式,响应时间处于亚毫秒到毫秒之间,响应时间稳定。这种处理模式通常是阻塞的。 在数据库领域,这种范式是线上交易处理(OLTP)

批处理:高延迟和高吞吐量。处理系统按照设定时间启动处理进程。在数据库领域,是 数据仓库(DWH) 或商业智能(BI) 系统

流处理:介于上二者之间的范式。

的定义不依赖任何一个特定的框架、API 或 特性。只要持续地从一个无边界的数据集读取数据,而后对它们进行处理并生成结果,那就是在进行流式处理。重点: 整个处理过程必须是持续的。

 

流式处理的一些概念

时间 最重要概念。通常包含的几个时间概念:

事件时间: 所追踪事件的发生时间和记录的建立时间。

日志追加时间: 时间保存到 broker 的时间

处理时间: 应用程序在收到事件以后要对其进行处理的时间。

注意: 时区问题。整个数据管道应该使用同一个时区。

状态: 事件 事件 之间的信息被称为 “状态”。包含的几种类型的状态:

本地状态或内部状态: 只能被单个应用程序实例访问。优势是 速度快,缺点是 受内存大小的限制。流式处理的不少设计模式都将数据拆分到多个子流,这样就可使用有线的本地状态来处理它们。

外部状态: 使用外部的数据存储来维护,通常使用 Nosql 系统。优点是 没有大小的限制,缺点是 引入额外的系统会形成更大的延迟和复杂性。

流和表的二元性

是一系列事件,每一个事件就是一个变动。

是记录的集合,包含了当前的状态,是多个变动所产生的结果。

转化为 表,须要 ”应用“ 流里所包含的全部变动,这也叫作流的 ”物化“

 

时间窗口

大部分针对流的操做都是基于时间窗口的。

窗口的大小

窗口移动的频率 :若是 ‘移动间隔’ 与窗口大小相等 为 “滚动窗口”; 若是 窗口随每条记录移动, 为 滑动窗口。

窗口的可更新时间多长。

 

流式处理的设计模式

单个事件处理:最基本模式。 可使用一个生产者 一个消费者来实现。

使用本地状态:流式处理使用本地状态须要解决 内存使用、持久化、再均衡。

使用外部查找 - 流和表的链接。

流的链接 。

乱序的事件 识别乱序的事件,规定一个时间段用于重排乱序的事件,具备在必定时间段内重排乱序事件的能力。具有更新结果的能力。

从新处理 。

相关文章
相关标签/搜索