实战Kafka ACL机制

1.概述   在Kafka0.9版本以前,Kafka集群时没有安全机制的。Kafka Client应用能够经过链接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,链接到Kafka集群,就能够操做集群上的全部主题了。因为没有权限控制,集群核心的业务主题时存在风险的。java

2.内容 2.2 身份认证   Kafka的认证范围包含以下:程序员

Client与Broker之间sql

Broker与Broker之间apache

Broker与Zookeeper之间安全

  当前Kafka系统支持多种认证机制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。bash

2.3 SSL认证流程   在Kafka系统中,SSL协议做为认证机制默认是禁止的,若是须要使用,能够手动启动SSL机制。安装和配置SSL协议的步骤,以下所示:架构

在每一个Broker中Create一个Tmp密钥库并发

建立CA分布式

给证书签名高并发

配置Server和Client

  执行脚本以下所示:

1#! /bin/bash 2# 1.Create rsa 3keytool -keystore server.keystore.jks -alias dn1 -validity 365 -genkey -keyalg RSA 4# 2.Create CA 5openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 6# 3.Import client 7keytool -keystore client.truststore.jks -alias CAROOT -import -file ca-cert 8# 4.Import server 9keytool -keystore server.truststore.jks -alias CAROOT -import -file ca-cert 10# 5.Export 11keytool -keystore server.keystore.jks -alias dn1 -certreq -file cert-file 12# 6.Signed 13openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456 14# 7.Import ca-cert 15keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert 16# 8.Import cert-signed 17keytool -keystore server.keystore.jks -alias dn1 -import -file cert-signed 2.4 SASL认证流程   在Kafka系统中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。以PLAIN认证为示例,下面给你们介绍PLAIN认证流程。

2.4.1 配置Server   首先,在$KAFKA_HOME/config目录中新建一个文件,名为kafka_server_jaas.conf,配置内容以下:

1KafkaServer { 2 org.apache.kafka.common.security.plain.PlainLoginModule required 3 username="smartloli" 4 password="smartloli-secret" 5 user_admin="smartloli-secret"; 6}; 7 8Client { 9 org.apache.kafka.common.security.plain.PlainLoginModule required 10 username="smartloli" 11 password="smartloli-secret"; 12};   而后在Kafka启动脚本(kafka-server-start.sh)中添加配置文件路径,设置内容以下:

1[hadoop@dn1 bin]$ vi kafka-server-start.sh 2 3# Add jaas file 4export KAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka/config/kafka_server_jaas.conf" 接下来,配置server.properties文件,内容以下:

1# Set ip & port 2listeners=SASL_PLAINTEXT://dn1:9092 3advertised.listeners=SASL_PLAINTEXT://dn1:9092 4# Set protocol 5security.inter.broker.protocol=SASL_PLAINTEXT 6sasl.enabled.mechanisms=PLAIN 7sasl.mechanism.inter.broker.protocol=PLAIN 8 9# Add acl 10allow.everyone.if.no.acl.found=true 11auto.create.topics.enable=false 12delete.topic.enable=true 13advertised.host.name=dn1 14super.users=User:admin 15 16# Add class 17authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 2.4.2 配置Client   当Kafka Server端配置启用了SASL/PLAIN,那么Client链接的时候须要配置认证信息,Client配置一个kafka_client_jaas.conf文件,内容以下:

1KafkaClient { 2 org.apache.kafka.common.security.plain.PlainLoginModule required 3 username="admin" 4 password="admin-secret"; 5};  而后,在producer.properties和consumer.properties文件中设置认证协议,内容以下:

1security.protocol=SASL_PLAINTEXT 2sasl.mechanism=PLAIN 最后,在kafka-console-producer.sh脚本和kafka-console-producer.sh脚本中添加JAAS文件的路径,内容以下:

1# For example: kafka-console-producer.sh 2hadoop@dn1 bin]$ vi kafka-console-producer.sh 3 4# Add jaas file 5export KAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka
6/config/kafka_client_jaas.conf"

2.5 ACL操做   在配置好SASL后,启动Zookeeper集群和Kafka集群以后,就可使用kafka-acls.sh脚原本操做ACL机制。

  (1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL受权新

1[hadoop@dn1 bin]$ kafka-acls.sh --list --authorizer-properties zookeeper.connect=dn1:2181

(2)建立:建立待受权主题以前,在kafka-acls.sh脚本中指定JAAS文件路径,而后在执行建立操做

1[hadoop@dn1 bin]$ kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 1 --partitions 1 --topic kafka_acl_topic (3)生产者受权:对生产者执行受权操做

1[hadoop@dn1 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181 --add --allow-principalUser:producer --operation Write --topic kafka_acl_topic (4)消费者受权:对生产者执行受权后,经过消费者来进行验证

1[hadoop@dn1 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181 --add --allow-principalUser:consumer --operation Read --topic kafka_acl_topic (5)删除:经过remove参数来回收相关权限

1[hadoop@dn1 bin]$ kafka-acls.sh --authorizer-properties zookeeper.connect=dn1:2181 --remove --allow-principal User:producer --operation Write --topic kafka_acl_topic3 3.总结   在处理一些核心的业务数据时,Kafka的ACL机制仍是很是重要的,对核心业务主题进行权限管控,可以避免没必要要的风险。

欢迎工做一到五年的Java工程师朋友们加入Java程序员开发: 854393687 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!

相关文章
相关标签/搜索