KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,咱们能够浏览当前的消费者组,而且每一个Topic的全部Partition的消费状况均可以一目了然。html
KafkaOffsetMonitor托管在Github上,能够经过Github下载。
下载地址:https://github.com/quantifind/KafkaOffsetMonitor/releasesjava
或者下载百度网盘:连接:https://pan.baidu.com/s/1geEBEvT 密码:jaeulinux
将下载下来的KafkaOffsetMonitor jar包上传到linux上,能够新建一个目录KafkaMonitor,用于存放KafkaOffsetMonitor-assembly-0.2.0.jar进入到KafkaMonitor目录下,经过java编译命令来运行这个jar包:git
[root@kafka50 KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回车后,能够看到控制台输出: serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp} 2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8088
若是没有指定端口,则默认会开启一个随机端口。github
参数说明:
zk :zookeeper主机地址,若是有多个,用逗号隔开
port :应用程序端口
refresh :应用程序在数据库中刷新和存储点的频率
retain :在db中保留多长时间
dbName :保存的数据库文件名,默认为offsetapp
为了更方便的启动KafkaOffsetMonitor,能够写一个启动脚原本直接运行,我这里新建一个名为:kafka-monitor-start.sh的脚本,而后编辑这个脚本:数据库
[root@kafka50 KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;
而后退出保存便可,接下来修改一下kafka-monitor-start.sh的权限apache
[root@kafka50 KafkaMonitor]# chmod +x kafka-monitor-start.sh
启动KafkaOffsetMonitor:vim
[root@kafka50 KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [root@kafka50 KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
在游览器中输入:http://ip:port便可以查看KafkaOffsetMonitor Web UI,以下图:安全
在下图中有一个Visualizations选项卡,点击其中的Cluster Overview能够查看当前Kafka集群的Broker状况app
首先为本次试验新建一个Topic,命令以下:
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer
在上一篇文章中提到的Producer封装Github代码的基础上,写了一个往kafkamonitor-simpleproducer发送message的java代码。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }
程序运行效果:
用kafka自带的ConsoleConsumer消费kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消费截图以下:
(1)在Topic List选项卡中,咱们能够看到刚才新建的kafkamonitor-simpleproducer
(2)点开后,能看到有一个console-consumer正在消费该topic
(3)继续进入该Consumer,能够查看该Consumer当前的消费情况
这张图片的左上角显示了当前Topic的生产速率,右上角显示了当前Consumer的消费速率。
图片中还有三种颜色的线条,蓝色的表示当前Topic中的Message数目,灰色的表示当前Consumer消费的offset位置,红色的表示蓝色灰色的差值,即当前Consumer滞后于Producer的message数目。
(4)看一眼各partition中的message消费状况
从上图能够看到,当前有3个Partition,每一个Partition中的message数目分布很不均匀。这里能够与接下来的自定义Producer的状况进行一个对比。
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer
逻辑很简单,循环依次往各Partition中发送message。
import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */
public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }
将自定义的Partitioner设置到Producer,其余调用过程和二中相似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */
public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }
代码运行效果以下图:
bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer
消费效果以下图:
其余页面与上面的相似,这里只观察一下每一个partition中的message数目与第二节中的对比。能够看到这里每一个Partition中message分别是很均匀的。
注意事项:
注意这里有一个坑,默认状况下Producer往一个不存在的Topic发送message时会自动建立这个Topic。因为在这个封装中,有同时传递message和topic的状况,若是调用方法时传入的参数反了,将会在Kafka集群中自动建立Topic。在正常状况下,应该是先把Topic根据须要建立好,而后Producer往该Topic发送Message,最好把Kafka这个默认自动建立Topic的功能关掉。
那么,假设真的不当心建立了多余的Topic,在删除时,会出现“marked for deletion”提示,只是将该topic标记为删除,使用list命令仍然能看到。若是须要调整这两个功能的话,在server.properties中配置以下两个参数:
参数 | 默认值 | 做用 |
---|---|---|
auto.create.topics.enable | true | Enable auto creation of topic on the server |
delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |
KafkaOffsetMonitor:程序一个jar包的形式运行,部署较为方便。只有监控功能,使用起来也较为安全。除了KafkaOffsetMonitor,Kafka监控工具还有另外两款:Kafka Web Console:监控功能较为全面,能够预览消息,监控Offset、Lag等信息,但存在bug,不建议在生产环境中使用。Kafka Manager:偏向Kafka集群管理,若操做不当,容易致使集群出现故障。对Kafka实时生产和消费消息是经过JMX实现的。没有记录Offset、Lag等信息。