用户认证功能,是一个成熟组件不可或缺的功能。在0.9版本之前kafka是没有用户认证模块的(或者说只有SSL),好在kafka0.9版本之后逐渐发布了多种用户认证功能,弥补了这一缺陷(这里仅介绍SASL)。java
本篇会先介绍当前kafka的四种认证方式,而后过一遍部署SASL/PLAIN认证功能的流程。最后再介绍如如何使用kafka2.x新推出的callback api,对SASL/PLAIN功能进行二次开发。python
须要先明确的一点是,用户认证和权限控制是两码事。用户认证是确认这个用户可否访问当前的系统,而权限控制是控制用户对当前系统中各类资源的访问权限。用户认证就是今天要讲的内容,而kafka的权限控制,则是对应bin/kafka-acls.sh
工具所提供的一系列功能,这里不详细展开。apache
标题特意说明kafka2.x是由于kafka2.0的时候推出一种新的用户认证方式,SASL/OAUTHBEARER,在此前的版本是不存在这个东西的。那么加上这个以后,kafka目前共有4种常见的认证方式。api
其实除了上述四种用户认证功能以外,还有一个叫Delegation Token的东西。这个东西说一个轻量级的工具,是对现有SASL的一个补充,可以提升用户认证的性能(主要针对Kerberos的认证方式)。算是比较高级的用法,通常也用不到,因此也不会多介绍,有兴趣能够看这里Authentication using Delegation Tokens。安全
SASL/GSSAPI框架
若是已经有kerberos的环境,那么会比较适合使用这种方式,只须要让管理员分配好principal和对应的keytab,而后在配置中添加对应的选项就能够了。须要注意的是,通常采用这种方案的话,zookeeper也须要配置kerberos认证。jvm
SASL/PLAINide
这种方式其实就是一个用户名/密码的认证方式,不过它有不少缺陷,好比用户名密码是存储在文件中,不能动态添加,明文等等!这些特性决定了它比较鸡肋,但好处是足够简单,这使得咱们能够方便地对它进行二次开发。本篇文章后续会介绍SASL/PLAIN的部署方式和二次开发的例子(基于kafka2.x)。工具
SASL/SCRAM性能
针对PLAIN方式的不足而提供的另外一种认证方式。这种方式的用户名/密码是存储中zookeeper的,所以可以支持动态添加用户。该种认证方式还会使用sha256或sha512对密码加密,安全性相对会高一些。
并且配置起来和SASL/PLAIN差很少一样简单,添加用户/密码的命令官网也有提供,我的比较推荐使用这种方式。不过有些客户端是不支持这个方式认证登录的,好比python的kafka客户端,这点须要提早调研好。
具体的部署方法官网或网上有不少,这里很少介绍,贴下官网的Authentication using SASL/SCRAM。
SASL/OAUTHBEARER
SASL/OAUTHBEARER是基于OAUTH2.0的一个新的认证框架,这里先说下什么是OAUTH吧,引用维基百科。
OAuth是一个开放标准,容许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用。而 OAUTH2.0算是OAUTH的一个增强版。
说白了,SASL/OAUTHBEARER就是一套让用户使用第三方认证工具认证的标准,一般是须要本身实现一些token认证和建立的接口,因此会比较繁琐。
详情能够经过这个kip了解KIP-255
说了这么多,接下来就说实战了,先介绍下如何配置SASL/PLAIN。
kafka_server_jaas.conf
这里简单介绍下SASL/PLAIN的部署方式,另外除了SASL/OAUTHBEARER,其余几种应该也是相似的部署方式,基本都是大同小异。
PS:本配置版本适用于kafka2.x,且无需配置zk认证
kafka的用户认证,是基于java的jaas。因此咱们须要先添加jaas服务端的配置文件。在kafka_home/config/kafka_server_jaas.conf
中添加如下配置信息:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; };
注意最后一个属性后面须要加封号!配置是不难理解的,第一行指定PlainLoginModule
,算是声明这是一个SASL/PLAIN的认证类型,若是是其余的,那么就须要reqired其余的类。username
和password
则是用于集群内部broker的认证用的。
这里会让人疑惑的,应该是user_admin
和user_alice
这两个属性了。这个实际上是用来定义用户名和密码的,形式是这样:user_userName=password。因此这里实际上是定义了用户admin和用户alice到密码。
这一点能够在源码的PlainServerCallbackHandler
类中找到对应的信息,kafka源码中显示,对用户认证的时候,就会到jaas配置文件中,经过user_username属性获取对应username用户的密码,再进行校验。固然这样也致使了该配置文件只有重启才会生效,即没法动态添加用户。
说回来,写完配置后,须要在kafka的配置中添加jaas文件的路径。在kafka_home/bin/kafka-run-class.sh
中,找到下面的配置,修改KAFKA_OPTS到配置信息。
# Generic jvm settings you want to add if [ -z "$KAFKA_OPTS" ]; then KAFKA_OPTS="" fi
将上述到KAFKA_OPTS修改成
KAFKA_OPTS="-Djava.security.auth.login.config=kafka_home/config/kafka_server_jaas.conf"
server.properties
而后修改kafka_home/config/server.properties
:
listeners=SASL_PLAINTEXT://host.name:port security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN
其中SASL_PLAINTEXT的意思,是明文传输的意思,若是是SSL,那么应该是SASL_SSL。
这样就算是配置好kafka broker了,接下来启动kafka,观察输出日志,没有错误通常就没问题了。
以producer为例,只须要在kafka_home/config/producer.properties
中添加jaas认证信息,以及用户名密码:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="alice" \ password="alice-secret"; security.protocol=SASL_SSL sasl.mechanism=PLAIN
而后使用console producer验证:
bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test --producer.config config/producer.properties
通常可以发送数据就说明部署完成了~
前面小节介绍了kafka sasl_plain的部署方式,但这种方式的诸多弊病决定了它并不适合用于生产环境。这里咱们先介绍kafka2的新认证接口,而后演示下如何使用新的api自定义。
这一api提出的背景,是由于最开始的api(即SaslServer
),不方便对用户认证进行拓展。这个问题在开发SASL/SCRAM功能的时候尤为突出。按官方的说法,要添加SASL/SCRAM功能,须要重写SaslServer
类。
因此官方重写了这方面的功能,使用回调的方式实现了这部分的功能模块。使得开发者能够方便得对用户认证模块进行拓展或修改。
而且新增长了四个自定义认证的配置,分别是:
这几个配置默认都是null,须要填写的内容是自定义的类的路径+名称。咱们此次只须要关注sasl服务端类的配置,即sasl.server.callback.handler.class
。
这部分的内容具体是在KIP-86。
先详细介绍下sasl.server.callback.handler.class
配置。这个配置在使用的时候,须要以小写方式指定SASL的类型。举个例子,若是是SASL_PLAINTEXT,那么就须要这样:
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler
即以listener.name.sasl_plaintext.plain.sasl开头。而后在kafka中,SASL_PLAINTEXT默认实现的callback handler是PlainServerCallbackHandler
,实现了AuthenticateCallbackHandler
接口。这个的逻辑其实还蛮简单的,咱们能够看看它重点的方法和代码。
public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { private static final String JAAS_USER_PREFIX = "user_"; //jaas配置信息,初始化一次,这就是为何plain没法添加用户 private List<AppConfigurationEntry> jaasConfigEntries; @Override public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { this.jaasConfigEntries = jaasConfigEntries; } //核心类,获取用户密码后,调用authenticate方法 @Override public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { String username = null; for (Callback callback: callbacks) { if (callback instanceof NameCallback) username = ((NameCallback) callback).getDefaultName(); else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; boolean authenticated = authenticate(username, plainCallback.password()); plainCallback.authenticated(authenticated); } else throw new UnsupportedCallbackException(callback); } } //用户密码是经过获取jaas文件的属性,属性名就是JAAS_USER_PREFIX变量当前缀+username protected boolean authenticate(String username, char[] password) throws IOException { if (username == null) return false; else { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); } } @Override public void close() throws KafkaException { } }
前面说plain方式不支持动态添加用户,user_username验证密码,看代码就一清二楚。既然知道这个后,那要自定义校验逻辑就很简单了。
只须要继承PlainServerCallbackHandler
这个类,而后重写authenticate
方法实现本身的逻辑就实现自定义了。
好比我想让用户名和密码相同的就验证经过,那么能够这样:
public class MyPlainServerCallbackHandler extends PlainServerCallbackHandler{ @Override protected boolean authenticate(String username, char[] password) throws IOException { if (username == null) return false; else { return expectedPassword != null && Arrays.equals(password, username.toCharArray()); } } }
而后中server.properpose中添加server callback信息,就能够了
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.MyPlainServerCallbackHandler
对了,几得从新编译打包,替换掉kafka-client掉jar包,若是修改了一些全局信息(好比build.gradle引入新的依赖),那最好kafka全套jar包都换一下。
以上,若是以为有用,不妨点个赞吧~