转载请注明出处:http://www.cnblogs.com/xiaodf/java
bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_cdr --partitions 30 --replication-factor 2
注: partitions指定topic分区数,replication-factor指定topic每一个分区的副本数node
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic t_cdr
bin/kafka-console-producer.sh --broker-list node86:9092 --topic t_cdr
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic t_cdr --from-beginning
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list node86:9092 --partitions 0
注: time为-1时表示最大值,time为-2时表示最小值shell
为topic t_cdr 增长10个分区服务器
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic t_cdr --partitions 10
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper node01:2181 --topic t_cdr
这个会显示出consumer group的offset状况, 必须参数为--group, 不指定--topic,默认为全部topic微信
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group工具
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker required argument: [group] Option Description ------ ----------- --broker-info Print broker info --group Consumer group. --help Print this message. --topic Comma-separated list of consumer topics (all topics if absent). --zkconnect ZooKeeper connect string. (default: localhost:2181) Example, bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv Group Topic Pid Offset logSize Lag Owner pv page_visits 0 21 21 0 none pv page_visits 1 19 19 0 none pv page_visits 2 20 20 0 none
以上图中参数含义解释以下:
topic:建立时topic名称
pid:分区编号
offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者大数据
细看kafka-run-class.sh脚本,它是调用 了ConsumerOffsetChecker的main方法,因此,咱们也能够经过java代码来访问scala的ConsumerOffsetChecker类,代码以下:ui
import kafka.tools.ConsumerOffsetChecker; /** * kafka自带不少工具类,其中ConsumerOffsetChecker能查看到消费者消费的状况, * ConsumerOffsetChecker只是将信息打印到标准的输出流中 * */ public class RunClass { public static void main(String[] args) { //group-1是消费者的group名称,能够在zk中 String[] arr = new String[]{"--zookeeper=192.168.199.129:2181,192.168.199.130:2181,192.168.199.131:2181/kafka","--group=group-1"}; ConsumerOffsetChecker.main(arr); } }
更多大数据技术干货,欢迎关注“大数据技术进阶”微信公众号
this