本文已同步至我的博客 liaosi's blog-Kafka 的安装和使用(Quickstart)
去官网下载地址下载最新版本并解压:java
tar -xzf kafka_2.11-1.1.0.tgz cd kafka_2.11-1.1.0
vim config/server.properties
命令,最主要的是设置ZooKeeper链接的地址和接口:Kafka用到了ZooKeeper,因此要先启动ZooKeepr。
第一种方式(推荐)是启动独立的ZooKeeper程序,在$ZOOKEEPER_HOME/bin
目录下,执行./zkServer.sh start
。
第二种方式是启动Kafka内置的一个单节点ZooKeeper实例,方法是在$KAFKA_HOME
目录下执行:apache
bin/zookeeper-server-start.sh config/zookeeper.properties
启动了ZooKeeper以后,就能够启动Kafka服务了,在$KAFKA_HOME
目录下执行:bootstrap
bin/kafka-server-start.sh config/server.properties
若是要以守护线程启动,则执行:vim
bin/kafka-server-start.sh -daemon config/server.properties
建立一个叫作testTopic
的topic,而且它只有一个分区,一个副本。并发
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1 Created topic "testTopic".
partitions参数:表示主题的分区(partition)的数量。
replication-factor参数:表示每一个partition的备份数量。
zookeeper参数:表示zookeeper的主机地址和客户端链接端口。若是zookeeper是集群,这里也能够只写一个节点就行。app
列出Kafka上的全部topic:oop
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181 myTest testTopic
上面说明个人kafka已经建立了两个主题:myTest和testTopic。测试
Kafka 使用一个简单的命令行客户端producer,能够从文件中或者从标准输入中读取消息并发送到Kafka服务端集群。默认的每一行都将做为一条独立的消息ui
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:命令行
[root@myServer kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic >This is a message >This is another message
Kafka也有一个命令行consumer能够读取消息并输出到标准输出:
[root@myServer kafka_2.11-1.0.0]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning This is a message This is another message
若是你在一个终端中运行consumer命令行,另外一个终端中运行producer命令行,就能够在一个终端输入消息,另外一个终端读取消息。
这两个命令都有本身的可选参数,能够在运行的时候不加任何参数能够看到帮助信息。
刚才只是启动了单个broker,如今启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每一个节点编写配置文件:
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
在拷贝出的新文件中修改以下3个参数:
config/server-1.properties:
broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
broker.id在集群中惟一的标注一个节点,由于在同一个机器上,因此必须制定不一样的端口和日志文件,避免数据被覆盖。
刚才已经启动可Zookeeper和一个节点,如今启动另外两个Kafka节点:
> bin/kafka-server-start.sh -daemon config/server-1.properties ... > bin/kafka-server-start.sh -daemon config/server-2.properties ...
查看进程:
[root@server01 kafka_2.11-1.0.0]# jps -lm 9857 sun.tools.jps.Jps -lm 1737 org.apache.zookeeper.server.quorum.QuorumPeerMain /home/hadoop/app/zookeeper-3.4.11/bin/../conf/zoo.cfg 9517 kafka.Kafka config/server-2.properties 5566 kafka.Kafka ../config/server.properties 9199 kafka.Kafka config/server-1.properties
建立一个拥有3个副本的topic:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic Created topic "my-replicated-topic".
如今咱们搭建了一个集群,怎么知道每一个节点的信息呢?运行“"describe topics”命令就能够了:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
下面解释一下这些输出。第一行是对全部分区的一个描述,而后每一个分区都会对应一行,咱们建立topic的时候指明是3个分区,因此有3行。
leader:负责处理消息的读和写的broker,leader是从全部节点中随机选择的.
replicas:列出了全部的副本节点,无论节点是否在服务中.
isr:是正在服务中的节点.
向topic发送消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
消费这些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 2 my test message 1 ^C
这里接收到的顺序不一致时由于发送到了不一样的partiton所致。
测试一下kafka集群的容错能力。对于partition=1,其leader是brokerid=0的节点,如今咱们kill掉它:
> ps -aux | grep server.properties root 32658 0.5 17.4 4305700 329028 pts/0 Sl+ 10:06 0:15 /opt/jdk1.8.0_121/bin/java... > kill -9 32658
再次查询该topic,能够看到另一个brokerid=1的节点被选作了leader,brokeid=0的节点再也不出如今 in-sync 副本列表中:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2 Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
虽然最初负责写消息的leader down掉了,但以前的消息仍是能够消费的,不过bootstrap-server参数要选择没有down掉的节点:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic ... my test message 2 my test message 1 ^C