有关RocketMQ ACL的使用请查看上一篇《RocketMQ ACL使用指南》,本文从源码的角度,分析一下RocketMQ ACL的实现原理。java
> 备注:RocketMQ在4.4.0时引入了ACL机制,本文代码基于RocketMQ4.5.0版本。算法
根据RocketMQ ACL使用手册,咱们应该首先看一下Broker服务器在开启ACL机制时如何加载配置文件,并如何工做的。apache
Broker端ACL的入口代码为:BrokerController#initialAcl数组
private void initialAcl() { if (!this.brokerConfig.isAclEnable()) { // [@1](https://my.oschina.net/u/1198) log.info("The broker dose not enable acl"); return; } List<accessvalidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); // @2 if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { // [@3](https://my.oschina.net/u/2648711) final AccessValidator validator = accessValidator; this.registerServerRPCHook(new RPCHook() { [@Override](https://my.oschina.net/u/1162528) public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); // @4 } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
本方法的实现共4个关键点。 代码@1:首先判断Broker是否开启了acl,经过配置参数aclEnable指定,默认为false。服务器
代码@2:使用相似SPI机制,加载配置的AccessValidator,该方法返回一个列表,其实现逻辑时读取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的访问验证器,默认配置内容以下: session
代码@3:遍历配置的访问验证器(AccessValidator),并向Broker处理服务器注册钩子函数,RPCHook的doBeforeRequest方法会在服务端接收到请求,将其请求解码后,执行处理请求以前被调用;RPCHook的doAfterResponse方法会在处理完请求后,将结果返回以前被调用,其调用如图所示:ide
代码@4:在RPCHook#doBeforeRequest方法中调用AccessValidator#validate, 在真实处理命令以前,先执行ACL的验证逻辑,若是拥有该操做的执行权限,则放行,不然抛出AclException。函数
接下来,咱们将重点放到Broker默认实现的访问验证器:PlainAccessValidator。源码分析
接下来咱们重点看一下PlainAccessValidator的parse方法与validate方法的实现细节。在讲解该方法以前,咱们首先认识一下RocketMQ封装访问资源的PlainAccessResource。ui
咱们对其属性一一作个介绍:
public PlainAccessValidator() { aclPlugEngine = new PlainPermissionLoader(); }
构造函数,直接建立PlainPermissionLoader对象,从命名上来看,应该是触发acl规则的加载,即解析plain_acl.yml,接下来会重点探讨,即acl启动流程之配置文件的解析。
该方法的做用就是从请求命令中解析出本次访问所须要的访问权限,最终构建AccessResource对象,为后续的校验权限作准备。
PlainAccessResource accessResource = new PlainAccessResource(); if (remoteAddr != null && remoteAddr.contains(":")) { accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]); } else { accessResource.setWhiteRemoteAddress(remoteAddr); }
Step1:首先建立PlainAccessResource,从远程地址中提取出远程访问IP地址。
if (request.getExtFields() == null) { throw new AclException("request's extFields value is null"); } accessResource.setRequestCode(request.getCode()); accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
Step2:若是请求头中的扩展字段为空,则抛出异常,若是不为空,则从请求头中读取requestCode、accessKey(请求用户名)、签名字符串(signature)、secretToken。
try { switch (request.getCode()) { case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; } } catch (Throwable t) { throw new AclException(t.getMessage(), t); }
Step3:根据请求命令,设置本次请求须要拥有的权限,上述代码比较简单,就是从请求中得出本次操做的Topic、消息组名称,为了方便区分topic与消费组,消费组使用消费者对应的重试主题,当成资源的Key,从这里也能够看出,当前版本须要进行ACL权限验证的请求命令以下:
// Content SortedMap<string, string> map = new TreeMap<string, string>(); for (Map.Entry<string, string> entry : request.getExtFields().entrySet()) { if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { map.put(entry.getKey(), entry.getValue()); } } accessResource.setContent(AclUtils.combineRequestContent(request, map)); return accessResource;
Step4:对扩展字段进行排序,便于生成签名字符串,而后将扩展字段与请求体(body)写入content字段。完成从请求头中解析出本次请求须要验证的权限。
public void validate(AccessResource accessResource) { aclPlugEngine.validate((PlainAccessResource) accessResource); }
验证权限,即根据本次请求须要的权限与当前用户所拥有的权限进行对比,若是符合,则正常执行;不然抛出AclException。
为了揭开配置文件的解析与验证,咱们将目光投入到PlainPermissionLoader。
该类的主要职责:加载权限,即解析acl主要配置文件plain_acl.yml。
下面对其核心属性与核心方法一一介绍:
public PlainPermissionLoader() { load(); watch(); }
在构造方法中调用load与watch方法。
Map<string, plainaccessresource> plainAccessResourceMap = new HashMap<>(); List<remoteaddressstrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); String path = fileHome + File.separator + fileName; JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);
Step1:初始化plainAccessResourceMap(用户配置的访问资源,即权限容器)、globalWhiteRemoteAddressStrategy:全局IP白名单访问策略。配置文件,默认为${ROCKETMQ_HOME}/conf/plain_acl.yml。
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } }
Step2:globalWhiteRemoteAddresses:全局白名单,类型为数组。根据配置的规则,使用remoteAddressStrategyFactory获取一个访问策略,下文会重点介绍其配置规则。
JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { List<plainaccessconfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap;
Step3:解析plain_acl.yml文件中的另一个根元素accounts,用户定义的权限信息。从PlainAccessConfig的定义来看,accounts标签下支持以下标签:
load方法主要完成acl配置文件的解析,将用户定义的权限加载到内存中。
private void watch() { try { String watchFilePath = fileHome + fileName; FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() { @Override public void onChanged(String path) { log.info("The plain acl yml changed, reload the context"); load(); } }); fileWatchService.start(); log.info("Succeed to start AclWatcherService"); this.isWatchStart = true; } catch (Exception e) { log.error("Failed to start AclWatcherService", e); } }
监听器,默认以500ms的频率判断文件的内容是否变化。在文件内容发生变化后调用load()方法,从新加载配置文件。那FileWatchService是如何判断两个文件的内容发生了变化呢?
FileWatchService#hash private String hash(String filePath) throws IOException, NoSuchAlgorithmException { Path path = Paths.get(filePath); md.update(Files.readAllBytes(path)); byte[] hash = md.digest(); return UtilAll.bytes2string(hash); }
获取文件md5签名来作对比,这里为何不在启动时先记录上一次文件的修改时间,而后先判断其修改时间是否变化,再判断其内容是否真正发生变化。
// Check the global white remote addr for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { if (remoteAddressStrategy.match(plainAccessResource)) { return; } }
Step1:首先使用全局白名单对资源进行验证,只要一个规则匹配,则返回,表示认证成功。
if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } Step2:若是请求信息中,没有设置用户名,则抛出未配置AccessKey异常;若是Broker中并为配置该用户的配置信息,则抛出AclException。 // Check the white addr for accesskey PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; }
Step3:若是用户配置的白名单与待访问资源规则匹配的话,则直接发认证经过。
// Check the signature String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); }
Step4:验证签名。
checkPerm(plainAccessResource, ownedAccess);
Step5:调用checkPerm方法,验证须要的权限与拥有的权限是否匹配。
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); }
Step6:若是当前的请求命令属于必须是Admin用户才能访问的权限,而且当前用户并非管理员角色,则抛出异常,以下命令须要admin角色才能进行的操做:
Map<string, byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<string, byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } if (ownedPermMap == null && ownedAccess.isAdmin()) { // If the ownedPermMap is null and it is an admin user, then return return; }
Step7:若是该请求不须要进行权限验证,则经过认证,若是当前用户的角色是管理员,而且没有配置用户权限,则认证经过,返回。
for (Map.Entry<string, byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); boolean isGroup = PlainAccessResource.isRetryTopic(resource); if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) { // Check the default perm byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm(); if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } }
Step8:遍历须要权限与拥有的权限进行对比,若是配置对应的权限,则判断是否匹配;若是未配置权限,则判断默认权限时是否容许,不容许,则抛出AclException。
验证逻辑就介绍到这里了,下面给出其匹配流程图:
上述阐述了从Broker服务器启动、加载acl配置文件流程、动态监听配置文件、服务端权限验证流程,接下来咱们看一下客户端关于ACL须要处理的事情。
回顾一下,咱们引入ACL机制后,客户端的代码示例以下:
其在建立DefaultMQProducer时,注册AclClientRPCHook钩子,会在向服务端发送远程命令先后执行其钩子函数,接下来咱们重点分析一下AclClientRPCHook。
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); // @1 String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); // @2 request.addExtField(SIGNATURE, signature); // @3 request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } }
代码@1:将Request请求参数进行排序,并加入accessKey。
代码@2:对排好序的请参数,使用用户配置的密码生成签名,并最近到扩展字段Signature,而后服务端也会按照相同的算法生成Signature,若是相同,则表示签名验证成功(相似于实现登陆的效果)。
代码@3:将Signature、AccessKey等加入到请求头的扩展字段中,服务端拿到这些元数据,结合请求头中的信息,根据配置的权限,进行权限校验。
关于ACL客户端生成签名是一种通用套路,就不在细讲了。
源码分析ACL的实现就介绍到这里了,下文将介绍RocketMQ 消息轨迹的使用与实现原理分析。若是你们以为文章写的还不错的话,期待帮忙点赞,谢谢。
> 做者简介:《RocketMQ技术内幕》做者,RocketMQ 社区布道师,维护公众号:中间件兴趣圈,可扫描以下二维码与做者进行互动。
</string,></string,></string,></plainaccessconfig></remoteaddressstrategy></string,></string,></string,></string,></string,></string,></accessvalidator>