若自认英语很是强,请忽视:
http://www.linuxidc.com/Linux/2014-07/104470.htm
三、另外一文也可以:
http://www.sxt.cn/info-2871-u-324.html
其主要内容来源于下面三篇文章:
日志:每个软件project师都应该知道的有关实时数据的统一律念 —— 这篇比較抽象,高屋建瓴,理论先行
Building LinkedIn’s Real-time Activity Data Pipeline —— 实践层的论文,把作事情的来龙去脉都写明确了
分布式公布订阅消息系统 Kafka 架构设计 —— 落地设计
(二)kafka是什么?
一、Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka是一个 分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具备本身独特的设计。html
二、可以简单的理解为:kafka是一个日志集群,各类各样的server将它们自身的日志发送到集群中进行统一汇总和存储。而后其余机器从集群中拉取消息进行分析处理,如ELT、数据挖掘等。java
三、kafka提供了JAVA API,同一时候对多种语言都提供了支持。
(三)主要的架构
首先让咱们看几个主要的消息系统术语:
Kafka将消息以topic为单位进行概括。
将向Kafka topic公布消息的程序称为producers.
将预订topics并消费消息的程序称为consumer.
Kafka以集群的方式执行,可以由一个或多个服务组成,每个服务叫作一个broker.
producers经过网络将消息发送到Kafka集群,集群向consumers提供消息,例如如下图所看到的:
(四)分区与副本
一、一个topic是对一组消息的概括。对每个topic,Kafka 对它的日志进行了分区,例如如下图所看到的:
二、通常而言,一个topic会有多个分区,每个分区会有多个副本。
分区是分了将一个topic分到多个地方存储,提升并行处理的能力。副本是为了容错,保证数据不丢失。node
三、对于每一个分区。都会选取一个leader。这个分区的所有读取都在这个leader中进行,而其余副本会同步leader中的数据,且仅仅作备份。
即leader仅仅是针对一个分区而言,而非整个集群。linux
一个server对于某个分区是leader,对于其余分区多是follower。apache
四、 Producer将消息公布到它指定的topic中,并负责决定公布到哪一个分区。一般简单的由负载均衡机制随机选择分区,但也可以经过特定的分区函数选择分区。缓存
使用的不少其它的是另一种。网络
五、公布消息一般有两种模式:队列模式(queuing)和公布-订阅模式(publish-subscribe)。session
队列模式中。consumers可以同一时候从服务端读取消息,每个消息仅仅被当中一个consumer读到。公布-订阅模式中消息被广播到所有的consumer中。架构
Consumers可以增长一个consumer 组。共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。并发
同一组中的consumer可以在不一样的程序中,也可以在不一样的机器上。假设所有的consumer都在一个组中,这就成为了传统的队列模式。在各consumer中实现负载均衡。
假设所有的consumer都不在不一样的组中,这就成为了公布-订阅模式。所有的消息都被分发到所有的consumer中。
更常见的是。每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。
这事实上就是一个公布-订阅模式,仅仅只是订阅者是个组而不是单个consumer。
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka可以很是好的保证有序性。
传统的队列在server上保存有序的消息。假设多个consumers同一时候从这个server消费消息,server就会以消息存储的顺序向consumer分 发消息。尽管server按顺序公布消息,但是消息是被异步的分发到各consumer上。因此当消息到达时可能已经失去了原来的顺序。这意味着并发消费将致使 顺序错乱。为了不故障,这种消息系统一般使用“专用consumer”的概念,事实上就是仅仅赞成一个消费者消费消息,固然这就意味着失去了并发性。
在这方面Kafka作的更好。经过分区的概念,Kafka可以在多个consumer组并发的状况下提供较好的有序性和负载均衡。
将每个分区分 仅仅分发给一个consumer组。这样一个分区就仅仅被这个组的一个consumer消费。就可以顺序的消费这个分区的消息。因为有多个分区,依旧可以在多 个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就赞成多少并发消费。
Kafka仅仅能保证一个分区以内消息的有序性,在不一样的分区之间是不可以的,这已经可以知足大部分应用的需求。假设需要topic中所有消息的有序性。那就仅仅能让这个topic仅仅有一个分区。固然也就仅仅有一个consumer组消费它。
2、安装部署
(一)单机版安装
Step 1: 下载Kafka
下载最新的版本号并解压.
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
$ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 启动服务
Kafka用到了Zookeeper,所有首先启动Zookper,如下简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
...
现在启动Kafka:
> bin/kafka-server-start.sh config/server.properties
Step 3: 建立 topic
建立一个叫作“test”的topic,它仅仅有一个分区。一个副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "test".
可以经过list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
除了手动建立topic,还可以配置broker让它本身主动建立topic.
Step 4:发送消息.
Kafka 使用一个简单的命令行producer,从文件里或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
执行producer并在控制台中输一些消息,这些消息将被发送到服务端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c可以退出发送。
默认状况下。日志数据会被放置到/tmp/kafka-logs中。每个分区一个文件夹
Step 5: 启动consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一个终端中执行consumer命令行,还有一个终端中执行producer命令行。就可以在一个终端输入消息。还有一个终端读取消息。
这两个命令都有本身的可选參数。可以在执行的时候不加不论什么參数可以看到帮助信息。
(二)集群安装
注意,必须先搭建zookeeper集群。搭建方法请见????
一、使用3台机器搭建Kafka集群:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
二、在安装Kafka集群以前。这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常执行。
三、首先,在gdc-dn01-test上准备Kafka安装文件,运行例如如下命令:
cd
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
tar xvzf kafka_2.10-0.8.2.1.tgz
mv kafka_2.10-0.8.2.1 kafka
四、改动配置文件kafka/config/server.properties。改动例如如下内容:
broker.id=0
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径。这样有关Kafka的ZooKeeper配置就会散落在根路径如下,假设 你有其它的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,因此强烈建议指定一个chroot路径,直接在 zookeeper.connect配置项中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
而且。需要手动在ZooKeeper中建立路径/kafka,使用例如如下命令链接到随意一台ZooKeeperserver:
cd ~/zookeeper
bin/zkCli.sh
在ZooKeeper运行例如如下命令建立chroot路径:
create /kafka ''
这样。每次链接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的链接字符串,后面会看到。
五、而后,将配置好的安装文件同步到其它的dn0二、dn03节点上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
六、最后,在dn0二、dn03节点上配置改动配置文件kafka/config/server.properties内容例如如下所看到的:
broker.id=1 # 在dn02改动
broker.id=2 # 在dn03改动
因为Kafka集群需要保证各个Broker的id在整个集群中必须惟一。需要调整这个配置项的值(假设在单机上,可以经过创建多个Broker进程来模拟分布式的Kafka集群。也需要Broker的id惟一,还需要改动一些配置文件夹的信息)。
七、在集群中的dn0一、dn0二、dn03这三个节点上分别启动Kafka,分别运行例如如下命令:
bin/kafka-server-start.sh config/server.properties &
可以经过查看日志,或者检查进程状态。保证Kafka集群启动成功。
八、建立一个名称为my-replicated-topic5的Topic。5个分区。并且复制因子为3,运行例如如下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
九、查看建立的Topic。运行例如如下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
结果信息例如如下所看到的:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
上面Leader、Replicas、Isr的含义例如如下:
1 Partition: 分区
2 Leader : 负责读写指定分区的节点
3 Replicas : 复制该分区log的节点列表
4 Isr : "in-sync" replicas。当前活跃的副本列表(是一个子集)。并且可能成为Leader
咱们可以经过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示假设公布消息、消费消息。
十一、在一个终端,启动Producer,并向咱们上面建立的名称为my-replicated-topic5的Topic中生产消息,运行例如如下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5
十二、在还有一个终端。启动Consumer。并订阅咱们上面建立的名称为my-replicated-topic5的Topic中生产的消息,运行例如如下脚本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,就可以在Consumer终端上看到消费者消费的消息内容。
也可以參考Kafka的Producer和Consumer的Java API,经过API编码的方式来实现消息生产和消费的处理逻辑。
3、配置文件
全部配置參数请 见http://kafka.apache.org/documentation.html#consumerconfigs
及http://blog.csdn.net/jinhong_lu/article/details/46518613
(一)重要配置參数
0、JVM配置 在bin/kafka-server-start.sh中加入下面内容:
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
一、broker.id=0 整数。建议依据ip区分,用于区分broker,确保每台机器不一样
二、log.dirs=/home/data/kafka kafka用于放置消息的文件夹。默以为/tmp/kafka-logs
三、zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka zk用于放置kafka信息的地方
四、num.partitions=1 建立topic时。默认的分区数
五、num.network.threads=10 broker用于处理网络请求的线程数。如不配置默以为3
六、zookeeper.connection.timeout.ms=6000
七、message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
一条消息的最大字节数,说明例如如下:
kafka中出现下面异常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
缘由是集群默认每次仅仅能接受约1M的消息。假设client一次发送的消息大于这个数值则会致使异常。
在server.properties中加入下面參数
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同一时候在consumer.properties中加入下面參数:
fetch.message.max.bytes=1073741824
而后从新启动kafka进程就能够,现在每次最大可接收100M的消息。
八、delete.topic.enable=true 默以为false,即delete topic时仅仅是marked for deletion,但并不会真正删除topic。
九、关于日志的保存时间或量:
(1)log.retention.hours=24 消息被删除前保存多少小时,默认1周168小时
(2)log.retention.bytes 默以为-1,即不大小限制。
注意此外的大小是指一个topic的一个分区的最大字节数。
当超出上述2个限制的不论什么一个时。日志均会被删除。
十、同步发送仍是异步发送,异步吞吐量较大,但可能引入错误。默以为sync
producer.type=sync|async
This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.
十一、batch.size 默认值为16384
在async模式下,producer缓存多少个消息后再一块儿发送
十二、compression.type 默认值为none,可选gzip snappy
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
1三、default.replication.factor 消息副本的数量。默以为1。即没有副本
下面配置说明来自网上转载:
每个kafka broker中配置文件server.properties默认必须配置的属性例如如下:转载请注明来自:http://blog.csdn.net/lizhitao/article/details/25667831
參数 |
说明(解释) |
broker.id =0 |
每一个broker在集群中的惟一表示。要求是正数。当该server的IP地址发生改变时,broker.id没有变化。则不会影响consumers的消息状况 |
log.dirs=/data/kafka-logs |
kafka数据的存放地址,多个地址的话用逗号切割,多个文件夹分布在不一样磁盘上可以提升读写性能 /data/kafka-logs-1,/data/kafka-logs-2 |
port =9092 |
broker server服务port |
message.max.bytes =6525000 |
表示消息体的最大大小。单位是字节 |
num.network.threads =4 |
broker处理消息的最大线程数。普通状况下数量为cpu核数 |
num.io.threads =8 |
broker处理磁盘IO的线程数,数值为cpu核数2倍 |
background.threads =4 |
一些后台任务处理的线程数,好比过时消息文件的删除等,普通状况下不需要去作改动 |
queued.max.requests =500 |
等待IO线程处理的请求队列最大数,如果等待IO的请求超过这个数值。那么会中止接受外部消息,应该是一种自我保护机制。 |
host.name |
broker的主机地址,如果设置了,那么会绑定到这个地址上,如果没有。会绑定到所有的接口上,并将当中之中的一个发送到ZK,通常不设置 |
socket.send.buffer.bytes=100*1024 |
socket的发送缓冲区。socket的调优參数SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 |
socket的接受缓冲区。socket的调优參数SO_RCVBUFF |
socket.request.max.bytes =100*1024*1024 |
socket请求的最大数值,防止serverOOM,message.max.bytes一定要小于socket.request.max.bytes,会被topic建立时的指定參数覆盖 |
log.segment.bytes =1024*1024*1024 |
topic的分区是以一堆segment文件存储的。这个控制每个segment的大小。会被topic建立时的指定參数覆盖 |
log.roll.hours =24*7 |
这个參数会在日志segment没有达到log.segment.bytes设置的大小。也会强制新建一个segment会被 topic建立时的指定參数覆盖 |
log.cleanup.policy = delete |
日志清理策略选择有:delete和compact主要针对过时数据的处理,或是日志文件达到限制的额度。会被 topic建立时的指定參数覆盖 |
log.retention.minutes=300 或 log.retention.hours=24 |
数据文件保留多长时间, 存储的最大时间超过这个时间会依据log.cleanup.policy设置数据清除策略 log.retention.bytes和log.retention.minutes或log.retention.hours随意一个达到要求,都会运行删除
有2删除数据文件方式: 依照文件大小删除:log.retention.bytes 依照2中不一样一时候间粒度删除:分别为分钟,小时 |
log.retention.bytes=-1 |
topic每个分区的最大文件大小,一个topic的限制大小 = 分区数*log.retention.bytes。 -1没有大小限log.retention.bytes和log.retention.minutes随意一个达到要求。都会运行删除,会被topic建立时的指定參数覆盖 |
log.retention.check.interval.ms=5minutes |
文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略 |
log.cleaner.enable=false |
是否开启日志清理 |
log.cleaner.threads = 2 |
日志清理执行的线程数 |
log.cleaner.io.max.bytes.per.second=None |
日志清理时候处理的最大大小 |
log.cleaner.dedupe.buffer.size=500*1024*1024 |
日志清理去重时候的缓存空间。在空间赞成的状况下,越大越好 |
log.cleaner.io.buffer.size=512*1024 |
日志清理时候用到的IO块大小通常不需要改动 |
log.cleaner.io.buffer.load.factor =0.9 |
日志清理中hash表的扩大因子通常不需要改动 |
log.cleaner.backoff.ms =15000 |
检查是否处罚日志清理的间隔 |
log.cleaner.min.cleanable.ratio=0.5 |
日志清理的频率控制。越大意味着更高效的清理。同一时候会存在一些空间上的浪费,会被topic建立时的指定參数覆盖 |
log.cleaner.delete.retention.ms =1day |
对于压缩的日志保留的最长时间,也是client消费消息的最长时间,同log.retention.minutes的差异在于一个控制未压缩数据,一个控制压缩后的数据。会被topic建立时的指定參数覆盖 |
log.index.size.max.bytes =10*1024*1024 |
对于segment日志的索引文件限制大小,会被topic建立时的指定參数覆盖 |
log.index.interval.bytes =4096 |
当运行一个fetch操做后,需要必定的空间来扫描近期的offset大小,设置越大,表明扫描速度越快。但是也更好内存,普通状况下不需要搭理这个參数 |
log.flush.interval.messages=None 好比log.flush.interval.messages=1000 表示每当消息记录数达到1000时flush一次数据到磁盘 |
log文件”sync”到磁盘以前累积的消息条数,因为磁盘IO操做是一个慢操做,但又是一个”数据可靠性"的必要手段,因此此參数的设置,需要在"数据可靠性"与"性能"之间作必要的权衡.假设此值过大,将会致使每次"fsync"的时间较长(IO堵塞),假设此值太小,将会致使"fsync"的次数较多,这也意味着整体的client请求有必定的延迟.物理server故障,将会致使没有fsync的消息丢失. |
log.flush.scheduler.interval.ms =3000 |
检查是否需要固化到硬盘的时间间隔 |
log.flush.interval.ms = None 好比:log.flush.interval.ms=1000 表示每间隔1000毫秒flush一次数据到磁盘 |
只经过interval来控制消息的磁盘写入时机,是不足的.此參数用于控制"fsync"的时间间隔,假设消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发. |
log.delete.delay.ms =60000 |
文件在索引中清除后保留的时间通常不需要去改动 |
log.flush.offset.checkpoint.interval.ms =60000 |
控制上次固化硬盘的时间点,以便于数据恢复通常不需要去改动 |
auto.create.topics.enable =true |
是否赞成本身主动建立topic,如果false,就需要经过命令建立topic |
default.replication.factor =1 |
是否赞成本身主动建立topic。如果false。就需要经过命令建立topic |
num.partitions =1 |
每个topic的分区个数,如果在topic建立时候没有指定的话会被topic建立时的指定參数覆盖 |
|
|
下面是kafka中Leader,replicas配置參数 |
|
controller.socket.timeout.ms =30000 |
partition leader与replicas之间通信时,socket的超时时间 |
controller.message.queue.size=10 |
partition leader与replicas数据同步时,消息的队列尺寸 |
replica.lag.time.max.ms =10000 |
replicas响应partition leader的最长等待时间。如果超过这个时间,就将replicas列入ISR(in-sync replicas)。并以为它是死的,不会再增长管理中 |
replica.lag.max.messages =4000 |
假设follower落后与leader太多,将会以为此follower[或者说partition relicas]已经失效 ##一般,在follower与leader通信时,因为网络延迟或者连接断开,总会致使replicas中消息同步滞后 ##假设消息以后太多,leader将以为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ##到其它follower中. ##在broker数量较少,或者网络不足的环境中,建议提升此值. |
replica.socket.timeout.ms=30*1000 |
follower与leader之间的socket超时时间 |
replica.socket.receive.buffer.bytes=64*1024 |
leader复制时候的socket缓存大小 |
replica.fetch.max.bytes =1024*1024 |
replicas每次获取数据的最大大小 |
replica.fetch.wait.max.ms =500 |
replicas同leader之间通讯的最大等待时间,失败了会重试 |
replica.fetch.min.bytes =1 |
fetch的最小数据尺寸,假设leader中还没有同步的数据不足此值,将会堵塞,直到知足条件 |
num.replica.fetchers=1 |
leader进行复制的线程数,增大这个数值会添加follower的IO |
replica.high.watermark.checkpoint.interval.ms =5000 |
每个replica检查是否将最高水位进行固化的频率 |
controlled.shutdown.enable =false |
是否赞成控制器关闭broker ,如果设置为true,会关闭所有在这个broker上的leader。并转移到其它broker |
controlled.shutdown.max.retries =3 |
控制器关闭的尝试次数 |
controlled.shutdown.retry.backoff.ms =5000 |
每次关闭尝试的时间间隔 |
leader.imbalance.per.broker.percentage =10 |
leader的不平衡比例,如果超过这个数值,会对分区进行又一次的平衡 |
leader.imbalance.check.interval.seconds =300 |
检查leader是否不平衡的时间间隔 |
offset.metadata.max.bytes |
client保留offset信息的最大空间大小 |
kafka中zookeeper參数配置 |
|
zookeeper.connect = localhost:2181 |
zookeeper集群的地址,可以是多个,多个之间用逗号切割 hostname1:port1,hostname2:port2,hostname3:port3 |
zookeeper.session.timeout.ms=6000 |
ZooKeeper的最大超时时间,就是心跳的间隔,如果没有反映,那么以为已经死了,不易过大 |
zookeeper.connection.timeout.ms =6000 |
ZooKeeper的链接超时时间 |
zookeeper.sync.time.ms =2000 |
ZooKeeper集群中leader和follower之间的同步实际那 |
storm-kafka插件默认kafka的 zk_path例如如下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;
二、假设出现下面问题,表明偏移量出错。建议又一次开一个topic
ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.
三、当没有某个topic,或者是某个topic的node放置不在默认位置时,会有下面异常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam
四、kafka中出现下面异常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
缘由是集群默认每次仅仅能接受约1M的消息。假设client一次发送的消息大于这个数值则会致使异常。
在server.properties中加入下面參数
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同一时候在consumer.properties中加入下面參数:
fetch.message.max.bytes=1073741824
而后从新启动kafka进程就能够。现在每次最大可接收100M的消息。
五、open too many files
kafka出现异常。日志提示open too many file
查找文件打开数量
lsof -p 30353 | wc
假设在1000以上。通常都是不正常,走过65535就会出错。
缘由打开了太多producer,没关闭,调用producer.close()就能够。
6、API
7、zookeeper中的内容
默认状况,kafka在zk的/brokers文件夹下记录topic相关的信息,但假设在建立topic时,指定了路径,则放置到固定的路径中。如:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic
建立的topic,其相关信息会放置到/kafka/brokers中,这个文件夹中主要包含2个子文件夹:ids 和 topics
一、ids:记录这个kafka集群中有多少个broker
如:
ls /kafka/brokers/ids/ 3 2 5 4
这个集群有4个节点,节点id分别为2,3,4。5。 咱们看一下内容
[zk: localhost:2181(CONNECTED) 27] get /kafka/brokers/ids/2 {"jmx_port":-1,"timestamp":"1435833841290","host":"kafka02-log.i.nease.net","version":1,"port":9092} cZxid = 0x1000e8a68 ctime = Thu Jul 02 18:44:01 HKT 2015 mZxid = 0x1000e8a68 mtime = Thu Jul 02 18:44:01 HKT 2015 pZxid = 0x1000e8a68 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x44e440d0bdf06eb dataLength = 104 numChildren = 0记录着这个节点的一些基本状况。
二、topics
先看一下有哪些内容:
[zk: localhost:2181(CONNECTED) 29] ls /kafka/brokers/topics/test30/partitions [3, 2, 1, 0, 4]
[zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics/test30/partitions/0 [state]
[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/topics/test30/partitions/0/state {"controller_epoch":4,"leader":5,"version":1,"leader_epoch":2,"isr":[5]} cZxid = 0x100017c5e ctime = Wed Jul 01 14:54:24 HKT 2015 mZxid = 0x1000e8a84 mtime = Thu Jul 02 18:44:01 HKT 2015 pZxid = 0x100017c5e cversion = 0 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 72 numChildren = 0可以看某个分区的leader是哪一个。从而读取kafka消息时。可以从这个leader中读取数据。