1、kafka 简介html
今社会各类应用系统诸如商业、社交、搜索、浏览等像信息工厂同样不断的生产出各类信息,在大数据时代,咱们面临以下几个挑战:java
以上几个挑战造成了一个业务需求模型,即生产者生产(produce)各类信息,消费者消费(consume)(处理分析)这些信息,而在生产者与消费者之间,须要一个沟通二者的桥梁-消息系统。从一个微观层面来讲,这种需求也可理解为不一样的系统之间如何传递消息。 linux
kafka是一种高吞吐量的分布式发布订阅消息系统,它能够处理消费者规模的网站中的全部动做流数据。这种动做(网页浏览,搜索和其余用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据一般是因为吞吐量的要求而经过处理日志和日志聚合来解决。 web
Apache kafka 它具有快速、可扩展、可持久化的特色。它如今是Apache旗下的一个开源系统,做为hadoop生态系统的一部分,被各类商业公司普遍应用。它的最大的特性就是能够实时的处理大量数据以知足各类需求场景:好比基于hadoop的批处理系统、低延迟的实时系统、storm/spark流式处理引擎。算法
下面介绍先大致介绍一下Kafka的主要设计思想,可让相关人员在短期内了解到kafka相关特性,若是想深刻研究,后面会对其中每个特性都作详细介绍。apache
客户端和服务端经过TCP协议通讯。Kafka提供了Java客户端,而且对多种语言都提供了支持。api
Kafka中发布订阅的对象是topic。咱们能够为每类数据建立一个topic,把向topic发布消息的客户端称做producer,从topic订阅消息的客户端称做consumer。Producers和consumers能够同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。producers经过网络将消息发送到Kafka集群,集群向消费者提供消息 数组
消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition Logs(分区日志)组成,其组织结构以下图所示:缓存
咱们能够看到,每一个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每个消息都被赋予了一个惟一的offset值。
Kafka集群会保存全部的消息,无论消息有没有被消费;咱们能够设定消息的过时时间,只有过时的数据才会被自动清除以释放磁盘空间。好比咱们设置消息过时时间为2天,那么这2天内的全部消息都会被保存到集群中,数据只有超过了两天才会被清除。
Kafka须要维持的元数据只有一个–消费消息在Partition中的offset值,Consumer每消费一个消息,offset就会加1。其实消息的状态彻底是由Consumer控制的,Consumer能够跟踪和重设这个offset值,这样的话Consumer就能够读取任意位置的消息。
把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每一个Partition能够经过调整以适应它所在的机器,而一个topic又能够有多个Partition组成,所以整个集群就能够适应任意大小的数据了;第二就是能够提升并发,由于能够以Partition为单位读写了。服务器
1.1 kafka名词解释
每一个消息(也叫做record记录,也被称为消息)是由一个key,一个value和时间戳构成。
1.2 kafka有四个核心API介绍
1.3 kafka基基原理
一般来说,消息模型能够分为两种:队列和发布-订阅式。队列的处理方式是一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给全部的消费者,接收到消息的消费者均可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组(consumer group)。消费者用一个消费者组名标记本身。
一个发布在Topic上消息被分发给此消费者组中的一个消费者。假如全部的消费者都在一个组中,那么这就变成了queue模型。假如全部的消费者都在不一样的组中,那么就彻底变成了发布-订阅模型。更通用的, 咱们能够建立一些消费者组做为逻辑上的订阅者。每一个组包含数目不等的消费者,一个组内多个消费者能够用来扩展性能和容错。
而且,kafka可以保证生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说,若是一个消息M1和M2使用相同的producer发送,M1先发送,那么M1将比M2的offset低,而且优先的出如今日志中。消费者收到的消息也是此顺序。若是一个Topic配置了复制因子(replication facto)为N,那么能够容许N-1服务器宕机而不丢失任何已经提交(committed)的消息。此特性说明kafka有比传统的消息系统更强的顺序保证。可是,相同的消费者组中不能有比分区更多的消费者,不然多出的消费者一直处于空等待,不会收到消息。
1.5 主题和日志 (Topic和Log)
先来看一下Kafka提供的一个抽象概念:topic.
一个topic是对一组消息的概括。对每一个topic,Kafka 对它的日志进行了分区
每个分区(partition)都是一个顺序的、不可变的消息队列,而且能够持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每一个分区中此偏移量都是惟一的。Kafka集群保持全部的消息,直到它们过时,不管消息是否被消费了。
在一个可配置的时间段内,Kafka集群保留全部发布的消息,无论这些消息有没有被消费。好比,若是消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是能够被消费的。以后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,因此保留太多的数据并非问题。
实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常状况当消费者消费消息的时候,偏移量也线性的的增长。可是实际偏移量由消费者控制,消费者能够将偏移量重置为更老的一个偏移量,从新读取消息。
实际上每一个consumer惟一须要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:通常状况下随着consumer不断的读取消息,这offset的值不断增长,但其实consumer能够以任意的顺序读取消息,好比它能够将offset设置成为一个旧的值来重读以前的消息。
能够看到这种设计对消费者来讲操做自如, 一个消费者的操做不会影响其它消费者对此log的处理。 再说说分区。Kafka中采用分区的设计有几个目的。一是能够处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它能够不受限的处理更多的数据。第二,分区能够做为并行处理的单元,稍后会谈到这一点。
1.6 分布式(Distribution)
每一个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务能够共同处理数据和请求,副本数量是能够配置的。副本使Kafka具有了容错能力。
每一个分区都由一个服务器做为“leader”,零或若干服务器做为“followers”,leader负责处理消息的读和写,followers则去复制leader.若是leader down了,followers中的一台则会自动成为leader。集群中的每一个服务都会同时扮演两个角色:做为它所持有的一部分分区的leader,同时做为其余分区的followers,这样集群就会据有较好的负载均衡。
Log的分区被分布到集群中的多个服务器上。每一个服务器处理它分到的分区。根据配置每一个分区还能够复制到其它服务器做为备份容错。 每一个分区有一个leader,零或多个follower。Leader处理此分区的全部的读写请求,而follower被动的复制数据。若是leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另外一个分区的follower。 这样能够平衡负载,避免全部的请求都只让一台或者某几台服务器处理。
Producer将消息发布到它指定的topic中,并负责决定发布到哪一个分区。一般简单的由负载均衡机制随机选择分区,但也能够经过特定的分区函数选择分区。使用的更多的是第二种。
发布消息一般有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers能够同时从服务端读取消息,每一个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到全部的consumer中。Consumers能够加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer能够在不一样的程序中,也能够在不一样的机器上。若是全部的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。若是全部的consumer都不在不一样的组中,这就成为了发布-订阅模式,全部的消息都被分发到全部的consumer中。更常见的是,每一个topic都有若干数量的consumer组,每一个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每一个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka能够很好的保证有序性。
传统的队列在服务器上保存有序的消息,若是多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,可是消息是被异步的分发到各consumer上,因此当消息到达时可能已经失去了原来的顺序,这意味着并发消费将致使顺序错乱。为了不故障,这样的消息系统一般使用“专用consumer”的概念,其实就是只容许一个消费者消费消息,固然这就意味着失去了并发性。
在这方面Kafka作的更好,经过分区的概念,Kafka能够在多个consumer组并发的状况下提供较好的有序性和负载均衡。将每一个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就能够顺序的消费这个分区的消息。由于有多个分区,依然能够在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就容许多少并发消费。
Kafka只能保证一个分区以内消息的有序性,在不一样的分区之间是不能够的,这已经能够知足大部分应用的需求。若是须要topic中全部消息的有序性,那就只能让这个topic只有一个分区,固然也就只有一个consumer组消费它。
在深刻学习Kafka以前,须要先了解topics, brokers, producers和consumers等几个主要术语。 下面说明了主要术语的详细描述和组件。
在上图中,主题(topic)被配置为三个分区。 分区1(Partition 1)具备两个偏移因子0
和1
。分区2(Partition 2)具备四个偏移因子0
,1
,2
和3
,分区3(Partition 3)具备一个偏移因子0
。replica 的id与托管它的服务器的id相同。
假设,若是该主题的复制因子设置为3
,则Kafka将为每一个分区建立3个相同的副本,并将它们放入群集中以使其可用于其全部操做。 为了平衡集群中的负载,每一个代理存储一个或多个这些分区。 多个生产者和消费者能够同时发布和检索消息。
Brokers
N
个代理中有N
个分区,则每一个代理将有一个分区。Kafka Cluster - Kafka拥有多个经纪人称为Kafka集群。 Kafka集群能够在无需停机的状况下进行扩展。 这些集群用于管理消息数据的持久性和复制。
经过上面介绍的咱们能够知道,kafka中的数据是持久化的而且可以容错的。Kafka容许用户为每一个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。若是你的副本数量设置为3,那么一份数据就会被存放在3台不一样的机器上,那么就容许有2个机器失败。通常推荐副本数量至少为2,这样就能够保证增减、重启机器时不会影响到数据消费。若是对数据持久化有更高的要求,能够把副本数量设置为3或者更多。
Kafka中的topic是以partition的形式存放的,每个topic均可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。Producer在生产数据时,会按照必定规则(这个规则是能够自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader做为读写用。
关于如何设置partition值须要考虑的因素。一个partition只能被一个消费者消费(一个消费者能够同时消费多个partition),所以,若是设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。因此,推荐partition的数量必定要大于同时运行的consumer的数量。另一方面,建议partition的数量大于集群broker的数量,这样leader partition就能够均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每一个topic都有上百个partition。须要注意的是,kafka须要为每一个partition分配一些内存来缓存消息数据,若是partition数量越大,就要为kafka分配更大的heap space。
Producers直接发送消息到broker上的leader partition,不须要通过任何中介一系列的路由转发。为了实现这个特性,kafka集群中的每一个broker均可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是能够直接被访问的。
Producer客户端本身控制着消息被推送到哪些partition。实现的方式能够是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的分区,用户能够为每一个消息指定一个partitionKey,经过这个key来实现一些hash分区算法。好比,把userid做为partitionkey的话,相同userid的消息将会被推送到同一个分区。
以Batch的方式推送数据能够极大的提升处理效率,kafka Producer 能够将消息在内存中累计到必定数量后做为一个batch发送请求。Batch的数量大小能够经过Producer的参数控制,参数值能够设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。经过增长batch的大小,能够减小网络请求和磁盘IO的次数,固然具体参数设置须要在效率和时效性方面作一个权衡。
Producers能够异步的并行的向kafka发送消息,可是一般producer在发送完消息以后会获得一个future响应,返回的是offset值或者发送过程当中遇到的错误。这其中有个很是重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,若是acks设置数量为0,表示producer不会等待broker的响应,因此,producer没法知道消息是否发送成功,这样有可能会致使数据丢失,但同时,acks值为0会获得最大的系统吞吐量。
若acks设置为1,表示producer会在leader partition收到消息时获得broker的一个确认,这样会有更好的可靠性,由于客户端会等待直到broker确认收到消息。若设置为-1,producer会在全部备份的partition收到消息时获得broker的确认,这个设置能够获得最高的可靠性保证。
Kafka 消息有一个定长的header和变长的字节数组组成。由于kafka消息支持字节数组,也就使得kafka能够支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但咱们推荐消息大小不要超过1MB,一般通常消息大小都在1~10kB以前。
咱们上面已经知道了Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端能够经过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩以后,在Consumer端需进行解压。压缩的好处就是减小传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈每每体如今网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。
那么如何区分消息是压缩的仍是未压缩的呢,Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,若是后两位为0,则表示消息未被压缩。
在消息系统中,保证消息在生产和消费过程当中的可靠性是十分重要的,在实际消息传递过程当中,可能会出现以下三中状况:
有许多系统声称它们实现了exactly-once,可是它们其实忽略了生产者或消费者在生产和消费过程当中有可能失败的状况。好比虽然一个Producer成功发送一个消息,可是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,可是这个consumer在处理取过来的消息时失败了。
从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可经过参数控制等待时间),若是消息在途中丢失或是其中一个broker挂掉,Producer会从新发送(咱们知道Kafka有备份机制,能够经过参数控制是否等待全部备份节点都收到消息)。
从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程当中挂掉,此时Consumer能够经过这个offset值从新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息作任意处理。
备份机制是Kafka0.8版本的新特性,备份机制的出现大大提升了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka容许集群中的节点挂掉后而不影响整个集群工做。一个备份数量为n的集群容许n-1个节点失败。在全部备份节点中,有一个节点做为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了Kafka的备份机制:
Kafka高度依赖文件系统来存储和缓存消息,通常的人认为磁盘是缓慢的,这致使人们对持久化结构具备竞争性持怀疑态度。其实,磁盘远比你想象的要快或者慢,这决定于咱们如何使用磁盘。
一个和磁盘性能有关的关键事实是:磁盘驱动器的吞吐量跟寻到延迟是相背离的,也就是所,线性写的速度远远大于随机写。好比:在一个6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是600M/秒,可是随机写的速度只有100K/秒,二者相差将近6000倍。线性读写在大多数应用场景下是能够预测的,所以,操做系统利用read-ahead和write-behind技术来从大的数据块中预取数据,或者将多个逻辑上的写操做组合成一个大写物理写操做中。更多的讨论能够在ACMQueueArtical中找到,他们发现,对磁盘的线性读在有些状况下能够比内存的随机访问要快一些。
为了补偿这个性能上的分歧,现代操做系统都会把空闲的内存用做磁盘缓存,尽管在内存回收的时候会有一点性能上的代价。全部的磁盘读写操做会在这个统一的缓存上进行。
此外,若是咱们是在JVM的基础上构建的,熟悉java内存应用管理的人应该清楚如下两件事情:
基于这些事实,利用文件系统而且依靠页缓存比维护一个内存缓存或者其余结构要好——咱们至少要使得可用的缓存加倍,经过自动访问可用内存,而且经过存储更紧凑的字节结构而不是一个对象,这将有可能再次加倍。这么作的结果就是在一台32GB的机器上,若是不考虑GC惩罚,将最多有28-30GB的缓存。此外,这些缓存将会一直存在即便服务重启,然而进程内缓存须要在内存中重构(10GB缓存须要花费10分钟)或者它须要一个彻底冷缓存启动(很是差的初始化性能)。它同时也简化了代码,由于如今全部的维护缓存和文件系统之间内聚的逻辑都在操做系统内部了,这使得这样作比one-off in-process attempts更加高效与准确。若是你的磁盘应用更加倾向于顺序读取,那么read-ahead在每次磁盘读取中实际上获取到这人缓存中的有用数据。
以上这些建议了一个简单的设计:不一样于维护尽量多的内存缓存而且在须要的时候刷新到文件系统中,咱们换一种思路。全部的数据不须要调用刷新程序,而是马上将它写到一个持久化的日志中。事实上,这仅仅意味着,数据将被传输到内核页缓存中并稍后被刷新。咱们能够增长一个配置项以让系统的用户来控制数据在何时被刷新到物理硬盘上。
消息系统中持久化数据结构的设计一般是维护者一个和消费队列有关的B树或者其它可以随机存取结构的元数据信息。B树是一个很好的结构,能够用在事务型与非事务型的语义中。可是它须要一个很高的花费,尽管B树的操做须要O(logN)。一般状况下,这被认为与常数时间等价,但这对磁盘操做来讲是不对的。磁盘寻道一次须要10ms,而且一次只能寻一个,所以并行化是受限的。
直觉上来说,一个持久化的队列能够构建在对一个文件的读和追加上,就像通常状况下的日志解决方案。尽管和B树相比,这种结构不能支持丰富的语义,可是它有一个优势,全部的操做都是常数时间,而且读写之间不会相互阻塞。这种设计具备极大的性能优点:最终系统性能和数据大小彻底无关,服务器能够充分利用廉价的硬盘来提供高效的消息服务。
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着咱们能够提供通常消息系统没法提供的特性。好比说,消息被消费后不是立马被删除,咱们能够将这些消息保留一段相对比较长的时间(好比一个星期)。
咱们已经为效率作了很是多的努力。可是有一种很是主要的应用场景是:处理Web活动数据,它的特色是数据量很是大,每一次的网页浏览都会产生大量的写操做。更进一步,咱们假设每个被发布的消息都会被至少一个consumer消费,所以咱们更要怒路让消费变得更廉价。
经过上面的介绍,咱们已经解决了磁盘方面的效率问题,除此以外,在此类系统中还有两类比较低效的场景:
为了减小大量小I/O操做的问题,kafka的协议是围绕消息集合构建的。Producer一次网络请求能够发送一个消息集合,而不是每一次只发一条消息。在server端是以消息块的形式追加消息到log中的,consumer在查询的时候也是一次查询大量的线性数据块。消息集合即MessageSet,实现自己是一个很是简单的API,它将一个字节数组或者文件进行打包。因此对消息的处理,这里没有分开的序列化和反序列化的上步骤,消息的字段能够按需反序列化(若是没有须要,能够不用反序列化)。
另外一个影响效率的问题就是字节拷贝。为了解决字节拷贝的问题,kafka设计了一种“标准字节消息”,Producer、Broker、Consumer共享这一种消息格式。Kakfa的message log在broker端就是一些目录文件,这些日志文件都是MessageSet按照这种“标准字节消息”格式写入到磁盘的。
维持这种通用的格式对这些操做的优化尤其重要:持久化log 块的网络传输。流行的unix操做系统提供了一种很是高效的途径来实现页面缓存和socket之间的数据传递。在Linux操做系统中,这种方式被称做:sendfile system call(Java提供了访问这个系统调用的方法:FileChannel.transferTo api)。
为了理解sendfile的影响,须要理解通常的将数据从文件传到socket的路径:
这种操做方式明显是很是低效的,这里有四次拷贝,两次系统调用。若是使用sendfile,就能够避免两次拷贝:操做系统将数据直接从页缓存发送到网络上。因此在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是须要的。
咱们指望一个主题上有多个消费者是一种常见的应用场景。利用上述的zero-copy,数据只被拷贝到页缓存一次,而后就能够在每次消费时被重得利用,而不须要将数据存在内存中,而后在每次读的时候拷贝到内核空间中。这使得消息消费速度能够达到网络链接的速度。这样以来,经过页面缓存和sendfile的结合使用,整个kafka集群几乎都已以缓存的方式提供服务,并且即便下游的consumer不少,也不会对整个集群服务形成压力。
关于sendfile和zero-copy,请参考:zero-copy
为了提升性能,推荐采用专用的服务器来部署kafka集群,尽可能与hadoop集群分开,由于kafka依赖磁盘读写和大的页面缓存,若是和hadoop共享节点的话会影响其使用页面缓存的性能。
Kafka集群的大小须要根据硬件的配置、生产者消费者的并发数量、数据的副本个数、数据的保存时长综合肯定。
磁盘的吞吐量尤其重要,由于一般kafka的瓶颈就在磁盘上。
Kafka依赖于zookeeper,建议采用专用服务器来部署zookeeper集群,zookeeper集群的节点采用偶数个,通常建议用三、五、7个。注意zookeeper集群越大其读写性能越慢,由于zookeeper须要在节点之间同步数据。一个3节点的zookeeper集群容许一个节点失败,一个5节点集群容许2个几点失败。
有不少因素决定着kafka集群须要具有存储能力的大小,最准确的衡量办法就是模拟负载来测算一下,Kafka自己也提供了负载测试的工具。
若是不想经过模拟实验来评估集群大小,最好的办法就是根据硬盘的空间需求来推算。下面我就根据网络和磁盘吞吐量需求来作一下估算。
咱们作以下假设:
通常的来讲,kafka集群瓶颈在于网络和磁盘吞吐量,因此咱们先评估一下集群的网络和磁盘需求。
对于每条消息,每一个副本都要写一遍,因此总体写的速度是W*R。读数据的部分主要是集群内部各个副本从leader同步消息读和集群外部的consumer读,因此集群内部读的速率是(R-1)*W,同时,外部consumer读的速度是C*W,所以:
须要注意的是,咱们能够在读的时候缓存部分数据来减小IO操做,若是一个集群有M MB内存,写的速度是W MB/sec,则容许M/(W*R) 秒的写能够被缓存。若是集群有32GB内存,写的速度是50MB/s的话,则能够至少缓存10分钟的数据。
2、kafka 安装
2.1 jdk安装
#以oracle jdk为例,下载地址http://java.sun.com/javase/downloads/index.jsp
yum -y install jdk-8u141-linux-x64.rpm
2.2 安装zookeeper
wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz tar zxf zookeeper-3.4.9.tar.gz mv zookeeper-3.4.9 /data/zk
修改配置文件内容以下所示:
[root@localhost ~]# cat /data/zk/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zk/data/zookeeper dataLogDir=/data/zk/data/logs clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=1 server.1=zk01:2888:3888 server.2=zk02:2888:3888 server.3=zk03:2888:3888
参数说明:
server.id=host:port:port:表示了不一样的zookeeper服务器的自身标识,做为集群的一部分,每一台服务器应该知道其余服务器的信息。用户能够从“server.id=host:port:port” 中读取到相关信息。在服务器的data(dataDir参数所指定的目录)下建立一个文件名为myid的文件,这个
文件的内容只有一行,指定的是自身的id值。好比,服务器“1”应该在myid文件中写入“1”。这个id必须在集群环境中服务器标识中是惟一的,且大小在1~255之间。这同样配置中,zoo1表明第一台服务器的IP地址。第一个端口号(port)是从follower链接到leader机器的
端口,第二个端口是用来进行leader选举时所用的端口。因此,在集群配置过程当中有三个很是重要的端口:clientPort:218一、port:288八、port:3888。
关于zoo.cfg配置文件说明,参考链接https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_configuration;
若是想更换日志输出位置,除了在zoo.cfg加入"dataLogDir=/data/zk/data/logs"外,还须要修改zkServer.sh文件,大概修改方式地方在125行左右,内容以下:
125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')" 126 if [ ! -w "$ZOO_LOG_DIR" ] ; then 127 mkdir -p "$ZOO_LOG_DIR" 128 fi
在启动服务以前,还须要分别在zookeeper建立myid,方式以下:
echo 1 > /data/zk/data/zookeeper/myid
启动服务
/data/zk/bin/zkServer.sh start
验证服务
### 查看相关端口号[root@localhost ~]# ss -lnpt|grep java LISTEN 0 50 :::34442 :::* users:(("java",pid=2984,fd=18)) LISTEN 0 50 ::ffff:192.168.15.133:3888 :::* users:(("java",pid=2984,fd=26)) LISTEN 0 50 :::2181 :::* users:(("java",pid=2984,fd=25))###查看zookeeper服务状态 root@localhost ~]# /data/zk/bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /data/zk/bin/../conf/zoo.cfgMode: follower
zookeeper相关命令说明,参考https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html (文末有说明);
2.3 安装kafka
tar zxf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 /data/kafka
修改配置
[root@localhost ~]# grep -Ev "^#|^$" /data/kafka/config/server.properties broker.id=0 delete.topic.enable=true listeners=PLAINTEXT://192.168.15.131:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/data num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
提示:其余主机将该机器的kafka目录拷贝便可,而后须要修改broker.id、listeners地址。有关kafka配置文件参数,参考:http://orchome.com/12;
启动服务
/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
验证服务
### 随便在其中一台主机执行 /data/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181 --replication-factor 1 --partitions 1 --topic test ###在其余主机查看 /data/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181
参考:Kafka基本原理