Kafka监控工具KafkaOffsetMonitor配置及使用

 1、KafkaOffsetMonitor简述

KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,咱们能够浏览当前的消费者组,而且每一个Topic的全部Partition的消费状况均可以一目了然。html

2、KafkaOffsetMonitor下载

KafkaOffsetMonitor托管在Github上,能够经过Github下载。
下载地址:https://github.com/quantifind/KafkaOffsetMonitor/releasesjava

或者下载百度网盘:连接:https://pan.baidu.com/s/1geEBEvT 密码:jaeulinux

 

3、KafkaOffsetMonitor启动

将下载下来的KafkaOffsetMonitor jar包上传到linux上,能够新建一个目录KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.0.jar进入到KafkaMonitor目录下,经过java编译命令来运行这个jar包:git

[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回车后,能够看到控制台输出:

serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088

若是没有指定端口,则默认会开启一个随机端口。github

参数说明:

zk :zookeeper主机地址,若是有多个,用逗号隔开
port :应用程序端口
refresh :应用程序在数据库中刷新和存储点的频率
retain :在db中保留多长时间
dbName :保存的数据库文件名,默认为offsetapp

为了更方便的启动KafkaOffsetMonitor,能够写一个启动脚原本直接运行,我这里新建一个名为:kafka-monitor-start.sh的脚本,而后编辑这个脚本:数据库

[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;

而后退出保存便可,接下来修改一下kafka-monitor-start.sh的权限apache

[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh 

启动KafkaOffsetMonitor:vim

[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [root@kafka50 KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root   16u  IPv6  26047      0t0  TCP *:radan-http (LISTEN)

4、KafkaOffsetMonitor Web UI

在游览器中输入:http://ip:port便可以查看KafkaOffsetMonitor Web UI,以下图:安全

 

在下图中有一个Visualizations选项卡,点击其中的Cluster Overview能够查看当前Kafka集群的Broker状况app

 

5、简单的Producer

一、新建一个Topic

  首先为本次试验新建一个Topic,命令以下:

bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer

 

二、新建SimpleProducer代码

  在上一篇文章中提到的Producer封装Github代码的基础上,写了一个往kafkamonitor-simpleproducer发送message的java代码。

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }

  程序运行效果: 
  这里写图片描述

三、ConsoleConsumer消费该topic

  用kafka自带的ConsoleConsumer消费kafkamonitor-simpleproducer中的message。

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer

  消费截图以下: 
  这里写图片描述

四、KafkaOffsetMonitor页面

(1)在Topic List选项卡中,咱们能够看到刚才新建的kafkamonitor-simpleproducer 
  这里写图片描述
(2)点开后,能看到有一个console-consumer正在消费该topic 
  这里写图片描述
(3)继续进入该Consumer,能够查看该Consumer当前的消费情况 
  这里写图片描述
  这张图片的左上角显示了当前Topic的生产速率,右上角显示了当前Consumer的消费速率。 
  图片中还有三种颜色的线条,蓝色的表示当前Topic中的Message数目,灰色的表示当前Consumer消费的offset位置,红色的表示蓝色灰色的差值,即当前Consumer滞后于Producer的message数目。 
(4)看一眼各partition中的message消费状况 
  这里写图片描述
  从上图能够看到,当前有3个Partition,每一个Partition中的message数目分布很不均匀。这里能够与接下来的自定义Producer的状况进行一个对比。

6、自定义Partitioner的Producer

一、新建一个Topic

bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer

二、Partitioner代码

  逻辑很简单,循环依次往各Partition中发送message。

import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */
public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }

三、Producer代码

  将自定义的Partitioner设置到Producer,其余调用过程和二中相似。

import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }

  代码运行效果以下图: 
  这里写图片描述

四、ConsoleConsumer消费Message

bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer

 

  消费效果以下图: 
  这里写图片描述

五、KafkaOffsetMonitor页面

  其余页面与上面的相似,这里只观察一下每一个partition中的message数目与第二节中的对比。能够看到这里每一个Partition中message分别是很均匀的。 
  这里写图片描述

注意事项: 
  注意这里有一个坑,默认状况下Producer往一个不存在的Topic发送message时会自动建立这个Topic。因为在这个封装中,有同时传递message和topic的状况,若是调用方法时传入的参数反了,将会在Kafka集群中自动建立Topic。在正常状况下,应该是先把Topic根据须要建立好,而后Producer往该Topic发送Message,最好把Kafka这个默认自动建立Topic的功能关掉。 
  那么,假设真的不当心建立了多余的Topic,在删除时,会出现“marked for deletion”提示,只是将该topic标记为删除,使用list命令仍然能看到。若是须要调整这两个功能的话,在server.properties中配置以下两个参数:

 

参数 默认值 做用
auto.create.topics.enable true Enable auto creation of topic on the server
delete.topic.enable false Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off

 

七,KafkaOffsetMonitor 总结

KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。除了KafkaOffsetMonitor,Kafka监控工具还有另外两款:Kafka Web Console:监控功能较为全面,能够预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。Kafka Manager:偏向Kafka集群管理,若操做不当,容易致使集群出现故障。对Kafka实时生产和消费消息是经过JMX实现的。没有记录Offset、Lag等信息。

相关文章
相关标签/搜索