一、如何获取 topic 主题的列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
二、生产者和消费者的命令行是什么?
生产者在主题上发布消息:
bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topicHello-Kafka
注意这里的 IP 是 server.properties 中的 listeners 的配置。接下来每一个新行就是输入一条新消息
消费者接受消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicHello-Kafka --from-beginning
三、consumer 是推仍是拉?
Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息仍是 brokers 将消息推送到 consumer,也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。
一些消息系统好比 Scribe 和 Apache Flume 采用了 push 模式,将消息推送到下游的 consumer。这样作有好处也有坏处:由 broker 决定消息推送的速率,对于不一样消费速率的 consumer 就不太好处理了。消息系统都致力于让 consumer 以最大的速率最快速的消费消息,但不幸的是,push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。最终 Kafka 仍是选取了传统的 pull 模式。
Pull 模式的另一个好处是 consumer 能够自主决定是否批量的从 broker 拉取数据。Push 模式必须在不知道下游 consumer 消费能力和消费策略的状况下决定是当即推送每条消息仍是缓存以后批量推送。若是为了不 consumer 崩溃而采用较低的推送速率,将可能致使一次只推送较少的消息而形成浪费。Pull 模式下,consumer 就能够根据本身的消费能力去决定这些策略。
Pull 有个缺点是,若是 broker 没有可供消费的消息,将致使 consumer 不断在循环中轮询,直到新消息到 t 达。为了不这点,Kafka 有个参数可让 consumer阻塞知道新消息到达(固然也能够阻塞知道消息的数量达到某个特定的量这样就能够批量发送)。
四、讲讲 kafka 维护消费状态跟踪的方法
大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到consumer 后 broker 就立刻进行标记或者等待 customer 的通知后进行标记。这样也能够在消息在消费后立马就删除以减小空间占用。
可是这样会不会有什么问题呢?若是一条消息发送出去以后就当即被标记为消费过的,一旦 consumer 处理消息时失败了(好比程序崩溃)消息就丢失了。为了解决这个问题,不少消息系统提供了另一个个功能:当消息被发送出去以后仅仅被标记为已发送状态,当接到 consumer 已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先若是 consumer处理消息成功了可是向 broker 发送响应时失败了,这条消息将被消费两次。第二个问题时,broker 必须维护每条消息的状态,而且每次都要先锁住消息而后更改状态而后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,好比若是消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,Kafka 采用了不一样的策略。Topic 被分红了若干分区,每一个分区在同一时间只被一个 consumer 消费。这意味着每一个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每一个分区消费状态就很容易了,仅仅须要一个整数而已。这样消费状态的跟踪就很简单了。
这带来了另一个好处:consumer 能够把 offset 调成一个较老的值,去从新消费老的消息。这对传统的消息系统来讲看起来有些难以想象,但确实是很是有用的,谁规定了一条消息只能被消费一次呢?
五、讲一下主从同步
Kafka容许topic的分区拥有若干副本,这个数量是能够配置的,你能够为每一个topci配置副本的数量。Kafka会自动在每一个个副本上备份数据,因此当一个节点down掉时数据依然是可用的。
mysql
Kafka的副本功能不是必须的,你能够配置只有一个副本,这样其实就至关于只有一份数据。
程序员
建立副本的单位是topic的分区,每一个分区都有一个leader和零或多个followers.全部的读写操做都由leader处理,通常分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。全部的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。flowers向普通的consumer那样从leader那里拉取消息并保存在本身的日志文件中。 许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否 着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
面试
- 节点必须能够维护和ZooKeeper的链接,Zookeeper经过心跳机制检查每一个节点的链接。
- 若是节点是个follower,他必须能及时的同步leader的写操做,延时不能过久。
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪全部“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时过久,leader就会把它移除。至于延时多久算是“过久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
spring
只有当消息被全部的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担忧一旦leader down掉了消息会丢失。Producer也能够选择是否等待消息被提交的通知,这个是由参数request.required.acks决定的。sql
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
数据库
Leader的选择
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
缓存
若是leaders永远不会down的话咱们就不须要followers了!一旦leader down掉了,须要在followers中选择一个新的leader.可是followers自己有可能延时过久或者crash,因此必须选择高质量的follower做为leader.必须保证,一旦一个消息被提交了,可是leader down掉了,新选出的leader必须能够提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据全部副本节点的情况动态的选择最适合的做为leader.Kafka并非使用这种方法。
安全
Kafaka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每一个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。所以这个集合中的任何一个节点随时均可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就能够容许在f个节点down掉的状况下不会丢失消息并正常提供服。ISR的成员是动态的,若是一个节点被淘汰了,当它从新达到“同步中”的状态时,他能够从新加入ISR.这种leader的选择方式是很是快速的,适合kafka的应用场景。
bash
一个邪恶的想法:若是全部节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦全部节点都down了,这个就不能保证了。
服务器
实际应用中,当全部的副本都down掉时,必须及时做出反应。能够有如下两种选择:
- 等待ISR中的任何一个节点恢复并担任leader。
- 选择全部节点中(不仅是ISR)第一个恢复的节点做为leader.
这是一个在可用性和连续性之间的权衡。若是等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。若是等待ISR意外的节点恢复,这个节点的数据就会被做为线上数据,有可能和真实的数据有所出入,由于有些数据它可能还没同步到。Kafka目前选择了第二种策略,在将来的版本中将使这个策略的选择可配置,能够根据场景灵活的选择。
这种窘境不仅Kafka会遇到,几乎全部的分布式数据系统都会遇到。
副本管理
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽可能的使全部分区均匀的分布到集群全部的节点上而不是集中在某些节点上,另外主从关系也尽可能均衡这样每一个几点都会担任必定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点做为“controller”,当发现有节点down掉的时候它负责在游泳分区的全部节点中选择新的leader,这使得Kafka能够批量的高效的管理全部分区节点的主从关系。若是controller down掉了,活着的节点中的一个会备切换为新的controller.
六、为何须要消息系统,mysql 不能知足需求吗?
1.解耦:
容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束。
2.冗余:
消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3.扩展性:
由于消息队列解耦了你的处理过程,因此增大消息入队和处理的频率是很容易的,只要另外增长处理过程便可。
4.灵活性 & 峰值处理能力:
在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见。若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。
5.可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。
6.顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7.缓冲:
有助于控制和优化数据流通过系统的速度,解决生产消息和消费消息的处理速度不一致的状况。
8.异步通讯:
不少时候,用户不想也不须要当即处理消息。消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。
七、Zookeeper 对于 Kafka 的做用是什么?
Zookeeper 是一个开放源码的、高性能的协调服务,它用于 Kafka 的分布式应用。
Zookeeper 主要用于在集群中不一样节点之间进行通讯
在 Kafka 中,它被用于提交偏移量,所以若是节点在任何状况下都失败了,它都
能够从以前提交的偏移量中获取
除此以外,它还执行其余活动,如: leader 检测、分布式同步、配置管理、识别新
节点什么时候离开或链接、集群、节点实时状态等等。
八、数据传输的事务定义有哪三种?
和 MQTT 的事务定义同样都是 3 种。
(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每一个消息都传输被一次并且仅仅被传输一次,这是你们所指望的
九、Kafka 判断一个节点是否还活着有那两个条件?
(1)节点必须能够维护和 ZooKeeper 的链接,Zookeeper 经过心跳机制检查每一个节点的链接
(2)若是节点是个 follower,他必须能及时的同步 leader 的写操做,延时不能过久
十、Kafka 与传统 MQ 消息系统之间有三个关键区别
(1).Kafka 持久化日志,这些日志能够被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,能够灵活伸缩,在内部经过复制数据提高容错能力和高可用性
(3).Kafka 支持实时的流式处理
十一、讲一讲 kafka 的 ack 的三种机制
request.required.acks 有三个值 0 1 -1(all)
0:生产者不会等待 broker 的 ack,这个延迟最低可是存储的保证最弱当 server 挂掉的时候就会丢数据。
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 可是若是 leader挂掉后他不确保是否复制完成新 leader 也会致使数据丢失。
-1(all):服务端会等全部的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失
十二、消费者如何不自动提交偏移量,由应用提交?
将 auto.commit.offset 设为 false,而后在处理一批消息后 commitSync() 或者异步提交 commitAsync()
即:
ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord<> record : records){
。。。
tyr{consumer.commitSync()
}
。。。
}复制代码
1三、消费者故障,出现活锁问题如何解决?
出现“活锁”的状况,是它持续的发送心跳,可是没有处理。为了预防消费者在这种状况下一直持有分区,咱们使用 max.poll.interval.ms 活跃检测机制。 在此基础上,若是你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其余消费者接管该分区。 发生这种状况时,你会看到 offset 提交失败(调用commitSync()引起的 CommitFailedException)。这是一种安全机制,保障只有活动成员可以提交 offset。因此要留在组中,你必须持续调用 poll。
消费者提供两个配置设置来控制 poll 循环:
max.poll.interval.ms:增大 poll 的间隔,能够为消费者提供更多的时间去处理返回的消息(调用 poll(long)返回的消息,一般返回的消息都是一批)。缺点是此值越大将会延迟组从新平衡。
max.poll.records:此设置限制每次调用 poll 返回的消息数,这样能够更容易的预测每次 poll 间隔要处理的最大值。经过调整此值,能够减小 poll 间隔,减小从新平衡分组的
对于消息处理时间不可预测地的状况,这些选项是不够的。 处理这种状况的推荐方法是将消息处理移到另外一个线程中,让消费者继续调用 poll。 可是必须注意确保已提交的 offset 不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你须要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完以前返回的消息(若是你的处理能力比拉取消息的慢,那建立新线程将致使你机器内存溢出)。
1四、如何控制消费的位置
kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最先和最新的 offset 的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection))
1五、kafka 分布式(不是单机)的状况下,如何保证消息的顺序消费?
Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,因此能够保证 FIFO 的顺序。不一样 partition 之间不能保证顺序。可是绝大多数用户均可以经过 message key 来定义,由于同一个 key 的 message 能够保证只发送到同一个 partition。
Kafka 中发送 1 条消息的时候,能够指定(topic, partition, key) 3 个参数。
partiton 和 key 是可选的。若是你指定了 partition,那就是全部消息发往同 1个 partition,就是有序的。而且在消费端,Kafka 保证,1 个 partition 只能被1 个 consumer 消费。或者你指定 key(好比 order id),具备同 1 个 key 的全部消息,会发往同 1 个 partition。
1六、kafka 的高可用机制是什么?
这个问题比较系统,回答出 kafka 的系统特色,leader 和 follower 的关系,消息读写的顺序便可。
1七、kafka 如何减小数据丢失
Kafka到底会不会丢数据(data loss)? 一般不会,但有些状况下的确有可能会发生。下面的参数配置及Best practice列表能够较好地保证数据的持久性(固然是trade-off,牺牲了吞吐量)。笔者会在该列表以后对列表中的每一项进行讨论,有兴趣的同窗能够看下后面的分析。
- block.on.buffer.full = true
- acks = all
- retries = MAX_VALUE
- max.in.flight.requests.per.connection = 1
- 使用KafkaProducer.send(record, callback)
- callback逻辑中显式关闭producer:close(0)
- unclean.leader.election.enable=false
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
- enable.auto.commit=false
- 消息处理完成以后再提交位移
给出列表以后,咱们从两个方面来探讨一下数据为何会丢失:
1. Producer端
目前比较新版本的Kafka正式替换了Scala版本的old producer,使用了由Java重写的producer。新版本的producer采用异步发送机制。KafkaProducer.send(ProducerRecord)方法仅仅是把这条消息放入一个缓存中(即RecordAccumulator,本质上使用了队列来缓存记录),同时后台的IO线程会不断扫描该缓存区,将知足条件的消息封装到某个batch中而后发送出去。显然,这个过程当中就有一个数据丢失的窗口:若IO线程发送以前client端挂掉了,累积在accumulator中的数据的确有可能会丢失。
Producer的另外一个问题是消息的乱序问题。假设客户端代码依次执行下面的语句将两条消息发到相同的分区
producer.send(record1);
producer.send(record2);复制代码
若是此时因为某些缘由(好比瞬时的网络抖动)致使record1没有成功发送,同时Kafka又配置了重试机制和max.in.flight.requests.per.connection大于1(默认值是5,原本就是大于1的),那么重试record1成功后,record1在分区中就在record2以后,从而形成消息的乱序。不少某些要求强顺序保证的场景是不容许出现这种状况的。
鉴于producer的这两个问题,咱们应该如何规避呢??对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送有可能丢失数据, 我改为同步发送总能够吧?好比这样:
producer.send(record).get();复制代码
这样固然是能够的,可是性能会不好,不建议这样使用。所以特地总结了一份配置列表。我的认为该配置清单应该可以比较好地规避producer端数据丢失状况的发生:(特此说明一下,软件配置的不少决策都是trade-off,下面的配置也不例外:应用了这些配置,你可能会发现你的producer/consumer 吞吐量会降低,这是正常的,由于你换取了更高的数据安全性)
- block.on.buffer.full = true 尽管该参数在0.9.0.0已经被标记为“deprecated”,但鉴于它的含义很是直观,因此这里仍是显式设置它为true,使得producer将一直等待缓冲区直至其变为可用。不然若是producer生产速度过快耗尽了缓冲区,producer将抛出异常
- acks=all 很好理解,全部follower都响应了才认为消息提交成功,即"committed"
- retries = MAX 无限重试,直到你意识到出现了问题:)
- max.in.flight.requests.per.connection = 1 限制客户端在单个链接上可以发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求以前client不能再向同一个broker发送请求。注意:设置此参数是为了不消息乱序
- 使用KafkaProducer.send(record, callback)而不是send(record)方法 自定义回调逻辑处理消息发送失败
- callback逻辑中最好显式关闭producer:close(0) 注意:设置此参数是为了不消息乱序
- unclean.leader.election.enable=false 关闭unclean leader选举,即不容许非ISR中的副本被选举为leader,以免数据丢失
- replication.factor >= 3 这个彻底是我的建议了,参考了Hadoop及业界通用的三备份原则
- min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提高数据持久性的一个参数。与acks配合使用
- 保证replication.factor > min.insync.replicas 若是二者相等,当一个副本挂掉了分区也就无法正常工做了。一般设置replication.factor = min.insync.replicas + 1便可
2. Consumer端
consumer端丢失消息的情形比较简单:若是在消息处理完成前就提交了offset,那么就有可能形成数据的丢失。因为Kafka consumer默认是自动提交位移的,因此在后台提交位移前必定要保证消息被正常处理了,所以不建议采用很重的处理逻辑,若是处理耗时很长,则建议把逻辑放到另外一个线程中去作。为了不数据丢失,现给出两点建议:
- enable.auto.commit=false 关闭自动提交位移
- 在消息被完整处理以后再手动提交位移
1八、kafka 如何不消费重复数据?好比扣款,咱们不能重复的扣。
其实仍是得结合业务来思考,我这里给几个思路:
好比你拿个数据要写库,你先根据主键查一下,若是这数据都有了,你就别插入了,update 一下好吧。
好比你是写 Redis,那没问题了,反正每次都是 set,自然幂等性。
好比你不是上面两个场景,那作的稍微复杂一点,你须要让生产者发送每条数据的时候,里面加一个全局惟一的 id,相似订单 id 之类的东西,而后你这里消费到了以后,先根据这个 id 去好比 Redis 里查一下,以前消费过吗?若是没有消费过,你就处理,而后这个 id 写 Redis。若是消费过了,那你就别处理了,保证别重复处理相同的消息便可。
好比基于数据库的惟一键来保证重复数据不会重复插入多条。由于有惟一键约束了,重复数据插入只会报错,不会致使数据库中出现脏数据。