Acl表示对一个资源的访问权限。它由Resource和Acl组成。java
Resource表示一个具体的资源。node
Acl表示权限,由主体principal,是否容许permissionType,主机host,操做operation组成。json
// ResourceType表示资源类型,name则表示资源标识符 case class Resource(resourceType: ResourceType, name: String) { override def toString: String = { resourceType.name + Resource.Separator + name } }
以一个名为test的Topic为例,用Resource表示这个资源session
new Resource(ResourceType.Topic, "test")
object ResourceType { def fromString(resourceType: String): ResourceType = { // 从values找到name相等的type val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType)) rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } // 取值序列 def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId) def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", "")) }
ResourceType只有四种内置的类型数据结构
case object Topic extends ResourceType { val name = "Topic" val error = Errors.TOPIC_AUTHORIZATION_FAILED val toJava = JResourceType.TOPIC } case object Group extends ResourceType { val name = "Group" val error = Errors.GROUP_AUTHORIZATION_FAILED val toJava = JResourceType.GROUP } case object Cluster extends ResourceType { val name = "Cluster" val error = Errors.CLUSTER_AUTHORIZATION_FAILED val toJava = JResourceType.CLUSTER } case object TransactionalId extends ResourceType { val name = "TransactionalId" val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED val toJava = JResourceType.TRANSACTIONAL_ID }
object Operation { def fromString(operation: String): Operation = { // 从values找到name相等的值 val op = values.find(op => op.name.equalsIgnoreCase(operation)) op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(","))) } def fromJava(operation: AclOperation): Operation = fromString(operation.toString.replaceAll("_", "")) // 取值集合 def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All) }
Opearation只有下面几种内置的类型ide
// 读操做 case object Read extends Operation { val name = "Read" val toJava = AclOperation.READ } // 写操做 case object Write extends Operation { val name = "Write" val toJava = AclOperation.WRITE } // 新建操做 case object Create extends Operation { val name = "Create" val toJava = AclOperation.CREATE } // 删除操做 case object Delete extends Operation { val name = "Delete" val toJava = AclOperation.DELETE } // 修改操做 case object Alter extends Operation { val name = "Alter" val toJava = AclOperation.ALTER } // 描述操做 case object Describe extends Operation { val name = "Describe" val toJava = AclOperation.DESCRIBE } // 集群操做 case object ClusterAction extends Operation { val name = "ClusterAction" val toJava = AclOperation.CLUSTER_ACTION } // 描述配置操做 case object DescribeConfigs extends Operation { val name = "DescribeConfigs" val toJava = AclOperation.DESCRIBE_CONFIGS } // 修改配置操做 case object AlterConfigs extends Operation { val name = "AlterConfigs" val toJava = AclOperation.ALTER_CONFIGS } // case object IdempotentWrite extends Operation { val name = "IdempotentWrite" val toJava = AclOperation.IDEMPOTENT_WRITE } // 表示全部的操做 case object All extends Operation { val name = "All" val toJava = AclOperation.ALL }
object PermissionType { def fromString(permissionType: String): PermissionType = { val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType)) pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(","))) } // 从values找到name相等的值 def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString) // 取值集合 def values: Seq[PermissionType] = List(Allow, Deny) }
内置的PermissionType,只有两种,Allow表示容许,Deny表示拒绝。函数
case object Allow extends PermissionType { val name = "Allow" val toJava = AclPermissionType.ALLOW } case object Deny extends PermissionType { val name = "Deny" val toJava = AclPermissionType.DENY }
KafkaPrincipal默认是以User类型,来区分的。ui
public class KafkaPrincipal implements Principal { public static final String SEPARATOR = ":"; public static final String USER_TYPE = "User"; public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS"); // 主体类型 private String principalType; // 标识符 private String name; public KafkaPrincipal(String principalType, String name) { if (principalType == null || name == null) { throw new IllegalArgumentException("principalType and name can not be null"); } this.principalType = principalType; this.name = name; } public static KafkaPrincipal fromString(String str) { if (str == null || str.isEmpty()) { throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); } // 以:字符切割 String[] split = str.split(SEPARATOR, 2); if (split == null || split.length != 2) { throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); } return new KafkaPrincipal(split[0], split[1]); public String toString() { return principalType + SEPARATOR + name; } }
case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) { // 转为map类型。后面会再转为json类型,存到zookeeper的节点中 def toMap(): Map[String, Any] = { Map(Acl.PrincipalKey -> principal.toString, Acl.PermissionTypeKey -> permissionType.name, Acl.OperationKey -> operation.name, Acl.HostsKey -> host) } } object Acl { val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*") val WildCardHost: String = "*" val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All) val PrincipalKey = "principal" val PermissionTypeKey = "permissionType" val OperationKey = "operation" val HostsKey = "host" val VersionKey = "version" val CurrentVersion = 1 val AclsKey = "acls" /** aclJson数据存储在zookeeper中,它的格式以下 { "version": 1, "acls": [ { "host":"host1", "permissionType": "Deny", "operation": "Read", "principal": "User:alice" } ] } */ def fromJson(aclJson: String): Set[Acl] = { if (aclJson == null || aclJson.isEmpty) return collection.immutable.Set.empty[Acl] var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]() Json.parseFull(aclJson) match { case Some(m) => val aclMap = m.asInstanceOf[Map[String, Any]] //the acl json version. require(aclMap(VersionKey) == CurrentVersion) // 获取aclJson的acls的值 val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]] aclSet.foreach(item => { val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String]) val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String]) val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String]) val host: String = item(HostsKey).asInstanceOf[String] // 构建Acl,而且添加到acls里 acls += new Acl(principal, permissionType, host, operation) }) case None => } acls.toSet } }
实现了Authorizer接口,主要提供了Acl的管理this
class SimpleAclAuthorizer extends Authorizer with Logging { private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls] // 初始化配置 override def configure(javaConfigs: util.Map[String, _]) { val configs = javaConfigs.asScala val props = new java.util.Properties() configs.foreach { case (key, value) => props.put(key, value.toString) } // 从配置中获取super.users的值,这是一个字符串。 // 格式为User:user1;User:user2,用户之间用;隔开,一个用户是User:username的格式。 superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect { case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet }.getOrElse(Set.empty[KafkaPrincipal]) // 这个配置表示,当没有找到对应的Acl规则时,默认是否容许 shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) // 初始化zookeeper链接 val kafkaConfig = KafkaConfig.fromProps(props, doLog = false) val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect) val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs) val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs) zkUtils = ZkUtils(zkUrl, sessionTimeout = zkSessionTimeOutMs, connectionTimeout = zkConnectionTimeoutMs, kafkaConfig.zkEnableSecureAcls) // 保证Acl节点存在 zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath) // 从zookeeper中读取数据,初始化 loadCache() // 保证Acl节点存在aclCache zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) // 注册监听时间,当节点有变更时,会自行调用AclChangedNotificationHandler回调函数,更新aclCache aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) aclChangeListener.init() } private def loadCache() { inWriteLock(lock) { // zkUtils.getChildren 返回子节点列表,子节点的数据类型为String // 返回"/acls"节点的子节点 val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath) for (rType <- resourceTypes) { // 根据string实例化ResourceType val resourceType = ResourceType.fromString(rType) // 返回"/acls/resourceName"节点的子节点 val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name val resourceNames = zkUtils.getChildren(resourceTypePath) for (resourceName <- resourceNames) { // 根据type和name实例化Resource,而后从zookeeper中读取到对应的Acl列表 val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString)) // 更新aclCache updateCache(new Resource(resourceType, resourceName), versionedAcls) } } } } def toResourcePath(resource: Resource): String = { // 根据Resource找到zookeeper中对应的节点路径 SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name } private def getAclsFromZk(resource: Resource): VersionedAcls = { // 读取Resource对应节点的数据 val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource)) // 调用Acl.fromJson解析数据,返回VersionedAcls。VersionedAcls定义在下面 VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion) } // 更新aclCache private def updateCache(resource: Resource, versionedAcls: VersionedAcls) { if (versionedAcls.acls.nonEmpty) { aclCache.put(resource, versionedAcls) } else { aclCache.remove(resource) } } }
VersionedAcls的定义scala
object SimpleAclAuthorizer { // VersionedAcls只是Acl的列表和zkVersion的版本号 private case class VersionedAcls(acls: Set[Acl], zkVersion: Int) }
SimpleAclAuthorizer还有一个重要的方法authorize,用于检查权限
class SimpleAclAuthorizer extends Authorizer with Logging { override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { val principal = session.principal val host = session.clientAddress.getHostAddress // 获取resource对应的Acl列表和该resource的type的默认Acl列表 // WildCardResource表示匹配全部 val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource)) // 从上面resource找到全部的Acls中,查找是否有deny的acl val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls) val allowOps = operation match { // Read, Write, Delete, Alter动做包含了Describe case Describe => Set[Operation](Describe, Read, Write, Delete, Alter) // AlterConfigs包含了DescribeConfigs case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs) // 其他的不修改 case _ => Set[Operation](operation) } // 遍历allowOps列表,查找是否有明确指定Allow的acl val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls)) // 有如下三种条件,知足其一,则认为有权限 // 是不是super user val authorized = isSuperUser(operation, resource, principal, host) || // 若是acls没有找到,查看默认配置 isEmptyAclAndAuthorized(operation, resource, principal, host, acls) || // 若是没有找到deny的acl,而且还有allow的acl (!denyMatch && allowMatch) logAuditMessage(principal, authorized, operation, resource, host) authorized } private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = { acls.find { acl => // // permissionType相等 acl.permissionType == permissionType && // principal相等,或者principal为WildCardPrincipal,表示匹配全部 (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) && // operation相等,或者operation为All,表示匹配全部 (operations == acl.operation || acl.operation == All) && // host相等,或者host为WildCardHost,表示匹配全部 (acl.host == host || acl.host == Acl.WildCardHost) }.exists { acl => authorizerLogger.debug(s"operation = $operations on resource = $resource from host = $host is $permissionType based on acl = $acl") // 找到后,则返回true。没有,则返回false true } } // 是否principal为super user def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = { // superUsers是否包含principal if (superUsers.contains(principal)) { authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.") true } else false } def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = { if (acls.isEmpty) { authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound") // 返回配置的值 shouldAllowEveryoneIfNoAclIsFound } else false } }
由于acl数据持久化到zookeeper中,因此当zookeeper中的数据发生改变时,应该还有监听的做用。这个是经过zookeeper的watch来实现的。
acl的更新涉及到zookeeper的两个地方。一个是Resource节点,存储acls。另外一个是持久顺序节点,它的子节点记录了每次Resource的更新。
aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) aclChangeListener.init() class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, private val seqNodeRoot: String, private val seqNodePrefix: String, private val notificationHandler: NotificationHandler, private val changeExpirationMs: Long = 15 * 60 * 1000, private val time: Time = Time.SYSTEM) extends Logging { private var lastExecutedChange = -1L private val isClosed = new AtomicBoolean(false) // 初始化 def init() { zkUtils.makeSurePersistentPathExists(seqNodeRoot) // 监听seqNodeRoot节点的子节点变化。 // seqNodeRoot就是上面所说的顺序节点 zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener) zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) processAllNotifications() } def processAllNotifications() { // 获取seqNodeRoot的子节点。子节点存储了resource val changes = zkUtils.zkClient.getChildren(seqNodeRoot) // 而且从小到大排序 processNotifications(changes.asScala.sorted) } private def processNotifications(notifications: Seq[String]) { if (notifications.nonEmpty) { info(s"Processing notification(s) to $seqNodeRoot") try { val now = time.milliseconds for (notification <- notifications) { // 获取当前节点的顺序号 val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { // 若是changeId比上次更新的id大,则表示这是新的纪录 val changeZnode = seqNodeRoot + "/" + notification // 读取当前节点的数据,表示Resource的字符串 val (data, _) = zkUtils.readDataMaybeNull(changeZnode) // 调用notificationHandler的processNotification方法 data.map(notificationHandler.processNotification(_)).getOrElse { logger.warn(s"read null data from $changeZnode when processing notification $notification") } } // 更新lastExecutedChange lastExecutedChange = changeId } purgeObsoleteNotifications(now, notifications) } catch { case e: ZkInterruptedException => if (!isClosed.get) throw e } } } // 由于它是顺序节点的子节点,因此名称后缀会有自增数字 // 相似于acl_changes_0000000001, acl_changes_0000000002 private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong // 监听子节点变化 object NodeChangeListener extends IZkChildListener { override def handleChildChange(path: String, notifications: java.util.List[String]) { try { import scala.collection.JavaConverters._ if (notifications != null) // 调用processNotifications方法 processNotifications(notifications.asScala.sorted) } catch { case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e) } } } }
上面processNotifications方法,调用了AclChangedNotificationHandler 的processNotification方法。
object AclChangedNotificationHandler extends NotificationHandler { override def processNotification(notificationMessage: String) { // 经过字符串,实例化Resource val resource: Resource = Resource.fromString(notificationMessage) inWriteLock(lock) { // 从zookeeper中读取该resource的acls val versionedAcls = getAclsFromZk(resource) // 更新aclCache updateCache(resource, versionedAcls) } }
本章先介绍了与acl相关的数据结构。Resource表明资源,Acl表示访问规则。
而后介绍了acl的管理,SimpleAclAuthorizer类。其中涉及到了zookeeper的数据持久化,aclCache的更新。二者之间的同步,经过了zookeeper的watch机制来实现。