> tar -xzf kafka_2.10-0.8.2.0.tgz> cd kafka_2.10-0.8.2.0
Kafka 使用 ZooKeeper 所以须要首先启动 ZooKeeper 服务。若是你没有ZooKeeper 服务,可使用Kafka自带脚本启动一个应急的单点的 ZooKeeper 实例。 java
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)...
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)...
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
运行以下命令查看此topic信息:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
咱们也能够经过配置
,让brokers
自动建立该topic,
当向一个不存在的topic发布消息时 。
运行producer 而后输入几条消息到控制台,输入回车发送到服务端。 node
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
不加参数运行命令会显示详细使用文档. apache
首先咱们为每一个broker 准备一个配置文件: dom
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Now 编辑这些新文件,内容以下:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
以前已经开启过Zookeeper 和 broker了,如今只须要再启动两就好: socket
> bin/kafka-server-start.sh config/server-1.properties &...
> bin/kafka-server-start.sh config/server-2.properties &...
建立一个新的topic 并为其设置副本因子:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
OK,如今咱们已经搭建好一个集群了,可是咱们还不知道他是怎么工做的?使用"describe topics"命令查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0输出的第一行全部分区的概述, 附加的每行是关于每一个分区的说明。由于咱们只有一个分区因此,这里只有一行。
咱们再查看下原来的 topic: 工具
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这没什么惊讶的—原来的 topic 没有设置副本,因此
Replicas是
0, 咱们建立它时集群中只有一个节点。
让咱们发布几条消息到咱们的新 topic: 测试
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's consume these messages:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
如今让咱们测试下容错性。这里 Broker 1 是 leader 因此咱们 kill 掉它:
> ps | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java
...
> kill -9 7564
而后leader 切换为其余节点而且节点1 也再也不 in-sync 设置中了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
可是消息仍然能够消费, 即便是以前的leader 写下的日志:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C