kafka最初是 LinkedIn 的一个内部基础设施系统。最初开发的原由是,LinkedIn虽然有了数据库和其余系统能够用来存储数据,可是缺少一个能够帮助处理持续数据流的组件。因此在设计理念上,开发者不想只是开发一个可以存储数据的系统,如关系数据库、Nosql 数据库、搜索引擎等等,更但愿把数据当作一个持续变化和不断增加的流,并基于这样的想法构建出一个数据系统,一个数据架构。前端
Kafka外在表现很像消息系统,容许发布和订阅消息流,可是它和传统的消息系统有很大的差别,java
首先,Kafka 是个现代分布式系统,以集群的方式运行,能够自由伸缩。sql
其次,Kafka 能够按照要求存储数据,保存多久均可以,数据库
第三,流式处理将数据处理的层次提示到了新高度,消息系统只会传递数据,Kafka 的流式处理能力可让咱们用不多的代码就能动态地处理派生流和数据集。因此 Kafka 不只仅是个消息中间件。apache
Kafka不只仅是一个消息中间件,同时它是一个流平台,这个平台上能够发布和订阅数据流(Kafka 的流,有一个单独的包 Stream 的处理),并把他们保存起来,进行处理,这个是 Kafka做者的设计理念。bootstrap
大数据领域,Kafka 还能够当作实时版的 Hadoop,可是仍是有些区别,Hadoop 能够存储和按期处理大量的数据文件,每每以 TB 计数,而 Kafka能够存储和持续处理大型的数据流。Hadoop 主要用在数据分析上,而 Kafka由于低延迟,更适合于核心的业务应用上。因此国内的大公司通常会结合使用,好比京东在实时数据计算架构中就使用了到了 Kafka,具体见《张开涛-海量数据下的应用系统架构实践》windows
常见的大数据处理框架:storm、spark、Flink、(Blink 阿里)数组
Kafka名字的由来:卡夫卡与法国做家马塞尔·普鲁斯特,爱尔兰做家詹姆斯·乔伊斯并称为西方现代主义文学的先驱和大师。《变形记》是卡夫卡的短篇表明做,是卡夫卡的艺术成就中的一座高峰,被认为是 20 世纪最伟大的小说做品之一(达到管理层的高度应该多看下人文相关的书籍,增加管理知识和人格魅力)。缓存
本文以 kafka_2.11-2.3.0 版本为主,其他版本不予考虑,而且 Kafka 是 scala 语言写的,小众语言,没有必要研究其源码,投入和产出比低,除非你的技术级别很是高或者须要去开发单独的消息中间件。服务器
消息,Kafka 里的数据单元,也就是咱们通常消息中间件里的消息的概念(能够比做数据库中一条记录)。消息由字节数组组成。消息还能够包含键(可选元数据,也是字节数组),主要用于对消息选取分区。
做为一个高效的消息系统,为了提升效率,消息能够被分批写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。若是只传递单个消息,会致使大量的网络开销,把消息分红批次传输能够减小这开销。可是,这个须要权衡(时间延迟和吞吐量之间),批次里包含的消息越多,单位时间内处理的消息就越多,单个消息的传输时间就越长(吞吐量高延时也高)。若是进行压缩,能够提高数据的传输和存储能力,但须要更多的计算处理。
对于 Kafka来讲,消息是晦涩难懂的字节数组,通常咱们使用序列化和反序列化技术,格式经常使用的有 JSON 和 XML,还有 Avro(Hadoop 开发的一款序列化框架),具体怎么使用依据自身的业务来定。
Kafka里的消息用主题进行分类(主题比如数据库中的表),主题下有能够被分为若干个分区(分表技术)。分区本质上是个提交日志文件,有新消息,这个消息就会以追加的方式写入分区(写文件的形式),而后用先入先出的顺序读取。
可是由于主题会有多个分区,因此在整个主题的范围内,是没法保证消息的顺序的,单个分区则能够保证。
Kafka经过分区来实现数据冗余和伸缩性,由于分区能够分布在不一样的服务器上,那就是说一个主题能够跨越多个服务器(这是 Kafka 高性能的一个缘由,多台服务器的磁盘读写性能比单台更高)。
前面咱们说 Kafka 能够当作一个流平台,不少时候,咱们会把一个主题的数据当作一个流,无论有多少个分区。
就是通常消息中间件里生产者和消费者的概念。一些其余的高级客户端 API,像数据管道 API 和流式处理的 Kafka Stream,都是使用了最基本的生产者和消费者做为内部组件,而后提供了高级功能。
生产者默认状况下把消息均衡分布到主题的全部分区上,若是须要指定分区,则须要使用消息里的消息键和分区器。
消费者订阅主题,一个或者多个,而且按照消息的生成顺序读取。消费者经过检查所谓的偏移量来区分消息是否读取过。偏移量是一种元数据,一个不断递增的整数值,建立消息的时候,Kafka 会把他加入消息。在一个主题中一个分区里,每一个消息的偏移量是惟一的。每一个分区最后读取的消息偏移量会保存到 Zookeeper 或者 Kafka 上,这样分区的消费者关闭或者重启,读取状态都不会丢失。
多个消费者能够构成一个消费者群组。怎么构成?共同读取一个主题的消费者们,就造成了一个群组。群组能够保证每一个分区只被一个消费者使用。
消费者和分区之间的这种映射关系叫作消费者对分区的全部权关系,很明显,一个分区只有一个消费者,而一个消费者能够有多个分区。
(吃饭的故事:一桌一个分区,多桌多个分区,生产者不断生产消息(消费),消费者就是买单的人,消费者群组就是一群买单的人),一个分区只能被消费者群组中的一个消费者消费(不能重复消费),若是有一个消费者挂掉了<James 跑路了>,另外的消费者接上)
一个独立的 Kafka 服务器叫 Broker。broker 的主要工做是,接收生产者的消息,设置偏移量,提交消息到磁盘保存;为消费者提供服务,响应请求,返回消息。在合适的硬件上,单个 broker 能够处理上千个分区和每秒百万级的消息量。(要达到这个目的须要作操做系统调优和 JVM 调优)
多个 broker 能够组成一个集群。每一个集群中 broker 会选举出一个集群控制器。控制器会进行管理,包括将分区分配给 broker 和监控 broker。
集群里,一个分区从属于一个 broker,这个 broker 被称为首领。可是分区能够被分配给多个 broker,这个时候会发生分区复制。
集群中 Kafka 内部通常使用管道技术进行高效的复制。
分区复制带来的好处是,提供了消息冗余。一旦首领 broker 失效,其余 broker 能够接管领导权。固然相关的消费者和生产者都要从新链接到新的首领上。
在必定期限内保留消息是 Kafka 的一个重要特性,Kafka broker 默认的保留策略是:要么保留一段时间(7 天),要么保留必定大小(好比 1 个 G)。
到了限制,旧消息过时并删除。可是每一个主题能够根据业务需求配置本身的保留策略(开发时要注意,Kafka 不像 Mysql 之类的永久存储)。
多生产者和多消费者
基于磁盘的数据存储,换句话说,Kafka 的数据天生就是持久化的。
高伸缩性,Kafka 一开始就被设计成一个具备灵活伸缩性的系统,对在线集群的伸缩丝绝不影响总体系统的可用性。
高性能,结合横向扩展生产者、消费者和 broker,Kafka 能够轻松处理巨大的信息流(LinkedIn 公司天天处理万亿级数据),同时保证亚秒级的消息延迟。
跟踪网站用户和前端应用发生的交互,好比页面访问次数和点击,将这些信息做为消息发布到一个或者多个主题上,这样就能够根据这些数据为机器学习提供数据,更新搜素结果等等(头条、淘宝等总会推送你感兴趣的内容,其实在数据分析以前就已经作了活动跟踪)。
标准消息中间件的功能
收集应用程序和系统的度量监控指标,或者收集应用日志信息,经过 Kafka路由到专门的日志搜索系统,好比 ES。(国内用得较多)
收集其余系统的变更日志,好比数据库。能够把数据库的更新发布到 Kafka上,应用经过监控事件流来接收数据库的实时更新,或者经过事件流将数据库的更新复制到远程系统。
还能够当其余系统发生了崩溃,经过重放日志来恢复系统的状态。(异地灾备)
操做实时数据流,进行统计、转换、复杂计算等等。随着大数据技术的不断发展和成熟,不管是传统企业仍是互联网公司都已经再也不知足于离线批处理,实时流处理的需求和重要性日益增加 。
近年来业界一直在探索实时流计算引擎和 API,好比这几年火爆的 Spark Streaming、Kafka Streaming、Beam 和 Flink,其中阿里双 11 会场展现的实时销售金额,就用的是流计算,是基于 Flink,而后阿里在其上定制化的 Blink。
Kafka是 Java 生态圈下的一员,用 Scala 编写,运行在 Java 虚拟机上,因此安装运行和普通的 Java 程序并无什么区别。
安装 Kafka官方说法,Java 环境推荐 Java8。
Kafka须要 Zookeeper 保存集群的元数据信息和消费者信息。Kafka通常会自带 Zookeeper,可是从稳定性考虑,应该使用单独的 Zookeeper,并且构建Zookeeper 集群。
在 http://kafka.apache.org/downloads 上寻找合适的版本下载,这里选用的是 kafka_2.11-2.3.0,下载完成后解压到本地目录。
启动 Zookeeper
进入 Kafka目录下的 bin\windows
执行 kafka-server-start.bat ../../config/server.properties,出现如下画面表示成功
Linux下与此相似,进入 bin 后,执行对应的 sh 文件便可
##列出全部主题
kafka-topics.bat --zookeeper localhost:2181 --list
##列出全部主题的详细信息
kafka-topics.bat --zookeeper localhost:2181 --describe
##建立主题 主题名 my-topic,1 副本,8 分区
kafka-topics.bat --zookeeper localhost:2181 --create --topic my-topic --replication-factor 1 --partitions 8
##增长分区,注意:分区没法被删除
kafka-topics.bat --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
##删除主题
kafka-topics.bat --zookeeper localhost:2181 --delete --topic my-topic
##建立生产者(控制台)
kafka-console-producer.bat --broker-list localhost:9092 --topic my-topic
##建立消费者(控制台)
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning
##列出消费者群组(仅 Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --list
##列出消费者群组详细信息(仅 Linux)
kafka-topics.sh --new-consumer --bootstrap-server localhost:9092 --describe --group 群组名
配置文件放在 Kafka目录下的 config 目录中,主要是 server.properties 文件
在单机时无需修改,但在集群下部署时每每须要修改。它是个每个 broker 在集群中的惟一表示,要求是正数。当该服务器的 IP 地址发生改变时,broker.id 没有变化,则不会影响 consumers 的消息状况
监听列表(以逗号分隔 不一样的协议(如 plaintext,trace,ssl、不一样的 IP 和端口)),hostname 若是设置为 0.0.0.0 则绑定全部的网卡地址;若是 hostname 为空则绑定默认的网卡。
若是没有配置则默认为 java.net.InetAddress.getCanonicalHostName()。
如:PLAINTEXT://myhost:9092,TRACE://:9091 或 PLAINTEXT://0.0.0.0:9092,
zookeeper 集群的地址,能够是多个,多个之间用逗号分割。(一组 hostname:port/path 列表,hostname 是 zk 的机器名或 IP、port 是 zk 的端口、/path是可选 zk 的路径,若是不指定,默认使用根路径)
Kafka把全部的消息都保存在磁盘上,存放这些数据的目录经过 log.dirs 指定。可使用多路径,使用逗号分隔。若是是多路径,Kafka会根据“最少使用”原则,把同一个分区的日志片断保存到同一路径下。会往拥有最少数据分区的路径新增分区。
每数据目录用于日志恢复启动和关闭时的线程数量。由于这些线程只是服务器启动(正常启动和崩溃后重启)和关闭时会用到。因此彻底能够设置大量的线程来达到并行操做的目的。注意,这个参数指的是每一个日志目录的线程数,好比本参数设置为 8,而 log.dirs 设置为了三个路径,则总共会启动24 个线程。
是否容许自动建立主题。若是设为 true,那么 produce(生产者往主题写消息),consume(消费者从主题读消息)或者 fetch metadata(任意客户端向主题发送元数据请求时)一个不存在的主题时,就会自动建立。缺省为 true。
新建主题的默认参数
每一个新建主题的分区个数(分区个数只能增长,不能减小 )。这个参数通常要评估,好比,每秒钟要写入和读取 1000M 数据,若是如今每一个消费者每秒钟能够处理 50MB 的数据,那么须要 20 个分区,这样就可让 20 个消费者同时读取这些分区,从而达到设计目标。(通常经验,把分区大小限制在25G 以内比较理想)
日志保存时间,默认为 7 天(168 小时)。超过这个时间会清理数据。bytes 和 minutes 不管哪一个先达到都会触发。与此相似还有 log.retention.minutes和log.retention.ms,都设置的话,优先使用具备最小值的那个。(提示:时间保留数据是经过检查磁盘上日志片断文件的最后修改时间来实现的。也就是最后修改时间是指日志片断的关闭时间,也就是文件里最后一个消息的时间戳)
topic 每一个分区的最大文件大小,一个 topic 的大小限制 = 分区数*log.retention.bytes。-1 没有大小限制。log.retention.bytes 和 log.retention.minutes任意一个达到要求,都会执行删除。(注意若是是 log.retention.bytes 先达到了,则是删除多出来的部分数据),通常不推荐使用最大文件删除策略,而是推荐使用文件过时删除策略。
分区的日志存放在某个目录下诸多文件中,这些文件将分区的日志切分红一段一段的,咱们称为日志片断。这个属性就是每一个文件的最大尺寸;当尺寸达到这个数值时,就会关闭当前文件,并建立新文件。被关闭的文件就开始等待过时。默认为 1G。
若是一个主题天天只接受 100MB 的消息,那么根据默认设置,须要 10 天才能填满一个文件。并且由于日志片断在关闭以前,消息是不会过时的,因此若是 log.retention.hours 保持默认值的话,那么这个日志片断须要 17 天才过时。由于关闭日志片断须要 10 天,等待过时又须要 7 天。
做用和 log.segment.bytes 相似,只不过判断依据是时间。一样的,两个参数,以先到的为准。这个参数默认是不开启的。
表示一个服务器可以接收处理的消息的最大字节数,注意这个值 producer 和 consumer 必须设置一致,且不要大于 fetch.message.max.bytes 属性的值(消费者能读取的最大消息,这个值应该大于或等于 message.max.bytes)。该值默认是 1000000 字节,大概 900KB~1MB。若是启动压缩,判断压缩后的值。
这个值的大小对性能影响很大,值越大,网络和 IO 的时间越长,还会增长磁盘写入的大小。
Kafka 设计的初衷是迅速处理短小的消息,通常 10K 大小的消息吞吐性能最好(LinkedIn 的 kafka性能测试)
为 Kafka 选择合适的硬件更像是一门艺术,就跟它的名字同样,咱们分别从磁盘、内存、网络和 CPU 上来分析,肯定了这些关注点,就能够在预算范围以内选择最优的硬件配置。
磁盘吞吐量(IOPS 每秒的读写次数)会影响生产者的性能。由于生产者的消息必须被提交到服务器保存,大多数的客户端都会一直等待,直到至少有一个服务器确认消息已经成功提交为止。也就是说,磁盘写入速度越快,生成消息的延迟就越低。(SSD固态贵单个速度快,HDD 机械偏移能够多买几个,设置多个目录加快速度,具体状况具体分析)
磁盘容量的大小,则主要看须要保存的消息数量。若是天天收到 1TB 的数据,并保留 7 天,那么磁盘就须要 7TB 的数据。
Kafka自己并不须要太大内存,内存则主要是影响消费者性能。在大多数业务状况下,消费者消费的数据通常会从内存(页面缓存,从系统内存中分)
中获取,这比在磁盘上读取确定要快的多。通常来讲运行 Kafka 的 JVM 不须要太多的内存,剩余的系统内存能够做为页面缓存,或者用来缓存正在使用的日志片断,因此咱们通常 Kafka不会同其余的重要应用系统部署在一台服务器上,由于他们须要共享页面缓存,这个会下降 Kafka 消费者的性能。
网络吞吐量决定了 Kafka可以处理的最大数据流量。它和磁盘是制约 Kafka 拓展规模的主要因素。对于生产者、消费者写入数据和读取数据都要瓜分网络流量。同时作集群复制也很是消耗网络。
Kafka对 cpu的要求不高,主要是用在对消息解压和压缩上。因此 cpu 的性能不是在使用 Kafka的首要考虑因素。
咱们要为 Kafka选择合适的硬件时,优先考虑存储,包括存储的大小,而后考虑生产者的性能(也就是磁盘的吞吐量),选好存储之后,再来选择CPU 和内存就容易得多。网络的选择要根据业务上的状况来定,也是很是重要的一环。
本地开发,一台 Kafka足够使用。在实际生产中,集群能够跨服务器进行负载均衡,再则可使用复制功能来避免单独故障形成的数据丢失。同时集群能够提供高可用性。
要估量如下几个因素:
须要多少磁盘空间保留数据,和每一个 broker 上有多少空间能够用。好比,若是一个集群有 10TB 的数据须要保留,而每一个 broker 能够存储 2TB,那么至少须要 5 个 broker。若是启用了数据复制,则还须要一倍的空间,那么这个集群须要 10 个 broker。
集群处理请求的能力。若是由于磁盘吞吐量和内存不足形成性能问题,能够经过扩展 broker 来解决。
很是简单,只须要两个参数。第一,配置 zookeeper.connect,第二,为新增的 broker 设置一个集群内的惟一性 id。
Kafka中的集群是能够动态扩容的。