消息队列之 Kafka

  • Kafka 特点

Kafka 最早是由 LinkedIn 公司开发一种分布式的基于发布/订阅的消息系统,之后成为 Apache 的顶级项目。主要特点如下:

  1. 同时为发布和订阅提供高吞吐量

    Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB 级以上数据也能保证常数时间的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。

  2. 消息持久化

    将消息持久化到磁盘,因此可用于批量消费,例如 ETL 以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。

  3. 分布式

    支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输。这样易于向外扩展,所有的producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。

  4. 消费消息采用 pull 模式

    消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。

  5. 支持 online 和 offline 的场景。

    同时支持离线数据处理和实时数据处理。

Kafka 中的基本概念

消息队列之 Kafka(上)

  1. Broker

    Kafka 集群中的一台或多台服务器统称为 Broker

  2. Topic

    每条发布到 Kafka 的消息都有一个类别,这个类别被称为 Topic 。(物理上不同Topic 的消息分开存储。逻辑上一个 Topic 的消息虽然保存于一个或多个broker上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)

  3. Partition

    Topic 物理上的分组,一个 Topic 可以分为多个 Partition ,每个 Partition 是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 id(offset)

  4. Producer

    消息和数据的生产者,可以理解为往 Kafka 发消息的客户端

  5. Consumer

    消息和数据的消费者,可以理解为从 Kafka 取消息的客户端

  6. Consumer Group

    每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定Group Name,若不指定 Group Name 则属于默认的 Group)。

    这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer )和单播(发给任意一个 Consumer )的手段。一个 Topic 可以有多个 Consumer Group。Topic 的消息会复制(不是真的复制,是概念上的)到所有的 Consumer Group,但每个 Consumer Group 只会把消息发给该 Consumer Group 中的一个 Consumer。如果要实现广播,只要每个 Consumer 有一个独立的 Consumer Group 就可以了。如果要实现单播只要所有的 Consumer 在同一个 Consumer Group 。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic 。

Kafka 安装

Mac 用户用 HomeBrew 来安装,安装前要先更新 brew

brew update

接着安装 kafka

brew install kafka

安装完成之后可以查看 kafka 的配置文件

cd /usr/local/etc/kafka

消息队列之 Kafka(上)

kafka 配置文件

我的电脑通过 HomeBrew 安装的 kafka 位置在 /usr/local/Cellar/kafka/0.11.0.1/bin ,可以看到 HomeBrew 安装下来的 kafka 的版本已经是 0.11.0.1 的了。

kafka 需要用到 zookeeper,HomeBrew 安装kafka 的时候会同时安装 zookeeper。下面先启动 zookeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

接着启动 kafka

cd /usr/local/Cellar/kafka/0.11.0.1

./bin/kafka-server-start /usr/local/etc/kafka/server.properties

创建 topic,设置 partition 数量为2,topic 的名字叫 test-topic,下面的例子都用这个 topic

cd /usr/local/Cellar/kafka/0.11.0.1

./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic

查看创建的 topic

cd /usr/local/Cellar/kafka/0.11.0.1

./bin/kafka-topics --list --zookeeper localhost:2181

Kafka 命令行测试

发送消息

cd /usr/local/Cellar/kafka/0.11.0.1/bin

kafka-console-producer --broker-list localhost:9092 --topic test-topic

消费消息

cd /usr/local/Cellar/kafka/0.11.0.1/bin

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

删除 topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin

./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic

如果 kafka 启动时加载的配置文件中 server.properties 没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把 topic 标记为:marked for deletion

可以通过命令来查看所有 topic

cd /usr/local/Cellar/kafka/0.11.0.1/bin

./bin/kafka-topics --zookeeper localhost:2181 --list

如果想真正删除它,可以如下操作

登录zookeeper客户端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli

找到topic所在的目录:ls /brokers/topics

找到要删除的topic,执行命令:rmr /brokers/topics/test-topic 即可,此时topic被彻底删除

Java 客户端访问

1. maven工程的pom文件中添加依赖

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>0.11.0.1</version></dependency>

2. 消息生产者

package org.study.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;import java.util.Map;

public class ProducerSample {

public static void main(String[] args) {

Map<String, Object> props = new HashMap<String, Object>();

props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址

props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

String topic = "test-topic";

Producer<String, String> producer = new KafkaProducer<String, String>(props);

producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1"));

producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2"));

producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3"));

producer.close();

}

}

3. 消息消费者

package org.study.kafka;

import org.apache.kafka.clients.consumer.Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;import java.util.Properties;

public class ConsumerSample {

public static void main(String[] args) {

String topic = "test-topic";// topic name

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。

props.put("group.id", "testGroup1");// Consumer Group Name

props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交

props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer(props);

consumer.subscribe(Arrays.asList(topic));

while (true) {

ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records)

System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());

}

}

}

4. 启动 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

5. 启动 kafka 服务器

kafka-server-start /usr/local/etc/kafka/server.properties

6. 运行 Consumer

先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。

7.运行 Producer

运行 Producer,发布几条消息,在 Consumer 的控制台能看到接收的消息

消息队列之 Kafka(上)

Consumer 控制台

Kafka 集群配置

kafka 的集群配置一般有三种,即: single node - single broker ,single node - multiple broker ,multiple node - multiple broker

前两种实际上官网有介绍。

  • single node - single broker

消息队列之 Kafka(上)

单节点单 broker

1. 启动 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. 启动 kafka broker

kafka-server-start /usr/local/etc/kafka/server.properties

3. 创建一个 kafka topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker

4. 启动 producer 发送信息

kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker

broker-list 和 topic 这两个参数是必须的,broker-list 指定要连接的 broker 的地址,格式为 node_address:port 。topic 是必须的,因为需要发送消息给订阅了该

topic 的 consumer group 。

现在可以在命令行里输入一些信息,每一行会被作为一个消息。

消息队列之 Kafka(上)

发送消息

5. 启动 consumer 消费消息

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker

在不同的终端窗口里分别启动 zookeeper、broker、producer、consumer 后,在

producer 终端里输入消息,消息就会在 consumer 终端中显示了。

消息队列之 Kafka(上)

消息显示

  • single node - multiple broker

消息队列之 Kafka(上)

单节点多 broker

1. 启动 zookeeper

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. 启动broker

如果需要在单个节点(即一台机子)上面启动多个 broker(这里作为例子启动三个 broker),需要准备多个server.properties文件即可,所以需要复制 /usr/local/etc/kafka/server.properties 文件。因为需要为每个 broker 指定单独的属性配置文件,其中 broker.id 、 port 、 log.dir 这三个属性必须是不同的。

新建一个 kafka-example 目录和三个存放日志的目录

  • mkdir kafka-example

  • mkdir kafka-logs-1

  • mkdir kafka-logs-2

  • mkdir kafka-logs-3

复制 /usr/local/etc/kafka/server.properties 文件三份

  • cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties

  • cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties

  • cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties

在 broker1 的配置文件 server-1.properties 中,相关要修改的参数为:

broker.id=1

port=9093

log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1

broker2 的配置文件 server-2.properties 中,相关要修改的参数为:

broker.id=2

port=9094

log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2

broker3 的配置文件 server-3.properties 中,相关要修改的参数为:

broker.id=3

port=9095

log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3

启动每个 broker

cd /Users/niwei/Downloads/kafka-example

kafka-server-start server-1.properties

kafka-server-start server-2.properties

kafka-server-start server-3.properties

3. 创建 topic

创建一个名为 topic-singlenode-multiplebroker 的topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 3

--partitions 1 --topic topic-singlenode-multiplebroker

4. 启动 producer 发送信息

如果一个 producer 需要连接多个 broker 则需要传递参数 broker-list

kafka-console-producer --broker-list localhost:9093, localhost:9094,

localhost:9095 --topic topic-singlenode-multiplebroker

5. 启动 consumer 消费消息

kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker

消息队列之 Kafka(上)

单节点多 broker 消费消息

  • multiple node - multiple broker

消息队列之 Kafka(上)