Kafka 0.10.0 SASL/PLAIN Authentication and Authorization

This article describes how to implement the SASL/PLAIN authentication mechanism and access control with the officially released Kafka 0.10.0.

Kafka Security Mechanisms

Kafka's security mechanism has two main parts:

  • Authentication: verifying the identity of client connections to the brokers.
  • Authorization: controlling which operations (such as reads and writes) an authenticated client may perform on resources like topics.

In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:

  1. Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos). SASL/PLAIN can also be used from release 0.10.0.0 onwards.
  2. Authentication of connections from brokers to ZooKeeper
  3. Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
  4. Authorization of read / write operations by clients
  5. Authorization is pluggable and integration with external authorization services is supported

Kafka Authentication

Kafka currently supports three authentication mechanisms: SSL, SASL/Kerberos, and SASL/PLAIN.

SASL/PLAIN Authentication

For reference, see kafka使用SASL验证, a Chinese translation of the official documentation.

Kafka Server Configuration

Add the following settings to config/server.properties under the Kafka installation directory:

listeners=SASL_PLAINTEXT://ip:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

You also need a JAAS file named kafka_server_jaas.conf, placed in the config directory:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};

Here we define two users, admin and alice, with passwords admin and alice respectively. The username/password pair at the top is the identity the broker itself uses for inter-broker connections; each user_<name>="<password>" entry declares an account that clients and brokers are allowed to authenticate as.
Finally, Kafka needs the java.security.auth.login.config system property pointed at this file. Add the following to bin/kafka-run-class.sh:

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

Note: only the first line is new; the two launch commands merely gain the $KAFKA_SASL_OPTS variable. Since the launch commands already expand $KAFKA_OPTS, an alternative that leaves the script untouched is to export KAFKA_OPTS='-Djava.security.auth.login.config=...' in the environment before starting Kafka.

Kafka Client Configuration

First, create a kafka_client_jaas.conf file on the client side:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice";
};

Then add the system property and security settings in the producer and consumer programs, as shown below:

System.setProperty("java.security.auth.login.config", ".../kafka_client_jaas.conf"); // point the JVM at the JAAS file; fill in the real path
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

Once all of the above is configured, the producer and consumer programs run normally. If the username or password is wrong, however, the program simply stops making progress without any error message; improving that feedback is something to revisit later.
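
Putting the client pieces together, a minimal producer sketch under the settings above might look like the following; the broker address, topic name, and JAAS file path are placeholders to adapt, and the 0.10.0 kafka-clients jar is assumed to be on the classpath:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SaslPlainProducerDemo {
    public static void main(String[] args) {
        // Point the JVM at the client JAAS file (placeholder path)
        System.setProperty("java.security.auth.login.config",
                "/path/to/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-host:9092"); // placeholder address
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Sends as whichever user kafka_client_jaas.conf logs in as (alice above)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key", "hello from alice"));
        }
    }
}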

Kafka Authorization

This section gives a brief introduction to Kafka ACLs.

Available Permissions

| Permission | Description |
| --- | --- |
| READ | Read from a topic |
| WRITE | Write to a topic |
| DELETE | Delete a topic |
| CREATE | Create a topic |
| ALTER | Alter a topic |
| DESCRIBE | Fetch metadata of a topic |
| ClusterAction | Cluster-level actions, used by inter-broker requests |
| ALL | All of the above |

The access control lists (ACLs) are stored in ZooKeeper, under the path /kafka-acl.
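
To see what the authorizer has written there, a small sketch using the ZooKeeper Java client (assumed to be on the classpath) can walk that path; the connect string is a placeholder and should include any chroot your cluster uses:

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class KafkaAclZkInspect {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Connect to the same ensemble the brokers use (placeholder connect string)
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await(); // wait until the session is established

        // SimpleAclAuthorizer keeps one child per resource type (Topic, Group, Cluster)
        for (String type : zk.getChildren("/kafka-acl", false)) {
            for (String resource : zk.getChildren("/kafka-acl/" + type, false)) {
                // The znode data is the JSON-encoded ACL set for that resource
                byte[] data = zk.getData("/kafka-acl/" + type + "/" + resource, false, null);
                System.out.println(type + ":" + resource + " -> " + new String(data, "UTF-8"));
            }
        }
        zk.close();
    }
}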

Permission Configuration

Kafka provides the bin/kafka-acls.sh tool, whose options are listed in the table below.

| Option | Description | Default | Option type |
| --- | --- | --- | --- |
| --add | Indicates to the script that the user is trying to add an ACL. | | Action |
| --remove | Indicates to the script that the user is trying to remove an ACL. | | Action |
| --list | Indicates to the script that the user is trying to list ACLs. | | Action |
| --authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration |
| --authorizer-properties | key=val pairs passed to the authorizer for initialization. For the default authorizer, an example value is zookeeper.connect=localhost:2181. | | Configuration |
| --cluster | Specifies the cluster as resource. | | Resource |
| --topic [topic-name] | Specifies the topic as resource. | | Resource |
| --group [group-name] | Specifies the consumer group as resource. | | Resource |
| --allow-principal | Principal in PrincipalType:name format to be added to the ACL with Allow permission. Multiple --allow-principal options can be given in a single command. | | Principal |
| --deny-principal | Principal in PrincipalType:name format to be added to the ACL with Deny permission. Multiple --deny-principal options can be given in a single command. | | Principal |
| --allow-host | IP address from which the principals listed in --allow-principal will have access. | If --allow-principal is specified, defaults to *, which means "all hosts". | Host |
| --deny-host | IP address from which the principals listed in --deny-principal will be denied access. | If --deny-principal is specified, defaults to *, which means "all hosts". | Host |
| --operation | Operation to allow or deny. Valid values: Read, Write, Create, Delete, Alter, Describe, ClusterAction, All. | All | Operation |
| --producer | Convenience option to add/remove ACLs for the producer role: WRITE and DESCRIBE on the topic, and CREATE on the cluster. | | Convenience |
| --consumer | Convenience option to add/remove ACLs for the consumer role: READ and DESCRIBE on the topic, and READ on the consumer group. | | Convenience |

Setting Permissions

A few examples illustrate how to set permissions.

The add operation

# Grant user alice read and write permission on topic test
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --operation Read --operation Write --topic test
# On topic test, deny read access to user BadBob from IP 198.51.100.3 while allowing all other users from all hosts (quote the wildcards so the shell does not expand them)
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal "User:*" --allow-host "*" --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic test
# Allow bob and alice to read and write topic test from IP 198.51.100.0 or 198.51.100.1
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

The list operation

# List all ACLs on topic test
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --list --topic test

The output is:

Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Read from hosts: *
    User:alice has Allow permission for operations: Write from hosts: *

The remove operation

# Remove the ACLs added above
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --remove --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

Producer and consumer operations

# producer
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --producer --topic test
# consumer
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --consumer --topic test --group test-group
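
To go with the consumer ACL above, here is a minimal consumer sketch; the broker address and JAAS file path are placeholders, and group.id matches the test-group granted above:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SaslPlainConsumerDemo {
    public static void main(String[] args) {
        // Point the JVM at the client JAAS file (placeholder path)
        System.setProperty("java.security.auth.login.config",
                "/path/to/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-host:9092"); // placeholder address
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("group.id", "test-group"); // must be covered by a READ acl on the group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10.x poll(long)
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}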

Pitfalls

This section records a few pitfalls we hit while using SASL/PLAIN.

The controller fails to connect to the broker

The error message:

[2016-07-27 17:45:46,047] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-07-27 17:45:46,056] INFO [delete-topics-thread-1], Starting  (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-07-27 17:45:46,057] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2016-07-27 17:45:46,351] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

We spent a long time hunting for the cause. At first we suspected the format of the kafka_server_jaas.conf file, but after changing it Kafka would sometimes start normally and sometimes not. The file before the fix was:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_matt="33"
    user_alice="alice";
};

In the end we concluded that the problem was the missing user_admin entry: authentication is enabled between brokers as well, and the brokers log in as admin, so an admin account must be declared. The corrected configuration file:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};

After this change, Kafka ran normally.
