1. 下载 zookeeper-3.4.12html
2 配置Zookeepergithub
进入 zookeeper 的 conf 目录下,找到 zoo_sample.cfg 文件。首先将 zoo_sample.cfg 文件备份,并重命名为 zoo.cfgshell
blockchain@Dao:~/zookeeper-3.4.13/conf$ cp zoo_sample.cfg zoo.cfgapache
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/blockchain/tmp/zookeeper
#dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1bootstrap
其中,
tickTime:zookeeper 服务器之间或客户端与服务器之间心跳的时间间隔。
dataDir:zookeeper 保存数据的目录,默认状况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
clientPort:zookeeper 服务器监听端口,用来接受客户端的访问请求。
maxClientCnxns:zookeeper可以接收的最大客户端链接数。ubuntu
dataDir 默认是 /tmp/zookeeper,因为 /tmp 是 Ubuntu 的 临时目录,这个路径下的数据不能长久保存,所以须要指定到别的目录。
3. 启动 Zookeeperruby
blockchain@Dao:~/zookeeper-3.4.13$ blockchain@Dao:~/zookeeper-3.4.13$ ./bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /home/blockchain/zookeeper-3.4.13/bin/../conf/zoo.cfg Starting zookeeper ... STARTED blockchain@Dao:~/zookeeper-3.4.13$ blockchain@Dao:~/zookeeper-3.4.13$ jps 31392 Jps 25248 QuorumPeerMain blockchain@Dao:~/zookeeper-3.4.13$
使用 status 参数来查看 zookeeper 的状态bash
blockchain@Dao:~/zookeeper-3.4.13$ ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /home/blockchain/zookeeper-3.4.13/bin/../conf/zoo.cfg Mode: standalone blockchain@Dao:~/zookeeper-3.4.13$
./zkCli.sh -server ip:port,默认端口为2181服务器
bin/zkCli.sh -server localhost:2181
测试是否安装成功
telnet localhost 2181
而后输入srvr
下载kafka kafka_2.11-2.1.1.tgz
启动kafka
bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &
中止kafka
bin/kafka-server-stop.sh config/server.properties
3.建立一个主题
首先建立一个名为test
的topic,只使用单个分区和一个复本
1
|
bin
/kafka-topics
.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
test
|
如今能够运行list topic命令看到咱们的主题
1
|
bin
/kafka-topics
.sh --list --zookeeper localhost:2181
|
4.发送消息
1
2
3
|
bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic
test
This is a message
This is another message
|
若是要批量导入文件数据到kafka,参考:2.1 本地环境下kafka批量导入数据
1
|
bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic test_topic < file_pat
|
5.启动一个消费者,消费者会接收到消息
旧版消费者
1
|
bin
/kafka-console-consumer
.sh --zookeeper localhost:2181 --topic
test
--from-beginning 2>
/dev/null
|
新版消费者
1
|
bin
/kafka-console-consumer
.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>
/dev/null
|
6.查看指定的topic的offset信息
对于结尾是ZK的消费者,其消费者的信息是存储在Zookeeper中的
对于结尾是KF的消费者,其消费者的信息是存在在Kafka的broker中的
均可以使用下面的命令进行查看
1
|
bin
/kafka-consumer-offset-checker
.sh --zookeeper localhost:2181 --group xxx --topic xxx
|
结果
1
2
3
4
|
bin
/kafka-consumer-offset-checker
.sh --zookeeper localhost:2181 --group
test
-consumer-group --topic xxx
[2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped
in
releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test
-consumer-group xxx 0 509 0 -509 none
|
或者
1
|
.
/bin/kafka-run-class
.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx
|
结果
1
2
3
4
|
bin
/kafka-run-class
.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group
test
-consumer-group
[2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped
in
releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test
-consumer-group xxx 0 509 509 0 none
|
lag是负数的缘由是 topic中的消息数量过时(超过kafka默认的7天后被删除了),变成了0,因此Lag=logSize减去Offset,因此就变成了负数
7.删除一个topic
须要在 conf/server.properties 文件中设置
1
2
|
# For delete topic
delete.topic.
enable
=
true
|
不然在执行了如下删除命令后,再 list 查看全部的topic,仍是会看到该topic
1
|
bin
/kafka-topics
.sh --zookeeper localhost:2181 --delete --topic topicB
|
再到 配置文件 中的kafka数据存储地址去删除物理数据了,个人地址为
1
|
/tmp/kafka-logs
|
最后须要到zk里删除kafka的元数据
1
2
3
|
.
/bin/zkCli
.sh
#进入zk shell
ls
/brokers/topics
rmr
/brokers/topics/topicA
|
8.查看某个group的信息
新版
1
|
bin
/kafka-consumer-groups
.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx
|
结果
1
2
3
|
bin
/kafka-consumer-groups
.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
group_id xxx 0 509 509 0 consumer-1_
/127
.0.0.1
|
若是这时候消费者进程关闭了以后,使用上面的命令和下面的-list命令将不会查出这个group_id,可是当消费者进程从新开启后,这个group_id又能从新查到,且消费的offset不会丢失
旧版
1
|
bin
/kafka-consumer-groups
.sh --zookeeper 127.0.0.1:2181 --group xxx --describe
|
9.查看consumer group的列表
ZK的消费者可使用下面命令查看,好比上面的例子中的 test-consumer-group
1
|
bin
/kafka-consumer-groups
.sh --zookeeper 127.0.0.1:2181 --list
|
KF的消费者可使用下面命令查看,好比上面的例子中的 console-consumer-xxx ,可是只会查看到相似于 KMOffsetCache-lintong-B250M-DS3H 的结果,这是因为这种消费者的信息是存放在 __consumer_offsets 中
对于如何查看存储于 __consumer_offsets 中的新版消费者的信息,能够参考huxihx的博文: Kafka 如何读取offset topic内容 (__consumer_offsets)
1
|
bin
/kafka-consumer-groups
.sh --new-consumer --bootstrap-server localhost:9092 --list
|
10.在zk中删除一个consumer group
1
|
rmr
/consumers/test-consumer-group
|
11.查看topic的offset的最小值
1
2
|
bin
/kafka-run-class
.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --
time
-2
xxxx:0:0
|
12.查看topic的offset的最大值
1
|
bin
/kafka-run-class
.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --
time
-1
|
13.重置topic的某个消费者的offset为0,须要高版本的kafka才有该命令,在高版本的kafka client对低版本的kafka集群执行该命令是会生效的
1
|
kafka-consumer-
groups
--bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute
|
安装Kafka客户端librdkafka
安装下载https://github.com/edenhill/librdkafka
预备环境:
The GNU toolchain
GNU make
pthreads
zlib (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support)
libsasl2-dev (optional, for SASL GSSAPI support)
编译和安装:
./configure
make
sudo make install
ubuntu14安装kafka