目前Kafka ACL支持多种权限认证,今天笔者给你们介绍一下SCRAM和PLAIN的权限认证。验证环境以下:html
首先,在$KAFAK_HOME/config目录新建一个文本文件,名为kafka_server_plain_jaas.conf,配置内容以下:java
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_ke="ke"; };
接着,将脚本文件kafka-server-start.sh重命名为kafka-server-plain-start.sh,并修改最后一行的内容为:apache
# 添加鉴权文件 exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_plain_jaas.conf kafka.Kafka "$@"
而后,复制server.properties文件并重命名为plain.properties,接着修改服务端配置文件plain.properties,内容以下:oop
# Protocol listeners=SASL_PLAINTEXT://127.0.0.1:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN # ACL allow.everyone.if.no.acl.found=false super.users=User:admin authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
最后,建立客户端用户认证文件,kafka_client_plain_jaas.conf内容以下:学习
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="ke" password="ke"; };
# Zookeeper的配置比较简单,这里很少作介绍 zkServer.sh start
# 进入到Kafka安装bin目录 ./kafka-server-plain-start.sh ../config/plain.properties &
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181/plain --replication-factor 1 --partitions 3 --topic test_plain
# 添加读权限 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --add --allow-principal User:ke --operation Read --topic test_plain # 添加写权限 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --add --allow-principal User:ke --operation Write --topic test_plain # 添加消费者组权限 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --add --allow-principal User:ke --operation Read --group g_plain_test # 查看权限列表 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/plain --list
PLAIN认证有个问题,就是不能动态新增用户,每次添加用户后,须要重启正在运行的Kafka集群才能生效。为此,在生产环境,这种认证方式不符合实际业务场景。而SCRAM不同,使用SCRAM认证,能够动态新增用户,添加用户后,能够不用重启正在运行的Kafka集群便可进行鉴权。大数据
新增kafka_server_scram_jaas.conf,配置内容以下:ui
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"; };
接着,将脚本文件kafka-server-start.sh重命名为kafka-server-scram-start.sh,并修改最后一行的内容为:spa
# 添加鉴权文件
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_scram_jaas.conf kafka.Kafka "$@"
而后在$KAFKA_HOME/config目录中,复制server.properties文件并重命名为scram.properties,接着修改服务端配置文件scram.properties,内容以下:3d
# Protocol listeners=SASL_PLAINTEXT://dn1:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-256 # ACL allow.everyone.if.no.acl.found=false super.users=User:admin authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Zookeeper的配置比较简单,这里很少作介绍 zkServer.sh start
# 添加管理员
./kafka-configs.sh --zookeeper 127.0.0.1:2181/scram --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
# 添加普通用户(ke)
./kafka-configs.sh --zookeeper 127.0.0.1:2181/scram --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ke],SCRAM-SHA-512=[password=ke]' --entity-type users --entity-name ke
./kafka-server-scram-start.sh ../config/scram.properties &
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181/scram --replication-factor 1 --partitions 3 --topic test_scram
# 添加读权限
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Read --topic test_scram
# 添加写权限 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Write --topic test_scram
# 添加消费者组权限 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Read --group g_scram_test
# 查看权限列表 ./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --list
Kafka权限级别包含Topic、Group、Cluster、TransactionalId,每一个维度涉及的权限内容以下:code
Resource | Operations |
Topic | Read,Write,Describe,Delete,DescribeConfigs,AlterConfigs,All |
Group | Read,Describe,All |
Cluster | Create,ClusterAction,DescribeConfigs,AlterConfigs,IdempotentWrite,Alter,Describe,All |
TransactionalId | Describe,Write,All |
例如,统计Topic的Capacity大小时,若是抛出异常“Cluster authorization failed”,这是因为没有开启Cluster级别的Describe权限,执行以下命令便可:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181/scram --add --allow-principal User:ke --operation Describe --cluster
那么如何使用Kafka Eagle来集成有SCRAM认证的Kafka集群,进行监控呢?访问http://www.kafka-eagle.org/,下载安装包,解压并配置以下内容:
# 这里对启动一个拥有SCRAM认证的Kafka集群(别名为cluster1)进行配置 cluster1.kafka.eagle.sasl.enable=true cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256 cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ke" password="ke"; # 这里ClientId若是不须要,能够不用配置 cluster1.kafka.eagle.sasl.client.id=
而后,执行ke.sh start进行启动Kafka Eagle监控系统。
执行以下SQL语句,代码以下:
select * from "test_scram" where "partition" in (0) limit 1
执行结果以下:
生产环境中,用户可能随着业务须要,增长或者删除,此时动态控制用户时颇有必要的。并且,生产环境Kafka集群不可能随随便便重启,所以采用SCRAM来进行Kafka鉴权很是适合。
这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。