This article describes how to set up SASL/PLAIN authentication and access control with the official Kafka 0.10.0 release.
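Kafka's security mechanisms fall into two main parts, authentication and authorization, which the official documentation describes as follows: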
In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:
- Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos). SASL/PLAIN can also be used from release 0.10.0.0 onwards.
- Authentication of connections from brokers to ZooKeeper
- Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
- Authorization of read / write operations by clients
- Authorization is pluggable and integration with external authorization services is supported
In other words, Kafka currently supports three authentication mechanisms: SSL, SASL/Kerberos, and SASL/PLAIN.
For SASL, see "kafka使用SASL验证", the Chinese version of the official documentation.
Add the following settings to config/server.properties under the Kafka installation directory:
    listeners=SASL_PLAINTEXT://ip:port
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    super.users=User:admin
You also need to create a configuration file named kafka_server_jaas.conf and place it in the config directory.
    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        user_alice="alice";
    };
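Here we define two users, admin and alice, with the passwords admin and alice respectively. The username and password fields are the credentials the broker itself uses when it connects to other brokers; each user_<name>="<password>" entry defines an account that the broker accepts.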
Finally, Kafka must be started with the java.security.auth.login.config JVM system property. Add the following to bin/kafka-run-class.sh:
    KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf'
    # Launch mode
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
      nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
    else
      exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
    fi
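Note: the only changes to the script are the new first line that defines KAFKA_SASL_OPTS and the addition of $KAFKA_SASL_OPTS to the nohup and exec commands (lines 4 and 6).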
KafkaClient configuration
First, create a kafka_client_jaas.conf file on the client side:
    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="alice"
        password="alice";
    };
Then set the system property and the security settings in the producer and consumer programs, as follows:
    // Set the system property; the value must be the path to the JAAS file
    System.setProperty("java.security.auth.login.config", ".../kafka_client_jaas.conf");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "PLAIN");
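With this in place, the producer and consumer programs run normally. If the username or password is wrong, the program does not work but reports no error; this is something to improve later.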
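The client settings above can be collected in one place. The following is a minimal sketch, not the author's code: the class `SaslPlainClientConfig`, its method names, the JAAS path, and the broker address are all hypothetical placeholders, and the serializer settings are an assumption added so the properties are complete enough for a producer.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): gathers the SASL/PLAIN client settings.
final class SaslPlainClientConfig {

    // Points the JVM at the JAAS file; call this before the first Kafka client is created.
    static void useJaasFile(String jaasPath) {
        System.setProperty("java.security.auth.login.config", jaasPath);
    }

    // Builds producer properties; bootstrapServers is a placeholder such as "broker-host:9092".
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Assumed serializers; replace with whatever the application actually uses.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        useJaasFile("/path/to/kafka_client_jaas.conf");     // placeholder path
        Properties props = producerProps("localhost:9092"); // placeholder address
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With kafka-clients on the classpath, the returned properties can be passed to `new KafkaProducer<>(props)`.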
This section introduces Kafka ACLs.
Permission | Description |
---|---|
READ | Read from a topic |
WRITE | Write to a topic |
DELETE | Delete a topic |
CREATE | Create a topic |
ALTER | Alter a topic |
DESCRIBE | Get information about a topic |
ClusterAction | |
ALL | All permissions |
ACLs are stored in ZooKeeper under the path /kafka-acl.
The options of the ACL command provided by Kafka are listed in the table below.
Option | Description | Default | Option type |
---|---|---|---|
--add | Indicates to the script that user is trying to add an acl. | | Action |
--remove | Indicates to the script that user is trying to remove an acl. | | Action |
--list | Indicates to the script that user is trying to list acls. | | Action |
--authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration |
--authorizer-properties | key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181 | | Configuration |
--cluster | Specifies cluster as resource. | | Resource |
--topic [topic-name] | Specifies the topic as resource. | | Resource |
--group [group-name] | Specifies the consumer-group as resource. | | Resource |
--allow-principal | Principal is in PrincipalType:name format that will be added to ACL with Allow permission. You can specify multiple --allow-principal in a single command. | | Principal |
--deny-principal | Principal is in PrincipalType:name format that will be added to ACL with Deny permission. You can specify multiple --deny-principal in a single command. | | Principal |
--allow-host | IP address from which principals listed in --allow-principal will have access. | if --allow-principal is specified defaults to * which translates to "all hosts" | Host |
--deny-host | IP address from which principals listed in --deny-principal will be denied access. | if --deny-principal is specified defaults to * which translates to "all hosts" | Host |
--operation | Operation that will be allowed or denied. Valid values are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, All | All | Operation |
--producer | Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE on topic and CREATE on cluster. | | Convenience |
--consumer | Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. | | Convenience |
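To make the two convenience options concrete, here is a small sketch of the ACL entries each one stands for, taken directly from the descriptions in the table. The class `ConvenienceAcls` and the string format of the entries are hypothetical and exist only for illustration; they are not part of Kafka.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only; this class is hypothetical and not part of Kafka.
final class ConvenienceAcls {

    // Entries implied by --producer: WRITE and DESCRIBE on the topic, CREATE on the cluster.
    static List<String> producer(String topic) {
        return Arrays.asList(
            "Topic:" + topic + " Write",
            "Topic:" + topic + " Describe",
            "Cluster Create");
    }

    // Entries implied by --consumer: READ and DESCRIBE on the topic, READ on the consumer group.
    static List<String> consumer(String topic, String group) {
        return Arrays.asList(
            "Topic:" + topic + " Read",
            "Topic:" + topic + " Describe",
            "Group:" + group + " Read");
    }

    public static void main(String[] args) {
        producer("test").forEach(System.out::println);
        consumer("test", "test-group").forEach(System.out::println);
    }
}
```

Note that --consumer needs both a topic and a group, which is why the consumer role takes two arguments here.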
The following examples show how to set permissions.
    # Grant user alice read and write permission on the topic test
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --add --allow-principal User:alice --operation Read --operation Write --topic test

    # On the topic test, deny Read to user BadBob from IP 198.51.100.3 and allow it for all other users
    # (the wildcards are quoted so that the shell does not expand them)
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --add --allow-principal 'User:*' --allow-host '*' \
      --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic test

    # Allow bob and alice to read and write from IP 198.51.100.0 or 198.51.100.1
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --add --allow-principal User:bob --allow-principal User:alice \
      --allow-host 198.51.100.0 --allow-host 198.51.100.1 \
      --operation Read --operation Write --topic test

    # List all ACLs on the topic test
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --list --topic test
The output is:
    Current ACLs for resource `Topic:test`:
        User:alice has Allow permission for operations: Describe from hosts: *
        User:alice has Allow permission for operations: Read from hosts: *
        User:alice has Allow permission for operations: Write from hosts: *
    # Remove ACLs (principal names are case-sensitive, so they must match the names used with --add)
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --remove --allow-principal User:bob --allow-principal User:alice \
      --allow-host 198.51.100.0 --allow-host 198.51.100.1 \
      --operation Read --operation Write --topic test
    # producer
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --add --allow-principal User:alice --producer --topic test

    # consumer
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 \
      --add --allow-principal User:alice --consumer --topic test --group test-group
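The BadBob example only works because a matching Deny entry takes precedence over a matching Allow entry. The sketch below models that rule in a deliberately simplified form. It is not Kafka's SimpleAclAuthorizer: the class `AclModel` and its fields are hypothetical, and super users, the no-ACL default, and the Describe permission implied by other operations are all left out.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of Allow/Deny evaluation; not Kafka's implementation.
final class AclModel {

    static final class Acl {
        final String principal;
        final String host;
        final String operation;
        final boolean allow;

        Acl(String principal, String host, String operation, boolean allow) {
            this.principal = principal;
            this.host = host;
            this.operation = operation;
            this.allow = allow;
        }

        // "User:*" matches any principal, "*" any host, "All" any operation.
        boolean matches(String p, String h, String op) {
            return (principal.equals("User:*") || principal.equals(p))
                && (host.equals("*") || host.equals(h))
                && (operation.equals("All") || operation.equals(op));
        }
    }

    // A request is authorized only if some Allow entry matches and no Deny entry matches.
    static boolean authorize(List<Acl> acls, String principal, String host, String op) {
        boolean allowed = false;
        boolean denied = false;
        for (Acl acl : acls) {
            if (!acl.matches(principal, host, op)) {
                continue;
            }
            if (acl.allow) {
                allowed = true;
            } else {
                denied = true;
            }
        }
        return allowed && !denied;
    }

    public static void main(String[] args) {
        List<Acl> acls = Arrays.asList(
            new Acl("User:*", "*", "Read", true),
            new Acl("User:BadBob", "198.51.100.3", "Read", false));
        System.out.println(authorize(acls, "User:alice", "198.51.100.7", "Read"));  // true
        System.out.println(authorize(acls, "User:BadBob", "198.51.100.3", "Read")); // false
        System.out.println(authorize(acls, "User:BadBob", "198.51.100.9", "Read")); // true
    }
}
```

The last call shows that the Deny entry is tied to the host: in this model BadBob is still allowed to read from any other IP address.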
This section records a pitfall encountered while using SASL/PLAIN.
The error message was as follows:
    [2016-07-27 17:45:46,047] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
    java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
    [2016-07-27 17:45:46,056] INFO [delete-topics-thread-1], Starting (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
    [2016-07-27 17:45:46,057] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
    [2016-07-27 17:45:46,351] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
    java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
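Tracking down the cause took a long time. At first I suspected a formatting problem in kafka_server_jaas.conf, but after changing the format Kafka still started normally only some of the time. The file before the fix was: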
    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_matt="33"
        user_alice="alice";
    };
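In the end, the likely cause was that the admin account was missing from the user_ entries. Authentication is also enabled between brokers, so the account a broker logs in with (username="admin") must itself be defined as a user. The corrected file is: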
    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        user_alice="alice";
    };
After this change, Kafka ran normally.
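The condition that was violated can be checked mechanically. This is a rough sketch under stated assumptions: the class `JaasPlainCheck` is hypothetical, and the JAAS options are copied by hand into a map instead of being parsed from kafka_server_jaas.conf. It tests whether the broker's own login appears as a user_<name> entry with the same password.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sanity check for the KafkaServer JAAS options; not part of Kafka.
final class JaasPlainCheck {

    // True when username/password is also defined as user_<username>="<password>".
    static boolean brokerLoginIsDefined(Map<String, String> options) {
        String username = options.get("username");
        String password = options.get("password");
        if (username == null || password == null) {
            return false;
        }
        return password.equals(options.get("user_" + username));
    }

    public static void main(String[] args) {
        // Options from the file before the fix: no user_admin entry.
        Map<String, String> before = new LinkedHashMap<>();
        before.put("username", "admin");
        before.put("password", "admin");
        before.put("user_matt", "33");
        before.put("user_alice", "alice");

        // Options from the corrected file.
        Map<String, String> after = new LinkedHashMap<>();
        after.put("username", "admin");
        after.put("password", "admin");
        after.put("user_admin", "admin");
        after.put("user_alice", "alice");

        System.out.println(brokerLoginIsDefined(before)); // false
        System.out.println(brokerLoginIsDefined(after));  // true
    }
}
```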