kafka(二)—Kafka 的安装和使用(Quickstart)

本文已同步至我的博客 liaosi's blog-Kafka 的安装和使用(Quickstart)

Step1:下载Kafka

官网下载地址下载最新版本并解压:java

tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

Step2:配置

  1. ZooKeeper的配置,参考前面的ZooKeeper的文章,此处不详细作说明。
  2. Kafka的配置
    在Kafka的home目录下,vim config/server.properties命令,最主要的是设置ZooKeeper链接的地址和接口:
    zookeeper.connect=localhost:2181

Step3:启动服务

Kafka用到了ZooKeeper,因此要先启动ZooKeepr。
第一种方式(推荐)是启动独立的ZooKeeper程序,在$ZOOKEEPER_HOME/bin目录下,执行./zkServer.sh start
第二种方式是启动Kafka内置的一个单节点ZooKeeper实例,方法是在$KAFKA_HOME目录下执行:apache

bin/zookeeper-server-start.sh config/zookeeper.properties

启动了ZooKeeper以后,就能够启动Kafka服务了,在$KAFKA_HOME目录下执行:bootstrap

bin/kafka-server-start.sh config/server.properties

若是要以守护线程启动,则执行:vim

bin/kafka-server-start.sh -daemon config/server.properties

Step4:建立一个topic

建立一个叫作testTopic的topic,而且它只有一个分区,一个副本。并发

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1
Created topic "testTopic".

partitions参数:表示主题的分区(partition)的数量。
replication-factor参数:表示每一个partition的备份数量。
zookeeper参数:表示zookeeper的主机地址和客户端链接端口。若是zookeeper是集群,这里也能够只写一个节点就行。app

列出Kafka上的全部topic:oop

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
myTest
testTopic

上面说明个人kafka已经建立了两个主题:myTest和testTopic。测试

Step5:发送消息

Kafka 使用一个简单的命令行客户端producer,能够从文件中或者从标准输入中读取消息并发送到Kafka服务端集群。默认的每一行都将做为一条独立的消息ui

运行producer并在控制台中输一些消息,这些消息将被发送到服务端:命令行

[root@myServer kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
>This is a message
>This is another message

Step6:启动consumer

Kafka也有一个命令行consumer能够读取消息并输出到标准输出:

[root@myServer kafka_2.11-1.0.0]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
This is a message
This is another message

若是你在一个终端中运行consumer命令行,另外一个终端中运行producer命令行,就能够在一个终端输入消息,另外一个终端读取消息。
这两个命令都有本身的可选参数,能够在运行的时候不加任何参数能够看到帮助信息。

Step7:搭建一个多个broker的集群

刚才只是启动了单个broker,如今启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每一个节点编写配置文件:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

在拷贝出的新文件中修改以下3个参数:

config/server-1.properties:

broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id在集群中惟一的标注一个节点,由于在同一个机器上,因此必须制定不一样的端口和日志文件,避免数据被覆盖。
刚才已经启动可Zookeeper和一个节点,如今启动另外两个Kafka节点:

> bin/kafka-server-start.sh  -daemon config/server-1.properties 
...
> bin/kafka-server-start.sh  -daemon config/server-2.properties 
...

查看进程:

[root@server01 kafka_2.11-1.0.0]# jps -lm
9857 sun.tools.jps.Jps -lm
1737 org.apache.zookeeper.server.quorum.QuorumPeerMain /home/hadoop/app/zookeeper-3.4.11/bin/../conf/zoo.cfg
9517 kafka.Kafka config/server-2.properties
5566 kafka.Kafka ../config/server.properties
9199 kafka.Kafka config/server-1.properties

建立一个拥有3个副本的topic:

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic
Created topic "my-replicated-topic".

如今咱们搭建了一个集群,怎么知道每一个节点的信息呢?运行“"describe topics”命令就能够了:

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0
    Topic: my-replicated-topic  Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1
    Topic: my-replicated-topic  Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2

下面解释一下这些输出。第一行是对全部分区的一个描述,而后每一个分区都会对应一行,咱们建立topic的时候指明是3个分区,因此有3行。
leader:负责处理消息的读和写的broker,leader是从全部节点中随机选择的.
replicas:列出了全部的副本节点,无论节点是否在服务中.
isr:是正在服务中的节点.

向topic发送消息:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消费这些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 2
my test message 1
^C

这里接收到的顺序不一致时由于发送到了不一样的partiton所致。

测试一下kafka集群的容错能力。对于partition=1,其leader是brokerid=0的节点,如今咱们kill掉它:

> ps -aux | grep server.properties
root     32658  0.5 17.4 4305700 329028 pts/0  Sl+  10:06   0:15 /opt/jdk1.8.0_121/bin/java...
> kill -9 32658

再次查询该topic,能够看到另一个brokerid=1的节点被选作了leader,brokeid=0的节点再也不出如今 in-sync 副本列表中:

[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2
    Topic: my-replicated-topic  Partition: 1    Leader: 1   Replicas: 0,1   Isr: 1
    Topic: my-replicated-topic  Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2

虽然最初负责写消息的leader down掉了,但以前的消息仍是能够消费的,不过bootstrap-server参数要选择没有down掉的节点:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic
...
my test message 2
my test message 1
^C
相关文章
相关标签/搜索