kafka教程

1、理论介绍
(一)相关资料
一、官方资料,很是详细:
   http://kafka.apache.org/documentation.html#quickstart
二、有一篇翻译版,基本一致,有些细节不一样,建议入门时先读此文,再读官方文档。若自认英语很强,请忽视:
   http://www.linuxidc.com/Linux/2014-07/104470.htm
三、还有一文也能够:
http://www.sxt.cn/info-2871-u-324.html
其主要内容来源于如下三篇文章:
日志:每一个软件工程师都应该知道的有关实时数据的统一律念 —— 这篇比较抽象,高屋建瓴,理论先行
Building LinkedIn’s Real-time Activity Data Pipeline —— 实践层的论文,把作事情的来龙去脉都写明白了
分布式发布订阅消息系统 Kafka 架构设计 —— 落地设计

(二)kafka是什么?

 一、Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka是一个 分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具备本身独特的设计。

二、能够简单的理解为:kafka是一个日志集群,各类各样的服务器将它们自身的日志发送到集群中进行统一汇总和存储,而后其它机器从集群中拉取消息进行分析处理,如ELT、数据挖掘等。

三、kafka提供了JAVA API,同时对多种语言都提供了支持。

(三)基本的架构
首先让咱们看几个基本的消息系统术语:
Kafka将消息以topic为单位进行概括。
将向Kafka topic发布消息的程序称为producers.
将预订topics并消费消息的程序称为consumer.
Kafka以集群的方式运行,能够由一个或多个服务组成,每一个服务叫作一个broker.
producers经过网络将消息发送到Kafka集群,集群向consumers提供消息,以下图所示:

(四)分区与副本
一、一个topic是对一组消息的概括。对每一个topic,Kafka 对它的日志进行了分区,以下图所示:

 
二、通常而言,一个topic会有多个分区,每一个分区会有多个副本。
分区是分了将一个topic分到多个地方存储,提升并行处理的能力。副本是为了容错,保证数据不丢失。

三、对于每个分区,都会选取一个leader,这个分区的全部读取都在这个leader中进行,而其它副本会同步leader中的数据,且只作备份。
即leader只是针对一个分区而言,而非整个集群。一个服务器对于某个分区是leader,对于其它分区多是follower。

四、 Producer将消息发布到它指定的topic中,并负责决定发布到哪一个分区。一般简单的由负载均衡机制随机选择分区,但也能够经过特定的分区函数选择分区。使用的更多的是第二种。

五、发布消息一般有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers能够同时从服务端读取消息,每一个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到全部的consumer中。

Consumers能够加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer能够在不一样的程序中,也能够在不一样的机器上。若是全部的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。

若是全部的consumer都不在不一样的组中,这就成为了发布-订阅模式,全部的消息都被分发到全部的consumer中。

更常见的是,每一个topic都有若干数量的consumer组,每一个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每一个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。

由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
 
相比传统的消息系统,Kafka能够很好的保证有序性。
传统的队列在服务器上保存有序的消息,若是多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分 发消息。虽然服务器按顺序发布消息,可是消息是被异步的分发到各consumer上,因此当消息到达时可能已经失去了原来的顺序,这意味着并发消费将致使 顺序错乱。为了不故障,这样的消息系统一般使用“专用consumer”的概念,其实就是只容许一个消费者消费消息,固然这就意味着失去了并发性。
 
在这方面Kafka作的更好,经过分区的概念,Kafka能够在多个consumer组并发的状况下提供较好的有序性和负载均衡。将每一个分区分 只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就能够顺序的消费这个分区的消息。由于有多个分区,依然能够在多 个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就容许多少并发消费。
 
Kafka只能保证一个分区以内消息的有序性,在不一样的分区之间是不能够的,这已经能够知足大部分应用的需求。若是须要topic中全部消息的有序性,那就只能让这个topic只有一个分区,固然也就只有一个consumer组消费它。

2、安装部署
(一)单机版安装

Step 1: 下载Kafka

下载最新的版本并解压.

$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
$ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 启动服务

Kafka用到了Zookeeper,全部首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。能够在命令的结尾加个&符号,这样就能够启动后离开控制台。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
...
如今启动Kafka:
> bin/kafka-server-start.sh config/server.properties

Step 3: 建立 topic

建立一个叫作“test”的topic,它只有一个分区,一个副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "test".
能够经过list命令查看建立的topic:  
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
除了手动建立topic,还能够配置broker让它自动建立topic.
Step 4:发送消息.

Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。     

运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c能够退出发送。
默认状况下,日志数据会被放置到/tmp/kafka-logs中,每一个分区一个目录
Step 5: 启动consumer

Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer能够读取消息并输出到标准输出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一个终端中运行consumer命令行,另外一个终端中运行producer命令行,就能够在一个终端输入消息,另外一个终端读取消息。
这两个命令都有本身的可选参数,能够在运行的时候不加任何参数能够看到帮助信息。



(二)集群安装

注意,必须先搭建zookeeper集群,搭建方法请见????

一、使用3台机器搭建Kafka集群:

192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test

二、在安装Kafka集群以前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
三、首先,在gdc-dn01-test上准备Kafka安装文件,执行以下命令:

cd

wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

tar xvzf kafka_2.10-0.8.2.1.tgz

mv kafka_2.10-0.8.2.1 kafka


四、修改配置文件kafka/config/server.properties,修改以下内容:

broker.id=0

zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
这里须要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,若是 你有其余的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,因此强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:


zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
并且,须要手动在ZooKeeper中建立路径/kafka,使用以下命令链接到任意一台ZooKeeper服务器:


cd ~/zookeeper

bin/zkCli.sh
在ZooKeeper执行以下命令建立chroot路径:


create /kafka ''
这样,每次链接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的链接字符串,后面会看到。


五、而后,将配置好的安装文件同步到其余的dn0二、dn03节点上:

scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop

scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop

六、最后,在dn0二、dn03节点上配置修改配置文件kafka/config/server.properties内容以下所示:


broker.id=1  # 在dn02修改

broker.id=2  # 在dn03修改
由于Kafka集群须要保证各个Broker的id在整个集群中必须惟一,须要调整这个配置项的值(若是在单机上,能够经过创建多个Broker进程来模拟分布式的Kafka集群,也须要Broker的id惟一,还须要修改一些配置目录的信息)。
七、在集群中的dn0一、dn0二、dn03这三个节点上分别启动Kafka,分别执行以下命令:


bin/kafka-server-start.sh config/server.properties &
能够经过查看日志,或者检查进程状态,保证Kafka集群启动成功。
八、建立一个名称为my-replicated-topic5的Topic,5个分区,而且复制因子为3,执行以下命令:


bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
九、查看建立的Topic,执行以下命令:


bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
结果信息以下所示:

Topic:my-replicated-topic5  PartitionCount:5  ReplicationFactor:3  Configs:
 Topic: my-replicated-topic5  Partition: 0  Leader: 2   Replicas: 2,0,1  Isr: 2,0,1
 Topic: my-replicated-topic5  Partition: 1  Leader: 0  Replicas: 0,1,2   Isr: 0,1,2
 Topic: my-replicated-topic5  Partition: 2  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
 Topic: my-replicated-topic5  Partition: 3  Leader: 2   Replicas: 2,1,0  Isr: 2,1,0
 Topic: my-replicated-topic5  Partition: 4  Leader: 0  Replicas: 0,2,1  Isr: 0,2,1
上面Leader、Replicas、Isr的含义以下:

1    Partition: 分区
2    Leader   : 负责读写指定分区的节点
3    Replicas : 复制该分区log的节点列表
4    Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),而且可能成为Leader
咱们能够经过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示若是发布消息、消费消息。
十一、在一个终端,启动Producer,并向咱们上面建立的名称为my-replicated-topic5的Topic中生产消息,执行以下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5

十二、在另外一个终端,启动Consumer,并订阅咱们上面建立的名称为my-replicated-topic5的Topic中生产的消息,执行以下脚本:

bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
能够在Producer终端上输入字符串消息行,就能够在Consumer终端上看到消费者消费的消息内容。
也能够参考Kafka的Producer和Consumer的Java API,经过API编码的方式来实现消息生产和消费的处理逻辑。

3、配置文件

所有配置参数请 见http://kafka.apache.org/documentation.html#consumerconfigs

及http://blog.csdn.net/jinhong_lu/article/details/46518613

(一)重要配置参数
0、JVM配置 在bin/kafka-server-start.sh中添加如下内容:
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
一、broker.id=0 整数,建议根据ip区分,用于区分broker,确保每台机器不一样

二、log.dirs=/home/data/kafka  kafka用于放置消息的目录,默认为/tmp/kafka-logs

三、zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka zk用于放置kafka信息的地方

四、num.partitions=1 建立topic时,默认的分区数

五、num.network.threads=10  broker用于处理网络请求的线程数,如不配置默认为3

六、zookeeper.connection.timeout.ms=6000

七、message.max.bytes=1000000000

replica.fetch.max.bytes=1073741824

一条消息的最大字节数,说明以下:

kafka中出现如下异常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
缘由是集群默认每次只能接受约1M的消息,若是客户端一次发送的消息大于这个数值则会致使异常。
在server.properties中添加如下参数
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同时在consumer.properties中添加如下参数:
fetch.message.max.bytes=1073741824
而后重启kafka进程便可,如今每次最大可接收100M的消息。
八、delete.topic.enable=true  默认为false,即delete topic时只是marked for deletion,但并不会真正删除topic。

九、关于日志的保存时间或量:
(1)log.retention.hours=24  消息被删除前保存多少小时,默认1周168小时

(2)log.retention.bytes 默认为-1,即不限制大小。注意此外的大小是指一个topic的一个分区的最大字节数。
当超出上述2个限制的任何一个时,日志均会被删除。

十、同步发送仍是异步发送,异步吞吐量较大,但可能引入错误,默认为sync
producer.type=sync|async
This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.

十一、batch.size 默认值为16384
在async模式下,producer缓存多少个消息后再一块儿发送

十二、compression.type 默认值为none,可选gzip snappy
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).html

1三、default.replication.factor 消息副本的数量,默认为1,即没有副本java

 

如下配置说明来自网上转载:node

 

每一个kafka broker中配置文件server.properties默认必须配置的属性以下:转载请注明来自:http://blog.csdn.net/lizhitao/article/details/25667831linux

[java]  view plain copy 在CODE上查看代码片 派生到个人代码片
  1. broker.id=0  
  2. num.network.threads=2  
  3. num.io.threads=8  
  4. socket.send.buffer.bytes=1048576  
  5. socket.receive.buffer.bytes=1048576  
  6. socket.request.max.bytes=104857600  
  7. log.dirs=/tmp/kafka-logs  
  8. num.partitions=2  
  9. log.retention.hours=168  
  10.   
  11. log.segment.bytes=536870912  
  12. log.retention.check.interval.ms=60000  
  13. log.cleaner.enable=false  
  14.   
  15. zookeeper.connect=localhost:2181  
  16. zookeeper.connection.timeout.ms=1000000  

server.properties中全部配置参数说明(解释)以下列表:

参数apache

说明(解释)缓存

broker.id =0服务器

每个broker在集群中的惟一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息状况网络

log.dirs=/data/kafka-logssession

kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不一样磁盘上能够提升读写性能  /data/kafka-logs-1/data/kafka-logs-2架构

port =9092

broker server服务端口

message.max.bytes =6525000

表示消息体的最大大小,单位是字节

num.network.threads =4

broker处理消息的最大线程数,通常状况下数量为cpu核数

num.io.threads =8

broker处理磁盘IO的线程数,数值为cpu核数2倍

background.threads =4

一些后台任务处理的线程数,例如过时消息文件的删除等,通常状况下不须要去作修改

queued.max.requests =500

等待IO线程处理的请求队列最大数,如果等待IO的请求超过这个数值,那么会中止接受外部消息,应该是一种自我保护机制。

host.name

broker的主机地址,如果设置了,那么会绑定到这个地址上,如果没有,会绑定到全部的接口上,并将其中之一发送到ZK,通常不设置

socket.send.buffer.bytes=100*1024

socket的发送缓冲区,socket的调优参数SO_SNDBUFF

socket.receive.buffer.bytes =100*1024

socket的接受缓冲区,socket的调优参数SO_RCVBUFF

socket.request.max.bytes =100*1024*1024

socket请求的最大数值,防止serverOOMmessage.max.bytes必然要小于socket.request.max.bytes,会被topic建立时的指定参数覆盖

log.segment.bytes =1024*1024*1024

topic的分区是以一堆segment文件存储的,这个控制每一个segment的大小,会被topic建立时的指定参数覆盖

log.roll.hours =24*7

这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic建立时的指定参数覆盖

log.cleanup.policy = delete

日志清理策略选择有:deletecompact主要针对过时数据的处理,或是日志文件达到限制的额度,会被 topic建立时的指定参数覆盖

log.retention.minutes=300

log.retention.hours=24

数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略

log.retention.byteslog.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除

 

有2删除数据文件方式:

      按照文件大小删除:log.retention.bytes

  按照2中不一样时间粒度删除:分别为分钟,小时

log.retention.bytes=-1

topic每一个分区的最大文件大小,一个topic的大小限制 =分区数*log.retention.bytes-1没有大小限log.retention.byteslog.retention.minutes任意一个达到要求,都会执行删除,会被topic建立时的指定参数覆盖

log.retention.check.interval.ms=5minutes

文件大小检查的周期时间,是否处罚log.cleanup.policy中设置的策略

log.cleaner.enable=false

是否开启日志清理

log.cleaner.threads = 2

日志清理运行的线程数

log.cleaner.io.max.bytes.per.second=None

日志清理时候处理的最大大小

log.cleaner.dedupe.buffer.size=500*1024*1024

日志清理去重时候的缓存空间,在空间容许的状况下,越大越好

log.cleaner.io.buffer.size=512*1024

日志清理时候用到的IO块大小通常不须要修改

log.cleaner.io.buffer.load.factor =0.9

日志清理中hash表的扩大因子通常不须要修改

log.cleaner.backoff.ms =15000

检查是否处罚日志清理的间隔

log.cleaner.min.cleanable.ratio=0.5

日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic建立时的指定参数覆盖

log.cleaner.delete.retention.ms =1day

对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic建立时的指定参数覆盖

log.index.size.max.bytes =10*1024*1024

对于segment日志的索引文件大小限制,会被topic建立时的指定参数覆盖

log.index.interval.bytes =4096

当执行一个fetch操做后,须要必定的空间来扫描最近的offset大小,设置越大,表明扫描速度越快,可是也更好内存,通常状况下不须要搭理这个参数

log.flush.interval.messages=None

例如log.flush.interval.messages=1000

表示每当消息记录数达到1000时flush一次数据到磁盘

log文件”sync”到磁盘以前累积的消息条数,由于磁盘IO操做是一个慢操做,但又是一个数据可靠性"的必要手段,因此此参数的设置,须要在"数据可靠性""性能"之间作必要的权衡.若是此值过大,将会致使每次"fsync"的时间较长(IO阻塞),若是此值太小,将会致使"fsync"的次数较多,这也意味着总体的client请求有必定的延迟.物理server故障,将会致使没有fsync的消息丢失.

log.flush.scheduler.interval.ms =3000

检查是否须要固化到硬盘的时间间隔

log.flush.interval.ms = None

例如:log.flush.interval.ms=1000

表示每间隔1000毫秒flush一次数据到磁盘

仅仅经过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,若是消息量始终没有达到阀值,可是离上一次磁盘同步的时间间隔达到阀值,也将触发.

log.delete.delay.ms =60000

文件在索引中清除后保留的时间通常不须要去修改

log.flush.offset.checkpoint.interval.ms =60000

控制上次固化硬盘的时间点,以便于数据恢复通常不须要去修改

auto.create.topics.enable =true

是否容许自动建立topic,如果false,就须要经过命令建立topic

default.replication.factor =1

是否容许自动建立topic,如果false,就须要经过命令建立topic

num.partitions =1

每一个topic的分区个数,如果在topic建立时候没有指定的话会被topic建立时的指定参数覆盖

 

 

如下是kafkaLeader,replicas配置参数

 

controller.socket.timeout.ms =30000

partition leaderreplicas之间通信时,socket的超时时间

controller.message.queue.size=10

partition leaderreplicas数据同步时,消息的队列尺寸

replica.lag.time.max.ms =10000

replicas响应partition leader的最长等待时间,如果超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中

replica.lag.max.messages =4000

若是follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效

##一般,followerleader通信时,由于网络延迟或者连接断开,总会致使replicas中消息同步滞后

##若是消息以后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移

##到其余follower.

##broker数量较少,或者网络不足的环境中,建议提升此值.

replica.socket.timeout.ms=30*1000

followerleader之间的socket超时时间

replica.socket.receive.buffer.bytes=64*1024

leader复制时候的socket缓存大小

replica.fetch.max.bytes =1024*1024

replicas每次获取数据的最大大小

replica.fetch.wait.max.ms =500

replicasleader之间通讯的最大等待时间,失败了会重试

replica.fetch.min.bytes =1

fetch的最小数据尺寸,若是leader中还没有同步的数据不足此值,将会阻塞,直到知足条件

num.replica.fetchers=1

leader进行复制的线程数,增大这个数值会增长followerIO

replica.high.watermark.checkpoint.interval.ms =5000

每一个replica检查是否将最高水位进行固化的频率

controlled.shutdown.enable =false

是否容许控制器关闭broker ,如果设置为true,会关闭全部在这个broker上的leader,并转移到其余broker

controlled.shutdown.max.retries =3

控制器关闭的尝试次数

controlled.shutdown.retry.backoff.ms =5000

每次关闭尝试的时间间隔

leader.imbalance.per.broker.percentage =10

leader的不平衡比例,如果超过这个数值,会对分区进行从新的平衡

leader.imbalance.check.interval.seconds =300

检查leader是否不平衡的时间间隔

offset.metadata.max.bytes

客户端保留offset信息的最大空间大小

kafkazookeeper参数配置

 

zookeeper.connect = localhost:2181

zookeeper集群的地址,能够是多个,多个之间用逗号分割hostname1:port1,hostname2:port2,hostname3:port3

zookeeper.session.timeout.ms=6000

ZooKeeper的最大超时时间,就是心跳的间隔,如果没有反映,那么认为已经死了,不易过大

zookeeper.connection.timeout.ms =6000

ZooKeeper的链接超时时间

zookeeper.sync.time.ms =2000

ZooKeeper集群中leaderfollower之间的同步实际那



 

 


4、错误处理

一、配置kafka时,若是使用zookeeper create /kafka建立了节点,kafka与storm集成时new ZkHosts(zks) 须要改为 new ZkHosts(zks,”/kafka/brokers”),否则会报java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。

storm-kafka插件默认kafka的 zk_path以下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;

二、若是出现如下问题,表明偏移量出错,建议从新开一个topic
ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.  

三、当没有某个topic,或者是某个topic的node放置不在默认位置时,会有如下异常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam         


四、kafka中出现如下异常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
缘由是集群默认每次只能接受约1M的消息,若是客户端一次发送的消息大于这个数值则会致使异常。
在server.properties中添加如下参数
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同时在consumer.properties中添加如下参数:
fetch.message.max.bytes=1073741824

而后重启kafka进程便可,如今每次最大可接收100M的消息。

 

五、open too many files

kafka出现异常,日志提示open too many file
查找文件打开数量
lsof -p 30353 | wc
若是在1000以上,通常都是不正常,走过65535就会出错。
缘由打开了太多producer,没关闭,调用producer.close()便可。


5、经常使用操做
一、启动集群

bin/kafka-server-start.sh config/server.properties &

二、建立topic

bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic

三、查看topic信息

bin/kafka-topics.sh --describe --zookeeper bin/kafka-console-producer.sh --broker-list  192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic test_topic

四、启动一个console producer,用于在console中模拟输入消息

bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic test_topic

五、启动一个console consumer,用于模拟接收消息,并在console中输出

bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic test_topic


六、删除一个topic
bin/kafka-topics.sh  --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --delete --topic test2

七、列出全部topic

 bin/kafka-topics.sh  --zookeeper localhost/kafka --list

 6、API

 

7、zookeeper中的内容

默认状况,kafka在zk的/brokers目录下记录topic相关的信息,但若是在建立topic时,指定了路径,则放置到固定的路径中,如:

bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic
建立的topic,其相关信息会放置到/kafka/brokers中,这个目录中主要包括2个子目录:ids 和 topics

一、ids:记录这个kafka集群中有多少个broker

如:

 

ls /kafka/brokers/ids/
3   2   5   4

 

这个集群有4个节点,节点id分别为2,3,4,5。 咱们看一下内容

 

[zk: localhost:2181(CONNECTED) 27] get  /kafka/brokers/ids/2
{"jmx_port":-1,"timestamp":"1435833841290","host":"kafka02-log.i.nease.net","version":1,"port":9092}
cZxid = 0x1000e8a68
ctime = Thu Jul 02 18:44:01 HKT 2015
mZxid = 0x1000e8a68
mtime = Thu Jul 02 18:44:01 HKT 2015
pZxid = 0x1000e8a68
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x44e440d0bdf06eb
dataLength = 104
numChildren = 0

记录着这个节点的一些基本状况。

 

二、topics

先看一下有哪些内容:

 

[zk: localhost:2181(CONNECTED) 29] ls /kafka/brokers/topics/test30/partitions
[3, 2, 1, 0, 4]
[zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics/test30/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/topics/test30/partitions/0/state
{"controller_epoch":4,"leader":5,"version":1,"leader_epoch":2,"isr":[5]}
cZxid = 0x100017c5e
ctime = Wed Jul 01 14:54:24 HKT 2015
mZxid = 0x1000e8a84
mtime = Thu Jul 02 18:44:01 HKT 2015
pZxid = 0x100017c5e
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

能够看某个分区的leader是哪一个,从而读取kafka消息时,能够从这个leader中读取数据。

相关文章
相关标签/搜索