Kafka ACL使用实战

自0.9.0.0.版本引入Security以后,Kafka一直在完善security的功能。当前Kafka security主要包含3大功能:认证(authentication)、信道加密(encryption)和受权(authorization)。信道加密就是为client到broker、broker到broker以及工具脚本与broker之间的数据传输配置SSL;认证机制主要是指配置SASL,而受权是经过ACL接口命令来完成的。html

生产环境中,用户若要使用SASL则必须配置Kerberos,但对于一些小公司而言,他们的用户系统并不复杂(特别是专门为Kafka集群服务的用户可能不是不少),显然使用Kerberos有些大材小用,并且因为运行在内网环境,SSL加密也不是很必要。所以一个SASL+PLAINTEXT的集群环境足以应付通常的使用场景。本文给出一个可运行的实例来演示一下如何在不使用Kerberos的状况下配置SASL + ACL来构建secured Kafka集群。java

在开始以前,咱们简单学习下Kafka ACL的格式。根据官网的介绍,Kafka中一条ACL的格式以下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它们的含义描述以下:apache

  • principal:表示一个Kafka user
  • operation:表示一个具体的操做类型,如WRITE, READ, DESCRIBE等。完整的操做列表详见:http://docs.confluent.io/current/kafka/authorization.html#overview
  • Host:表示连向Kafka集群的client的IP地址,若是是‘*’则表示全部IP。注意:当前Kafka不支持主机名,只能指定IP地址
  • Resource:表示一种Kafka资源类型。当前共有4种类型:TOPIC、CLUSTER、GROUP、TRANSACTIONID

下面我使用Kafka 0.11.0.0版原本演示下如何构建支持SASL + PLAINTEXT + ACL的Kafka集群环境。bootstrap

1. Broker端配置

要配置SASL和ACL,咱们须要在broker端进行两个方面的设置。首先是建立包含全部认证用户信息的JAAS文件。本例中,咱们假设有3个用户:admin, reader和writer,其中admin是管理员,reader用户读取Kafka集群中topic数据,而writer用户则负责向Kafka集群写入消息。咱们假设这3个用户的密码分别与用户名相同(在实际场景中,管理员须要单独把密码发给各自的用户),所以咱们能够这样编写JAAS文件:安全

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_reader="reader"
  user_writer="writer";
};工具

保存该文件为kafka_cluster_jaas.conf(本例中的完整路径是/Users/huxi/SourceCode/newenv/kafka_cluster_jaas.conf),以后咱们须要把该文件的完整路径做为一个JVM参数传递给Kafka的启动脚本。不过因为bin/kafka-server-start.sh只接收server.properties的位置,再也不接收其余任何参数,故咱们须要修改该启动脚本。具体作法以下:学习

$ pwd
/Users/huxi/SourceCode/newenv/kafka_0.11
$ cp bin/kafka-server-start.sh bin/secured-kafka-server-start.sh测试

$ vi bin/secured-kafka-server-start.shfetch

把该文件中的这行:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改成下面这行,而后保存退出
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/Users/huxi/SourceCode/newenv/kafka_cluster_jaas.conf kafka.Kafka "$@"ui

作完上面的步骤后,咱们就在bin/目录下新建了一个secured-kafka-server-start.sh启动脚本。

配置好JAAS文件后,咱们开始修改broker启动所需的server.properties文件,你至少须要配置(或修改)如下这些参数:

# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 设置本例中admin为超级用户
super.users=User:admin

Okay,如今咱们能够启动broker了(当前确定要先启动Zookeeper):

bin/secured-kafka-server-start.sh ../config_files/server.properties

.......

[2017-08-17 16:07:30,417] INFO [Kafka Server 0], started (kafka.server.KafkaServer) 

可见,Kafka broker已经成功启动了。不过当前该broker只会接收已认证client发来的请求。下面咱们继续clients端的配置。

2. Client端配置

咱们先来建立一个测试topic,名为test,单分区,副本因子是1。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".

咦?稍等下,咱们不是启用ACL了吗?这里为何会建立成功呢?这里我简要说明一下: 首先咱们启用ACL了吗?固然!因为咱们没有显式地设置allow.everyone.if.no.acl.found,故这个参数默认是false,也就是说对于目前的这个Kafka集群而言,除了超级用户以外的其余任何用户都没法执行任何操做。其次,那为何建立topic成功了呢?这是由于当前kafka-topics.sh脚本直接链接Zookeeper,故不受ACL的限制。因此不管是否配置了security,用户老是可使用kafka-topics来管理topic。

言归正传,本例中咱们的目的是要测试:

  • 用户writer向test topic写入消息
  • 用户reader从test topic读取消息

下面咱们先来启动一个console-consumer和一个console-producer来看下当前是个什么情况:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello, kafka
[2017-08-17 16:16:09,626] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,685] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,740] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,799] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)

......

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[2017-08-17 16:16:47,936] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:47,992] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:48,052] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:48,163] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
......

看到了吧,当前的console producer/consumer都没法工做,报的错误就是没法链接broker(localhost:9092)。这就是由于咱们设置了security的缘故。下面咱们为writer用户配置安全,首先须要建立一个属于writer用户的JAAS文件,该文件中指定了writer用户的credentials,以下所示:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};

把上述内容保存到writer_jaas.conf文件(/Users/huxi/SourceCode/newenv/writer_jaas.conf)中。和broker相似,咱们须要拷贝一份新的bin/kafka-console-producer.sh将该JAAS文件做为一个JVM参数传给console producer:

$ cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh
$ vi bin/writer-kafka-console-producer.sh

把该文件中的这行:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改成下面这行,而后保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/Users/huxi/SourceCode/newenv/writer_jaas.conf kafka.tools.ConsoleProducer "$@"

而后,咱们建立一个producer.config为该console producer指定如下两个属性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

如今咱们使用新的脚原本启动console producer:

$ bin/writer-kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
>hello, kafka
[2017-08-17 16:28:13,930] WARN Error while fetching metadata with correlation id 1 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,036] WARN Error while fetching metadata with correlation id 3 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,143] WARN Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,246] WARN Error while fetching metadata with correlation id 5 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

依然有错误,不过错误变成“没法获取元数据”了。这说明咱们运行的console producer经过了认证,可是没有经过受权,所以咱们须要配置ACL来让writer用户有权限写入topic:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test

再试producer:

$ bin/writer-kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
>hello
>hello, kafka
>message abc

......

 producer生产消息成功了。等等!子曰:没有消费以前永远不要断言生产成功了!下面咱们就来配置下consumer,即用户reader。

和writer用户相似,咱们首先建立reader用户的JAAS文件,保存在/Users/huxi/SourceCode/newenv/kafka_0.11/reader_jaas.conf中:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader";
};

而后拷贝一份新的console consumer来指定上面的reader_jaas.conf:

$ cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh
$ vi bin/reader-kafka-console-consumer.sh

把该文件中的这行:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
修改成下面这行,而后保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/Users/huxi/SourceCode/newenv/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"

而后,咱们建立一个consumer.config为该console producer指定如下3个属性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group

如今运行console consumer:

$ bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config consumer.config
[2017-08-17 16:37:54,124] WARN Error while fetching metadata with correlation id 2 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:37:54,127] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group

能够看到此次出现了两个错误:第一个问题依然是没法获取元数据——这代表reader用户经过了认证但没有经过受权;第二个问题代表reader用户无权访问consumer group——这一样是受权的问题。咱们须要设置ACL来解决它们。首先,咱们为reader用户设置test topic的读权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test

而后设置访问group的权限:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group

作完这些以后,咱们从新运行console consumer程序:

$ bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config consumer.config
hello
hello, kafka
message abc
.......

能够看到,此次reader用户成功地读取了writer用户生产的消息。这证实了writer和reader用户均可以正常地工做了。

 

最后总结一下,这种方案适用于用户数不多但又必须启用安全的Kafka集群,不过此方案比较麻烦的地方在于须要为每一个脚本都从新定制,加上-Djava.security.auth.login.config参数以识别JAAS文件,不如Kerberos来得简单。另外用户彻底能够根据官网的教程配置SSL,而后很轻易地把本文中的例子改为SASL_SSL。

相关文章
相关标签/搜索