(持续更新中~~~)kafka--消息引擎与分布式流处理平台

kafka概述

kafka是一个分布式的基于发布/订阅模式的消息队列(message queue),通常更愿意称kafka是一款开源的消息引擎系统,只不过消息队列会耳熟一些。kafka主要应用于大数据实时领域。java

为何会有消息队列,主要是为了异步处理,提升效率。咱们来看一张图node

使用消息队列,能够把耗时任务扔到队列里面,异步调用,从而提高效率。也就是咱们所说的解耦。python

然而除了解耦,还有没有其余做用呢?答案显然是有的,用一个专业点的名词解释的话,就是削峰填谷。mysql

削峰填谷,真的是很是形象的四个字。所谓的削峰填谷,就是指缓冲上下游瞬时突发流量,使其更平滑。特别是那种发送能力很强的上游系统,若是没有消息引擎的保护,脆弱的下游系统可能会直接被压垮致使全链路服务雪崩。可是,一旦有了消息引擎,它可以有效的对抗上游的流量冲击,真正作到将上游的"峰"填到"谷"中,避免了流量的震荡。消息引擎系统的另外一大好处就是咱们刚才说的,在于发送方和接收方的松耦合,这也在必定程度上简化了应用的开发,减小了系统间没必要要的交互linux

直接解释的话,可能没有直观的感觉,咱们来举一个实际的例子。好比在京东购买商品。当点击购买的时候,会调用订单系统生成对应的订单,然而要处理该订单则会依次调用下游系统的多个子服务,好比调用银行等支付接口、查询你的登陆信息、验证商品信息等等。显然上游的订单操做比较简单,它的TPS要远高于处理订单的下游服务。所以若是上游和下游直接对接,势必会出现下游服务没法及时处理上游订单从而形成订单堆积的状况。特别是当出现双11、双12、相似秒杀这种业务的时候,上游订单流量会瞬间增长,可能出现的结果就是直接压垮下游子系统服务。解决此问题的一个常见的作法就是对上游系统进行限速,可是这种作法显然是不合理的,毕竟问题不是出如今它那里。何况你要是限速了,别人家网站双十一成交一千万笔单子,自家网站才成交一百万笔单子,这样钱送到嘴边都赚不到。因此更常见的办法就是引入像kafka这样的消息引擎系统来对抗这种上下游系统的TPS不一致以及瞬时峰值流量。程序员

仍是这个例子,当引入了kafka以后,上游系统再也不直接与下游系统进行交互。当新订单生成以后它仅仅是向kafka broker发送一条消息便可。相似的,下游的各个子服务订阅kafka中的对应主题,并实时从该主题的各自分区(partition)中获取到订单消息进行处理,从而实现上游订单服务和下游订单处理服务的解耦。这样当出现秒杀业务的时候,kafka可以将瞬时增长的订单流量所有以消息的形式保存在对应的主题中。既不影响上游服务的TPS,同时也给下游服务流出了足够的时间去消费它们。这就是kafka这类消息引擎存在的最大意义所在。算法

目前里面出现了不少的专业词汇,broker、主题、partition等等,这些咱们后面都会介绍。sql

kafka消费模式

咱们知道消息队列传输的是消息,那么这个消息如何传递也是很重要的一环。通常消息队列支持两种传递模式。数据库

  • 点对点模式:编程

    生产者将生产的消息发送到queue中,而后消费者再从queue中取出消息进行消费。消息一旦被消费,那么queue中再也不有存储,因此消费者不可能消费到已经被消费的信息。queue支持多个消费者同时消费,可是一个消息只能被一个消费者消费,不存在说多个消费者同时消费一个消息。平常生活中就好电话客服服务,同一个客户呼入电话,只能被一位客服人员处理,第二个客服人员不能为该客户服务

  • 发布订阅模式

    和点对点模型不一样,它有一个主题(Topic)的概念。该模型也有发送方和接收方,只不过叫法不同。发送方也被成为发布者(publisher),接收方成为订阅者(subscriber)。和点对点模型不同,这个模型能够存在多个发布者和多个订阅者,它们都能接收到相同主题的消息。比如微信公众号,一个公众号能够有多个订阅者,一个订阅者也能够订阅多个公众号。

搞定kafka的专业术语

在kafka的世界中有不少概念和术语是须要咱们提早理解而且熟练掌握的,下面来盘点一下。

以前咱们提到过,kafka属于分布式的消息引擎系统,主要功能是提供一套完善的消息发布与订阅方案。在kafka中,发布订阅的对象是主题(topic),能够为每一个业务、每一个应用、甚至是每一类数据都建立专属的主题

向主题发布消息的客户端应用程序成为生产者(producer),生产者一般持续不断地向一个或多个主题发送消息,而订阅这些主题获取消息的客户端应用程序就被称之为消费者(consumer)。和生产者相似,消费者也能同时订阅多个主题。咱们把生产者和消费者统称为客户端(clients)。你能够同时运行多个生产者和消费者实例,这些实例不断地向kafka集群中的多个主题生产和消费消息。有客户端天然也就有服务端。kafka的服务器端由被称为broker的服务进程构成,即一个kafka集群由多个broker组成,broker负责接收和处理客户端发来的请求,以及对消息进行持久化。虽然多个broker进程可以运行在同一台机器上,但更常见的作法是将不一样的broker分散运行在不一样的机器上。这样即使集群中的某一台机器宕机,运行在其之上的broker进程挂掉了其余机器上的broker也依旧能对外提供服务。这其实就是kafka提供高可用的手段之一

在实现高可用的另外一个手段就是备份机制(replication)。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝就叫作副本(replica)。副本的数量是能够配置的,这些副本保存着相同的数据,但却有不一样的角色和做用。kafka定义了两种副本,领导者副本(leader replica)和追随者副本(follower replica)。前者对外提供服务,这里的对外指的是与客户端进行交互;然后者只是被动地追随领导者副本而已,不与外界进行交互。固然了,不少其余系统中追随者副本是能够对外提供服务的,好比mysql,从库是能够处理读操做的,也就是所谓的"主写从读",可是在kafka中追随者副本不会对外提供服务,至于为何咱们做为思考题解答。对了,关于领导者--追随者,以前实际上是叫作主(master)--从(slave),可是不建议使用了,由于slave有奴隶的意思,政治上有点不合适,因此目前大部分的系统都改为leader-follower了。

副本的工做机制很简单:生产者向主题写的消息老是往领导者那里,消费者向主题获取消息也都是来自于领导者。也就是不管是读仍是写,针对的都是领导者副本,至于追随者副本,它只作一件事情,那就是向领导者副本发送请求,请求领导者副本把最新生产的消息发送给它,这样便可以保持和领导者的同步。

虽然有了副本机制能够保证数据的持久化或者数据不丢失,但没有解决伸缩性的问题。伸缩性即所谓的scalability,是分布式系统中很是重要且必须谨慎对待的问题。什么事伸缩性呢?咱们拿副原本说,虽然如今有了领导者副本和追随者副本,但假若领导者副本积累了太多的数据以致于单台broker都没法容纳了,此时应该怎么办?有个很天然的想法就是,可否把数据分割成多分保存在不一样的broker上?没错,kafka就是这么设计的。

这种机制就是所谓的分区(partition)。若是了解其余的分布式系统,那么可能据说过度片、分区域等提法,好比MongoDB和ElasticSearch中的sharding、Hbase中的region,其实它们都是相同的原理,只是partition是最标准的名称。

kafka中的分区机制指定的是将每一个主题划分为多个分区,每一个分区都是一组有序的消息日志。生产者生产的每一条消息只会被发到一个分区中,也就说若是向有两个分区的主题发送一条消息,那么这条消息要么在第一个分区中,要么在第二条分区中。而kafka的分区编号是从0开始的,若是某个topic有100个分区,那么它们的分区编号就是从0到99

到这里可能会有疑问,那就是刚才提到的副本如何与这里的分区联系在一块儿呢?实际上,副本是在分区这个层级定义的。每一个分区下能够配置若干个副本,其中只能有1个领导者副本和N-1个追随者副本。生产者向分区写入消息,每条消息在分区中的位置由一个叫位移(offset)的数据来表征。分区位移老是从0开始,假设一个生产者向一个空分区写入了10条消息,那么这10条消息的位移依次是0、一、二、...、9

至此咱们能完整地串联起kafka的三层消息架构

  • 第一层是主题层,每一个主题能够配置M个分区,每一个分区又能够配置N个副本
  • 第二层是分区层,每一个分区的N个副本中只能有一个副原本充当领导者角色,对外提供服务;其余的N-1个副本只是追随者副本,用来提供数据冗余之用。
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从0开始,依次递增。
  • 最后客户端程序只能与分区的领导者副本进行交互

那么kafka是如何持久化数据的呢?总的来讲,kafka使用消息日志(log)来保存数据,一个日志就是磁盘上一个只能追加写(append-only)消息的物理文件。由于只能追加写入,故避免了缓慢的随机I/O操做,改成性能较好的顺序I/O操做,这也是实现kafka高吞吐量特性的一个重要手段。不过若是不停地向一个日志写入消息,最终也会耗尽全部的磁盘空间,所以kafka必然要按期地删除消息以回收磁盘。怎么删除?简单来讲就是经过日志段(log segment)机制。在kafka底层,一个日志又进一步细分红多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,kafka会自动切分出一个新的日志段,并将老的日志段封存起来。kafka在后台还有定时任务会按期地检查老的日志段是否可以被删除,从而实现回收磁盘的目的。

这里再重点说一下消费者,以前说过有两种消息模型,即点对点模型(peer to peer, p2p)和分布订阅模型。这里面的点对点指的是同一条消息只能被下游的一个消费者消费,其余消费者不能染指。在kafka中实现这种p2p模型的方法就是引入了消费者组(consumer group)。所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一个主题。这个主题中的每一个分区都只会被消费者组里面的一个消费者实例消费,其余消费者实例不能消费它。为何要引入消费者组呢?主要是为了提高消费者端的吞吐量,多个消费者实例同时消费,加速了整个消费端的吞吐量(TPS)。关于消费者组的机制,后面会详细介绍,如今只须要知道消费者组就是多个消费者组成一个组来消费主题里面的消息、而且消息只会被组里面的一个消费者消费便可。此外,这里的消费者实例能够是运行消费者应用的进程,也能够是一个线程,它们都称为一个消费者实例(consumer instance)

消费者组里面的消费者不只瓜分订阅主题的数据,并且更酷的是它们还能彼此协助。假设组内某个实例挂掉了,kafka可以自动检测,而后把这个Failed实例以前负责的分区转移给其余活着的消费者。这个过程就是大名鼎鼎的"重平衡(rebalance)"。嗯,其实便是大名鼎鼎,也是臭名昭著,由于由重平衡引起的消费者问题比比皆是。事实上,目前不少重平衡的bug,整个社区都无力解决。

每一个消费者在消费消息的过程当中,必然须要有个字段记录它当前消费到了分区的哪一个位置上,这个字段就是消费者位移(consumer offset)。注意,咱们以前说一个主题能够有多个分区、每一个分区也是用位移来表示消息的位置。可是这两个位移彻底不是一个概念,分区位移表示的是分区内的消息位置,它是不变的,一旦消息被成功写入一个分区上,那么它的位置就是固定了的。而消费者位移则不一样,它多是随时变化的,毕竟它是消费者消费进度的指示器嘛。另外每一个消费者都有着本身的消费者位移,所以必定要区分这两类位移的区别。一个是分区位移,另外一个是消费者位移

小结:

  • 生产者,producer向主题发布新消息的应用程序
  • 消费者,consumer从主题订阅新消息的应用程序
  • 消息,recordkafka是消息引擎,这里的消息就是指kafka处理的主要对象
  • 主题,topic主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务,即不一样的业务对应不一样的主题。
  • 分区,partition一个有序不变的消息序列,每一个主题下能够有多个分区。分区编号从0开始,分布在不一样的broker上面,实现发布于订阅的负载均衡。生产者将消息发送到主题下的某个分区中,以分区偏移(offset)来标识一条消息在一个分区当中的位置(惟一性)
  • 分区位移,offset表示分区中每条消息的位置信息,是一个单调递增且不变的值
  • 副本,replicakafka中同一条数据可以被拷贝到多个地方以提供数据冗余,这即是所谓的副本。副本还分为领导者副本和追随者副本,各自有各自的功能职责。读写都是针对领导者副原本的,追随者副本只是用来和领导者副本进行数据同步、保证数据冗余、实现高可用。
  • 消费者位移,consumer offset表示消费者消费进度,每一个消费者都有本身的消费者位移
  • 消费者组,consumer group多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 重平衡,rebalance消费者组内某个消费者实例挂掉以后,其它消费者实例自动从新分配订阅主题分区的过程。重平衡是kafka消费者端实现高可用的重要手段

思考:为何kafka不像mysql那样支持主写从读呢?

由于kafka的主题已经被分为多个分区,分布在不一样的broker上,而不一样的broker又分布在不一样的机器上,所以从某种角度来讲,kafka已经实现了负载均衡的效果。不像mysql,压力都在主上面,因此才要从读;另外,kafka保存的数据和数据库的数据有着实质性的差异,kafka保存的数据是流数据,具备消费的概念,并且须要消费者位移。因此若是支持从读,那么消费端控制offset会更复杂,并且领导者副本同步到追随者副本须要时间的,会形成数据不一致的问题;另外对于生产者来讲,kafka是能够经过配置来控制是否等待follower对消息确认的,若是支持从读的话,那么也须要全部的follower都确认了才能够回复生产者,形成性能降低,并且follower出现了问题也很差处理。

kafka只是消息队列(消息引擎系统)吗

kafka真的只是消息引擎系统吗?要搞清楚这个问题,就要从kafka的发展历史提及,纵观kafka的发展历史,它确实是消息引擎起家的,但它不只是一个消息引擎系统,同时也是一个分布式流处理平台(distributed stream processing platform)。若是这一节你只能记住一句户的话,那我但愿你能记住,kafka虽然是消息引擎起家,但它不只是一个消息引擎,仍是一个分布式流处理平台。

总所周知,kafka是LinkedIn公司内部孵化的项目,LinkedIn最开始有强烈的数据强实时处理方面的需求,其内部的诸多子系统要执行多种类型的数据处理与分析,主要包括业务系统和应用程序性能监控,以及用户行为数据处理等。当时他们碰到的主要问题包括:

  • 数据正确性不足。由于数据的收集主要采用轮询(polling)的方式,如何肯定轮询的时间间隔就变成了一个高度经验化的事情。虽然能够采用一些相似于启发式算法来帮助评估间隔时间,但一旦指定不当,必然会形成较大的数据误差。
  • 系统高度定制化,维护成本高。各个业务子系统都须要对接数据收集模块,引入了大量的定制开销和人工成本

为了解决这些问题,LinkedIn工程师尝试过使用ActiveMQ来解决这些问题,但效果并不理想。显然须要有一个"大一统"的系统来取代现有的工做方式,而这个系统就是kafka。所以kafka自诞生伊始是以消息引擎系统的面目出如今大众视野的,若是翻看比较老的kafka对应的官网的话,你会发现kafka社区将其清晰地定位为一个分布式、分区化且带备份功能的提交日志(commit log)服务。

所以,kafka在设计之初就旨在提供三个方面的特性:

  • 提供一套API实现生产者和消费者
  • 下降网络传输和磁盘存储开销
  • 实现高伸缩架构

在现现在的大数据领域,kafka在承接上下游、串联数据流管道方面发挥了重要的做用:全部的数据几乎都要从一个系统流入kafka,而后再流入下游的另外一个系统中 。这样使用方式家常便饭以致于引起了kafka社区的思考:与其我把数据从一个系统传递到下一个系统进行处理,我为什么不本身实现一套流处理框架呢?基于这个考量,kafka社区在0.10.0.0版本推出了流处理组件kafka streams,也正是从这个版本开始,kafka正式变身为分布式的流处理平台,而再也不仅仅只是消息引擎系统了。今天kafka是和storm、spark、flink同等级的实时流处理平台了。

那么做为流处理平台,kafka与其余大数据流式计算框架相比,优点在哪里呢?

  • 第一点是更容易实现端到端的正确性(correctness)。流处理要最终替代它的兄弟批处理须要具有两点核心优点:要实现正确性和提供可以推导时间的工具。实现正确性是流处理可以匹敌批处理的基石。正确性一直是批处理的强项,而实现正确性的基石则是要求框架能提供'精确一次语义处理',即处理一条消息有且只有一次机会可以影响系统状态。目前主流的大数据流处理框架都宣称实现了'精确一次语义处理',可是这是有限定条件的,即它们只能实现框架内的精确一次语义处理,没法实现端到端的。这是为何呢?由于当这些框架与外部消息引擎系统结合使用时,它们没法影响到外部系统的处理语义,因此若是你搭建了一套环境使得spark或flink从kafka读取消息以后进行有状态的数据计算,最后再写回kafka,那么你只能保证在spark或者flink内部,这条消息对于状态的影响只有一次。可是计算结果有可能屡次写入的kafka,由于它们不能控制kafka的语义处理。相反地,kafka则不是这样,由于全部的数据流转和计算都在kafka内部完成,故kafka能够实现端到端的精确一次处理。
  • 第二点是kafka本身对于流式计算的定位。官网上明确表示kafka streams是一个用于搭建实时流处理的客户端库而非是一个完整地功能系统。这就是说,你不能指望着kafka提供相似于集群调度、弹性部署等开箱即用的运维特性,你须要本身选择合适的工具或者系统来帮助kafka流处理应用实现这些功能。读到这里可能以为这怎么是有点呢?坦率的说,这是一个双刃剑的设计,也是kafka剑走偏锋不正面pk其余流计算框架的特地考量。大型公司的流处理平台必定是大规模部署的,所以具有集群调度功能以及灵活的部署方案是不可获取的要素。但毕竟世界上还存在着不少中小企业,它们的流处理数据量并不巨大,逻辑也不复杂,部署几台或者十几台机器足以应付。在这样的需求下,搭建重量级的完整性平台实在是'杀鸡用宰牛刀',而这正式kafka流处理组件的用武之地。所以从这个角度来讲,将来在流处理框架当中,kafka应该是有着一席之地的。

这里再来解释一下什么是精确一次语义处理。举个例子,若是咱们使用kafka计算某网页的pv,咱们将每次网页访问都做为一个消息发送给kafka,pv的计算就是咱们统计kafka总共接收了多少条这样的消息便可。精确一次语义处理表示每次网页访问都会产生、且只产生一条消息

处理消息引擎和流处理平台,kafka还有别的用途吗?固然有,kafka甚至可以被用做分布式存储系统,可是实际生产中,没有人会把kafka当中分布式存储系统来用的。kafka从一个优秀的消息引擎系统起家,逐渐演变成如今的分布式的流处理平台。咱们不只要熟练掌握它做为消息引擎系统的非凡特性以及使用技巧,最好还要多了解下其流处理组件的设计与案例应用。

应该选择哪一种kafka

咱们上一节谈了kafka当前的定位问题,kafka再也不是一个单纯的消息引擎系统,而是可以实现精确一次(exactly-once)语义处理的实时流平台。咱们到目前为止所说的kafka都是Apache kafka,kafka是Apache社区的一个顶级项目,若是咱们把视角从流处理平台扩展到流处理生态圈,kafka其实还有很长的路要走,毕竟是半路出家转型成流处理平台的。前面咱们提到过kafka streams组件,正是它提供了kafka实时处理流数据的能力。可是其实还有一个重要的组件没有说起,那就是kafka connect。

咱们在评估流处理平台的时候,框架自己的性能、所提供操做算子(operator)的丰富程度当然是重要的评判指标,可是框架与上下游交互的能力也是很是重要的。可以与之进行数据传输的外部系统越多,围绕它打造的生态圈就越牢固,于是也就有更多的人愿意去使用它,从而造成正向反馈,不断地促进该生态圈的发展。就kafka而言,kafka connect经过一个个具体的链接器(connector),串联起上下游的外部系统。

整个kafka生态圈以下图所示,值得注意的是,这张图的外部系统只是kafka connect组件支持的一部分而已。目前还有一个可喜的趋势是使用kafka connect组件的用户愈来愈多,相信在将来会有愈来愈多的人开发本身的链接器。

说了这么多,可能会有人好奇这跟这一节的主题有什么关系呢?其实清晰地了解kafka的发展脉络和生态圈现状,对于咱们选择合适的kafka版本大有裨益。下面咱们就进入今天的主题---如何选择kafka版本

首先你知道几种kafka

咦?kafka不是一个开源框架吗?什么叫有几种kafka,实际上,kafka的确有好几种kafka啊?实际上,kafka的确有好几种,这里我不是指它的版本,而是指存在多个组织或者公司发布的不一样kafka。就像linux的发行版,有Ubuntu、centos等等,虽然说kafka没有发行版的概念,但姑且能够这样的近似的认为市面上的确存在着多个kafka"发行版"。固然用发行版这个词只是为了这里方便解释,可是发行版这个词在kafka生态圈很是陌生,之后聊天时不要用发行版这个词。下面咱们就看看kafka都有哪些"发行版"

  • Apache kafka

    Apache kafka是最"正宗"的kafka,也应该是最熟悉的发行版了。字kafka开源伊始,它便在Apache基金会孵化并最终毕业成为顶级项目,也被称之为社区版kafka。目前我也是以这个版本的kafka进行介绍的。更重要的是,它是后面其余全部发行版的基础。也就是说,其余的发行版要么是原封不动的继承了Apache kafka,要么是在其基础之上进行了扩展、添加了新功能,总之Apache kafka是咱们学习和使用kafka的基础。

  • Confluent kafka

    我先说说Confluent公司吧。2014年,kafka的3个创始人Jay Kreps、Naha Narkhede 和饶军离开 LinkedIn创办了Confluent公司,专一于提供基于kafka的企业级流处理解决方案。2019年1月,Confluent公司成功融资1.25亿美圆,估值也到了25亿美圆,足见资本市场的青睐。这里说点题外话,饶军是咱们中国人,清华大学毕业的大神级人物。咱们已经看到愈来愈多的Apache顶级项目创始人中出现了中国人的身影。另外一个例子就是,Apache pulsar,它是一个以战胜kafka为目标的新一代消息引擎系统,在开源社区中活跃的中国人数不胜数,这种现象实在使人振奋。还说回Confluent公司,它主要从事商业化的kafka工具开发,并在此基础上发布了Confluent kafka。Confluent kafka提供了Apache kafka没有的高级特性,好比跨数据中心备份、schema注册中心以及集群监控工具等等。

  • Cloudera/Hortonworks Kafka

    Cloudera提供的CDH和Hortonworks提供的HDP是很是著名的大数据平台,里面集成了目前主流的大数据框架,可以帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。不少创业公司在搭建数据平台时首选就是这两个产品。无论是CDH仍是HDP,里面都集成了Apache kafka,所以就把这款产品中的kafka称之为CDH kafka和HDP kafka

    固然在2018年10月两家公司宣布合并,共同打造世界领先的数据平台,也许之后CDH和HDP也会合并成一款产品,但能确定的是Apache kafka依然会包含其中,并做为新数据平台的一部分对外提供服务。

特色比较

okay,说完了目前市面上的这些kafka,咱们来对比一下它们的优点和劣势

  • 1.Apache kafka

    对Apache kafka而言,它如今依旧是开发人数最多,版本迭代速度最快的kafka。在2018年度Apache基金会邮件列表中开发者数量最多的top5排行榜中,kafka社区邮件组排名第二位。若是你使用Apache kafka碰到任何问题并将问题提交到社区,社区都会比较及时的响应你。这对于咱们kafka普通使用者来讲无疑是很是友好的。

    可是Apache kafka的劣势在于它仅仅提供最最基础的主组件,特别是对于前面提到的kafka connect而言,社区版kafka只提供一种链接器,即读写磁盘文件的链接器,而没有与其余外部系统交互的链接器,在实际使用过程当中须要自行编写代码实现,这是它的一个劣势。另外Apache kafka没有提供任何监控框架或工具,而在线上环境不加监控确定是不行的,你必然须要借助第三方的监控框架来对kafka进行监控。好消息是目前有一些开源的监控框架能够帮助用于监控kafka(好比kafka manager)

    总而言之,若是仅仅须要一个消息引擎系统亦或是简单的流处理应用场景,同时须要对系统有较大把控度,那么我推荐你使用Apache kafka

  • 2.Confluent kafka

    下面来看看Confluent kafka。Confluent kafka目前分为免费版和企业版两种。前者和Apache kafka很是相像,除了常规的组件以外,免费版还包含schema注册中心和rest proxy两大功能。前者是帮助你集中管理kafka消息格式以实现数据向前/向后兼容;后者用开放的HTTP接口的方式容许你经过网络访问kafka的各类功能,这两个都是Apache kafka所没有的。除此以外,免费版还包含了更多的链接器,它们都是Confluent公司开发并认证过的,你能够无偿使用它们。至于企业版,它提供的功能就更多了,最有用的当属跨数据中心备份和集群监控两大功能了。多个数据中心之间数据的同步以及对集群的监控从来是kafka的痛点,Confluent kafka企业版提供了强大的解决方案来帮助你干掉它们。不过Confluent kafka没有发展国内业务的计划,相关资料以及技术支持都很欠缺,不少国内的使用者都没法找到对应的中文文档,所以目前Confluent kafka在国内的普及率是比较低的。

    一言以蔽之,若是你须要使用kafka的一些高级特性,那么推荐你使用Confluent kafka。

  • 3.CDH/HDP kafka

    最后说说大数据云公司发布的kafka,这些大数据平台自然继承了Apache kafka,经过便捷化的界面操做将kafka的安装、运维、管理、监控所有统一在控制台中。若是你是这些平台的用户必定以为很是方便,由于全部的操做均可以在前段UI界面上完成,而没必要执行复杂的kafka命令。另外这些平台的监控界面也很是友好,你一般不须要进行任何配置就能有效的监控kafka。

    可是凡事有利就有弊,这样作的结果就是直接下降了你对kafka集群的掌握程度。毕竟你对下层的kafka集群一无所知,你怎么可以作到心中有数呢?这种kafka的另外一个弊端在于它的滞后性,因为它有本身的发布周期,所以是否能及时地包含最新版本的kafka就成为了一个问题。好比CDH6.1.0版本发布时Apache kafka已经演进到了2.1.0版本,但CDH中的kafka仍然是2.0.0版本,显然那些在kafka2.1.0中修复的bug只能等到CDH下次版本更新时才有可能被真正修复。

    简单来讲,若是你须要快速的搭建消息引擎系统,或者你须要搭建的是多框架构成的数据平台且kafka只是其中的一个组件,那么建议使用这些大数据云公司提供的kafka。

小结

总结一下,咱们今天讨论了不一样的kafka"发行版"以及它们的优缺点,根据这些优缺点,咱们能够有针对性地根据实际需求选择合适的kafka。最后咱们回顾一下今天的内容:

  • Apache kafka,也成社区办kafka。优点在于迭代速度快,社区响应度高,使用它可让你有更高的把控度;缺陷在于仅提供最基础的核心组件,缺失一些高级的特性。
  • Confluent kafka,Confluent公司提供的kafka。优点在于集成了不少高级特性且由kafka原版人马打造,质量上有保证;缺陷在于相关资料不全,普及率较低,没有太多可供参考的范例。
  • CDH/HPD kafka,大数据云公司提供的kafka,内嵌Apache kafka。优点在于操做简单,节省运维成本;缺陷在于把控度低,演进速度较慢。

聊聊kafka的版本号

今天聊聊kafka版本号的问题,这个问题实在是过重要了,我以为甚至是往后可否用好kafka的关键。上一节咱们介绍了kafka的几种发行版,其实不管是哪一种kafka,本质上都内嵌了最核心的Apache kafka,也就是社区版kafka,那今天咱们就说说Apache kafka版本号的问题。在开始以前,先强调一下,后面出现的全部"版本"这个词都表示kafka具体的版本号,而非上一节中介绍kafka种类,这一点要切记。

那么如今可能会有这样的疑问,我为何要关心版本号的问题呢?直接使用最新版本不就行了吗?固然了,这的确是一种有效的版本选择的策略,但我想强调的是这种策略并不是在任何场景下都适用。若是你不了解各个版本之间的差别和功能变化,你怎么能准确地评判某kafka版本是否是知足你的业务需求呢?所以在深刻学习kafka以前,花些时间搞明白版本演进,其实是很是划算的一件事。

kafka版本命名

当前Apache kafka已经迭代到2.2版本,社区正在为2.3.0发版日期进行投票,相信2.3.0也会立刻发布。可是稍微有些使人吃惊的是,不少人对于kafka的版本命名理解存在歧义。好比咱们在官网下载kafka时,会看到这样的版本。

因而有些人或许就会纳闷,难道kafka的版本号不是2.11或者2.12吗?其实否则,前面的版本号是编译kafka源代码的Scala编译器版本。kafka服务器端的代码彻底由Scala语言编写,Scala同时支持面向对象编程和函数式编程,用Scala写的源代码编译以后也是普通".class"文件,所以咱们说Scala是JVM系的语言,它的不少设计思想都是为人称道的。

事实上目前java新推出的不少功能都是在不断地向Scala靠近,好比lambda表达式、函数式接口、val变量等等。一个有意思的事情是,kafka新版客户端代码彻底由java语言编写,因而有人展开了java vs Scala的讨论,并从语言特性的角度尝试分析kafka社区为何放弃Scala转而使用java重写客户端代码。其实事情远没有那么复杂,仅仅是由于社区来了一批java程序员而已,而之前老的Scala程序员隐退罢了。可能有点跑题了,可是无论怎么样,我依然建议你有空学一学python语言。

回到刚才的版本号讨论,如今你应该知道了对于kafka-2.11-2.1.1的提法,真正的kafka版本号是2.1.1,那么这个2.1.1又表示什么呢?前面的2表示大版本号,即major version;中间的1表示小版本号或者次版本号,即minor version;最后的1表示修订版本号,也就是patch号。kafka社区在发布1.0.0版本后特地写过一篇文章,宣布kafka版本命名规则正式从4位演进到3位,好比0.11.0.0版本就是4位版本号。

kafka版本演进

于kafka目前总共演进了7个大版本,分别是0.七、0.八、0.九、0.十、0.十一、1.0和2.0,其中的小版本和patch版本不少。哪些版本引入了哪些重大的功能改进?建议你最好作到如数家珍,由于这样不只令你在和别人交谈时显得很酷,并且若是你要向架构师转型或者已然是架构师,那么这些都是可以帮助你进行技术选型、架构评估的重要依据。

咱们先从0.7版本提及,实际上也没有什么可说的,这是最先开源时的上古版本了。这个版本只提供了最基础的消息队列功能,甚至连副本机制都没有,我实在想不出来有什么理由你要使用这个版本,所以若是有人要向你推荐这个版本,果断走开好了。

kafka从0.7时代演进到0.8以后正式引入了副本机制,至此kafka成为了一个真正意义上完备的分布式、高可靠消息队列解决方案。有了副本备份机制,kafka就可以比较好地作到消息无丢失。那时候生产和消费消息使用的仍是老版本客户端的api,所谓老版本是指当你使用它们的api开发生产者和消费者应用时,你须要指定zookeeper的地址而非broker的地址。

若是你如今尚不能理解这二者的区别也没有关系,我会在后续继续介绍它们。老版本的客户端有不少的问题,特别是生产者api,它默认使用同步方式发送消息,能够想到其吞吐量必定不会过高。虽然它也支持异步的方式,但实际场景中消息有可能丢失,所以0.8.2.0版本社区引入了新版本producer api,即须要指定broker地址的producer。

据我所知,国内依然有少部分用户在使用0.8.1.一、0.8.2版本。个人建议是尽可能使用比较新的版本,若是你不能升级大版本,我也建议你至少要升级到0.8.2.2这个版本,由于该版本中老版本消费者的api是比较稳定的。另外即便升级到了0.8.2.2,也不要使用新版本producer api,此时它的bug还很是的多。

时间来到了2015年11月,社区正式发布了0.9.0.0版本,在我看来这是一个重量级的大版本更迭,0.9大版本增长了基础的安全认证/权限功能,同时使用java重写了新版本消费者的api,另外还引入了kafka connect组件用于实现高性能的数据抽取。若是这么眼花缭乱的功能你一时无暇顾及,那么我但愿你记住这个版本另外一个好处,那就是新版本的producer api在这个版本中算比较稳定了。若是你使用0.9做为线上环境不妨切换到新版本producer,这是此版本一个不太为人所知的优点。但和0.8.2引入新api问题相似,不要使用新版本的consumer api,由于bug超级多,绝对用到你崩溃。即便你反馈问题到社区,社区也无论的,它会无脑的推荐你升级到新版本再试试,所以千万别用0.9新版本的consumer api。对于国内一些使用比较老的CDH的创业公司,鉴于其内嵌的就是0.9版本,因此要格外注意这些问题。

0.10.0.0是里程碑式的大版本,由于该版本引入了kafka streams。从这个版本起,kafka正式升级成为分布式流处理平台,虽然此时的kafka streams还不能上线部署使用。0.10大版本包含两个包含两个小版本:0.10.1和0.10.2,它们的主要功能变动都是在kafka streams组件上。若是把kafka做为消息引擎,实际上该版本并无太多的功能提高。不过在个人印象中,自从0.10.2.2版本起,新版本consumer api算是比较稳定了。若是你依然在使用0.10大版本,那么我强烈建议你至少升级到0.10.2.2而后再使用新版本的consumer api。还有个事情不得不提,0.10.2.2修复了一个可能致使producer性能下降的bug。基于性能的缘故你也应该升级到0.10.2.2。

在2017年6月,社区发布了0.11.0.0版本,引入了两个重量级的功能变动:一个是提供幂等性producer api;另外一个是对kafka消息格式作了重构。

  • 前一个好像更加吸引眼球一些,毕竟producer实现幂等性以及支持事务都是kafka实现流处理结果正确性的基石。没有它们,kafka streams在作流处理时没法像批处理那样保证结果的正确性。固然一样是因为刚推出,此时的事务api有一些bug,不算十分稳定。另外事务api主要是为kafka streams应用服务的,实际使用场景中用户利用事务api自行编写程序的成功案例并很少见
  • 第二个改进是消息格式的变化。虽然它对用户是透明的,可是它带来的深远影响将一直持续。由于格式变动引发消息格式转换而致使的性能问题在生产环境中家常便饭,因此必定要谨慎对待0.11这个版本的变化。不得不说的是,在这个版本中,各个大功能组件都变得至关稳定了,国内该版本的用户也不少,应该算是目前最主流的版本之一了。也正是由于这个缘故,社区为0.11大版本特地退出了3个patch版本,足见它的受欢迎程度。个人建议是,若是你对1.0版本是否适用于线上环境依然感到困惑,那么至少将你的环境升级到0.11.0.3,由于这个版本的消息引擎功能已经很是完善了。

最后合并说一下1.0和2.0版本吧,由于在我看来这两个大版本主要仍是kafka streams的各类改进,在消息引擎方面并未引入太多的重大功能特性。kafka streams的确在这两个版本有着很是大的变化,也必须认可kafka streams目前依然还在积极地发展着。若是你是kafka streams的用户,只要选择2.0.0版本吧。

去年8月国外出了一本书叫作kafka streams in action,中文译名:kafka streams实战,它是基于kafka streams1.0版本撰写的,可是用2.0版本去运行书中的不少例子,竟然不少都已经没法编译了,足见两个版本的差异之大。不过若是你在乎的依然是消息引擎,那么这两个大版本都是能够用于生产环境的。

最后还有个建议,不论你使用的是哪一个版本,都请尽可能保持服务器端版本和客户端版本一致,不然你将损失不少kafka为你提供的性能优化收益。

kafka线上集群部署方案怎么作

前面几节,咱们分别从kafka的定位,版本的变迁以及功能的演进等方面按部就班地梳理了Apache kafka的发展脉络。那么如今咱们就来看看生产环境中的kafka集群方案该怎么作。既然是集群,那必然就要有多个kafka节点机器,由于只有单台机器构成的kafka伪集群只能用于平常测试之用,根本没法知足实际的线上生产需求。而真正的线上环境须要仔细地考量各类因素,结合自身的业务需求而制定。下面咱们就从操做系统、磁盘、磁盘容量和带宽等方面来讨论一下。

操做系统:

这个很少BB,果断选择linux。至于为何?主要是在如下这三个方面

  • I/O模型的使用

    什么是I/O模型,能够近似的认为是操做系统执行I/O指令的方法。主流的I/O模型有五种:阻塞式I/O,非阻塞式I/O,I/O多路复用,信号驱动I/O,异步I/O。每种I/O模型都有各自的使用场景,但咱们想要支持高并发的话,都会选择I/O多路复用,至于异步I/O,因为操做系统支持的不完美,因此不选择。对于I/O多路复用有三种,select、poll、epoll,epoll是在linux内核2.4中提出的,对于I/O轮询能够作到效率最大化,至于这三者的具体关系就不详细介绍了,只须要知道epoll"最好"就好了。说了这么多,那么I/O模型和kafka又有什么关系呢?实际上kafka客户端底层使用java的selector,selector会自动从select、poll、epoll中选择一个,而Windows只支持select。所以在这一点上linux是有优点的,由于可以得到更高效的I/O性能。

  • 数据网络传输效率

    首先kafka生产和消费的消息都是经过网络传输的,而消息保存在哪里呢?确定是磁盘,故kafka须要在磁盘和网络之间进行大量数据传输。若是你熟悉linux,那么你确定据说过零拷贝(zero copy)技术,就是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输。linux平台实现了这样的零拷贝机制,但有些遗憾的是在Windows平台上必须等到java8的60更新版本才能享受这个福利。一句话总结一下,在linux部署kafka可以享受到零拷贝技术所带来的快速数据传输特性。

  • 社区支持度

    最后是社区支持度,这一点虽然不是什么明显的差异,但若是不了解的话,所形成的影响可能会比前两个因素更大。简单的来讲,就是社区目前对Windows平台上发现的bug不作任何承诺。所以Windows平台上部署kafka只适合于我的测试或用于功能验证,千万不要用于生产环境。

磁盘:

若是要问哪一种资源对kafka性能最重要,磁盘无疑是要排名靠前的。在对kafka集群进行磁盘规划时常常要面对的问题是,我应该选择普通的机械磁盘仍是固态硬盘?前者成本低且容量大,但易损坏;后者性能优点大,不过单价高。我的建议:使用普通的机械硬盘便可。

kafka大量使用磁盘不假,可它使用的方式是多顺序读写操做,必定程度上规避了机械磁盘最大的劣势,即随机读写操做慢。从这一点上,使用ssd彷佛没有太大的性能优点,毕竟从性价比是哪一个来讲,机械磁盘物美价廉,而它因易损坏而形成的可靠性差等缺陷,又有kafka在软件层面提供机制来保证,故使用普通机械磁盘是很划算的

关于磁盘选择另外一个经常讨论的话题,究竟是否应该使用磁盘阵列(raid)。使用磁盘阵列的两个优点在于:

  • 提供冗余的磁盘存储空间
  • 提供负载均衡

以上两个优点对于任何一个分布式系统都颇有吸引力。不过就kakfa而言,一方面kafka本身实现了冗余机制来提供高可靠性;另外一方面经过分区的概念,kafka也能在软件层面自行实现负载均衡。如此一来磁盘阵列的优点就没有那么明显了,固然并非说磁盘阵列很差,实际上依然有不少大厂是把确实是把kafka底层的存储交给磁盘阵列的,只是目前kafka在存储这方面提供了愈来愈便捷的高可靠性方案,所以在线上环境使用磁盘阵列彷佛变得不那么重要了。综合以上的考量,我的给出的建议是:

  • 追求性价比的公司能够不搭建磁盘阵列,使用普通磁盘组成存储空间便可。
  • 使用机械磁盘彻底可以胜任kafka线上环境。

磁盘容量:

kafka集群到底须要多大的存储空间,这是一个很是经典的规划问题。kafka须要将消息保存在底层的磁盘上,这些消息默认会被保存一段时间而后自动被删除。虽然这段时间是能够配置的,但你应该如何结合自身业务场景和存储需求来规划kafka集群的存储容量呢?

我举一个简单的例子来讲明如何思考这个问题,假设你所在公司有个业务天天须要向kafka集群发送1亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。如今假设消息的平均大小是1KB,那么你能说出你的kafka集群须要为这个业务预留多少磁盘空间吗?

咱们来计算一下,天天1亿条1KB大小的消息,保存两份且存两周的时间,那么总的空间大小就等于1亿 * 1KB * 2/ 1024 / 1024 。通常状况下,kafka集群除了消息数据还有其余类型的数据,好比索引数据等,所以咱们须要再为这些数据预留出10%的磁盘空间,所以咱们在原来的基础上乘上1.1,既然要保存两周,那么再乘上14,那么总体容量大概为21.5TB左右。因为kafka支持数据的压缩 ,假设数据的压缩比是0.75,那么最后你须要规划的存储空间是21.5 * 0.75=16.14TB左右。

总之在规划磁盘容量时你须要考虑下面这几个元素:

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

带宽:

对于kafka这种经过网络进行大量数据传输的框架而言,带宽特别容易成为瓶颈。事实上,在真实案例当中,带宽资源不足致使kafka出现性能问题的比例至少占60%以上。若是你的环境中还要涉及跨机房传输,那么状况状况可能更糟糕了。

若是你不是超级土豪的话,我会认为你使用的是普通的以太网,带宽也主要有两种:1Gbps的千兆网络,和10Gbps的万兆网络,特别是千兆网络应该是通常公司网络的标准配置了。下面我就以千兆网举一个实际的例子,来讲明一下如何进行带宽资源的规划。

与其说是带宽资源的规划,其实真正要规划的是所需的kafka服务器的数量。假设你公司的机房环境是千兆网络,即1Gps,如今你有个业务,其业务目标或SLA是在1小时内处理1TB的业务数据。那么问题来了,你到底须要多少台kafka服务器来完成这个业务呢?

让咱们来计算一下,因为带宽是1Gps,即每秒处理1Gb的数据,假设每台kafka服务器都是安装在专属的机器上,也就是说每台kafka机器上没有混布其余服务,可是真实环境中不建议这么作。一般状况下你只能假设kafka会用到70%的资源,由于总要为其余应用或者进程留一些资源。根据实际使用经验,超过70%的阈值就有网络丢包的可能性了,故70%的设定是一个比较合理的值,也就是说单台kafka服务器最多也就能使用700Mb的带宽资源。

稍等,这只是它能使用的最大带宽资源,你不能让kafka服务器常规性地使用这么多资源,故一般要再额外留出2/3的资源,即单台服务器使用带宽为700/3≈233MBps。须要提示的是,这里的2/3是至关保守的,你能够结合本身机器的使用状况酌情减小此值。

好了,有了240MBps,咱们就能够计算1小时内处理1TB数据所须要的服务器数量了。根据这个目标,咱们每秒须要处理1TB / 3600s * 8 ≈ 2336MB的数据,除以240,约等于10台服务器。若是消息还须要额外复制两份,那么总的服务器台数还要乘以3,即30台。

kafka的安装&启动&关闭

下面咱们就来安装kafka,这里咱们选择的版本为kafka_2.12-2.2.1,安装在/opt/kafka目录下,而后配置环境变量,source一下

kafka的目录结构以下

先来看看kafka的配置文件吧

sink、source显然是和flume有关的。consumer、producer则是经过命令行启动消费者、生产者,这个是作测试用的,可是咱们通常都在代码中写配置。下面还有一个zookeeper,不过这个zookeeper咱们不用管,由于这是kafka自带的zookeeper,咱们都用本身的zookeeper。比较重要的是,那么server.properities,咱们来看一下。

# The id of the broker. This must be set to a unique integer for each broker.
# broker的惟一id,对于每个broker都必须设置为惟一的"整数",另外一台broker的话,broker.id=1
broker.id=0

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# 监听端口9092

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
# 服务用于接收来自网络的请求以及向网络发送响应的线程数
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
# 服务用于处理请求的线程数、可能包含磁盘IO
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
# socket服务端用于发送数据的缓存,意思是当数据到达指定的缓存以后才发送
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
# socket服务端用于接收数据的缓存,意思是当数据达到指定的缓存以后才读取
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
# socket服务容许接收的请求的最大字节数
socket.request.max.bytes=104857600

# 以上都是默认配置,咱们就不改了

# A comma separated list of directories under which to store log files
# 用逗号分隔的一系列文件路径,用于存储日志文件
# 注意:其实不止日志文件,还有暂存数据,也存在这里面。都叫作log,这一点容易混淆,务必记住
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# 每个注意的分区数,下面都不用管
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
# 副本系数等等
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1


# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 每个日志段的最大字节,换算以后是一个G
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# segment保留的最长时间,超时将被删除
log.retention.check.interval.ms=300000

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 配置链接zookeeper的地址,若是多个zookeeper的话,那么就用逗号分割
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
# 链接zookeeper的最大超时时间
zookeeper.connection.timeout.ms=6000

咱们再来看一下,bin目录

有五个sh脚本是比较经常使用的,kafka-console-consumer.sh、kafka-console-producer.sh,这两个是在控制台启动的,用于测试。kafka-server-start.sh、kafka-server-stop.sh,这两个是启动kafka集群的。kafka-topics.sh,这个是与主题相关的,能够对主题进行相关操做。
启动kafka:bin]# ./kafka-server-start.sh ../config/server.properties,我是在bin目录下启动的,注意启动的时候须要指定配置文件,就是咱们刚才配的server.properties。可是注意的是,这样启动的话,进程是一个阻塞的,若是想进行别的操做,只能单独开一个终端了,所以咱们能够以守护进程的方式启动:bin]# ./kafka-server-start.sh -daemon ../config/server.properties。不过还有一点须要注意:那就是咱们指定了zookeeper,是否是要先启动zookeeper呢?没错,zkServer.sh start,启动以后才能启动kafka,不然就会链接zookeeper超时,从而致使启动失败

关闭kafka:bin]# ./kafka-server-stop.sh ../config/server.properties,关闭的时候就不须要指定-daemon这个参数了。

命令行操做topic增删查

查看全部topic

kafka-topics.sh --list --zookeeper localhost:2181

能够看到,因为咱们尚未建立,因此此时尚未主题。

建立topic:

kafka-topics.sh --create --zookeeper localhost:2181 --topic 主题名 --partitions 分区数 --replication-factor 副本数

注意:副本数不能超过你broker的数量,由于咱们只有一台机器,因此副本数是1,可是分区在一台broker上是能够有多个的

删除topic:

kafka-topics.sh --delete --zookeeper localhost:2181 --topic 主题名

这里提示咱们,若是没有将delete.topic.enable设置为true,那么这个分区不会被删除,可是satori这个主题已经被标记为删除了。咱们看看,就知道了,或者说再建立一个satori,若是存在会报错的。

能够看到,这个分区是真的被删除了。

查看topic信息:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic 主题名

命令行控制台生产者消费者测试

启动生产者:

kafka-console-producer.sh --topic 主题 --broker-list localhost:9092,注意这里是--broker-list,也就是broker的地址

启动消费者:

kafka-console-consumer.sh --topic 主题 --bootstrap-server localhost:9092

此时消费者卡在了这个地方,等待生产者生产数据。

数据默认保留7天,超过7天就会删除。可是还有一个问题,要是消费者启动以前,生产生产消息了,怎么办?显然此时的消费者是接收不到的,所以咱们能够加上一个--from-beginning参数,这样的话就能够把消息所有消费掉。

关闭消费者以后,生产者又生产了两条消息,而后启动消费者。

相关文章
相关标签/搜索