Pulsar自带的Authentication认证方式有不少种:TLS/Basic/JWT Token/Athenz/Sasl,可是均存在安全性或复杂性的一些问题,且有时候咱们须要和已有的帐户系统作集成,以保持一致的产品体验,此时须要自行开发认证插件。这里介绍一个使用签名机制加强安全性的认证插件开发方案。数据库
Pulsar认证提供了比较好的扩展机制,经过实现几个预约义的接口类,就能够方便的插入本身开发的认证明现。这些接口主要包括如下4个:apache
#客户端包装认证参数 org.apache.pulsar.client.api.Authentication org.apache.pulsar.client.api.AuthenticationDataProvider #服务端验证认证参数 org.apache.pulsar.broker.authentication.AuthenticationProvider org.apache.pulsar.broker.authentication.AuthenticationState
此外默认的多种认证方式的代码能够提供很是丰富的认证明现参考。api
假设咱们已经有了一个帐号系统,里面存储有帐号名(accessKey)、加盐hash过的密码(accessSecret)等认证须要的信息;咱们须要再开发一个专用的接口,供咱们实现的Pulsar认证插件调用。为避免认证接口被他人滥用,经过Header中的Auth参数作简单比对校验。安全
{ "uid":"123456", "userRole": "ur-123456", #命名规则能够自行以为定 }
生成参数ide
// check auth params survival time long currentTimeSeconds = System.currentTimeMillis() / 1000; long authTimeSeconds = Long.parseLong(authParams.getTimeStr()); if ((authTimeSeconds + this.akSignatureSurvivalSeconds) < currentTimeSeconds) { throw new AuthenticationException("auth out of survival time"); }
TokenAuthenticationState.isExpired 判断是否过时ui
服务端一般在Broker和Proxy的配置文件中配置this
# 配置Broker的认证插件和参数 authenticationEnabled=true authenticationProviders=net.tabalt.pulsar.broker.authentication.AuthenticationProviderTabaltAK tabaltAKServerUrl=http://127.0.0.1/ak #AK服务地址 tabaltAKServerAuth=test-auth #AK服务接口认证Token,经过header传递给认证服务 tabaltAKSignatureSurvivalSeconds=3600 #签名存活秒数 # 配置Broker做为客户端请求其余Broker的认证插件和参数,此处须要配置一个超级帐号 brokerClientAuthenticationPlugin=net.tabalt.pulsar.client.auth.AuthenticationTabaltAK brokerClientAuthenticationParameters={"accessKey":"test_access_key","accessSecret":"test_access_secret"}
AuthenticationTabaltAK authAK = new AuthenticationTabaltAK("test_access_key", "test_access_secret"); PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .authentication(authAK).build(); Producer<byte[]> producer = client.newProducer().topic("my-topic").create(); ...