本节全部的代码基于最新的1.13.4版本。html
同Kubernetes全部的组件启动代码一致,apiserver启动使用的是cobra
的命令行方式 node
Run
方法。
Run
方法比较简单
Server端的建立集中在CreateServerChain
方法。方法代码以下:git
// CreateServerChain creates the apiservers connected via delegation.
// CreateServerChain建立经过委托链接的apiservers,建立一系列的server
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
// 1.建立kubeAPIServerConfig配置
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
// 2.判断是否配置了扩展API server,建立apiExtensionsConfig配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
// apiExtensionsServer,可扩展的API server
// 3.启动扩展的API server
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
// 4.启动最核心的kubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
// otherwise go down the normal path of standing the aggregator up in front of the API server
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
// This will wire up openapi for extension api server
apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
// 5.聚合层的配置aggregatorConfig
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6.aggregatorServer,聚合服务器,对全部的服务器访问的整合
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
// 7.启动非安全端口的server
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
// 8.返回GenericAPIServer,后续启动安全端口的server
return aggregatorServer.GenericAPIServer, nil
}
复制代码
建立过程主要有如下步骤:
一、根据配置构造apiserver的配置,调用方法CreateKubeAPIServerConfig
;
二、根据配置构造扩展的apiserver的配置,调用方法为createAPIExtensionsConfig
;
三、建立server,包括扩展的apiserver和原生的apiserver,调用方法为createAPIExtensionsServer
和CreateKubeAPIServer
。主要就是将各个handler的路由方法注册到Container中去,彻底遵循go-restful的设计模式,即将处理方法注册到Route中去,同一个根路径下的Route注册到WebService中去,WebService注册到Container中,Container负责分发。访问的过程为Container-->WebService-->Route。更加详细的go-restful使用能够参考其代码;
四、聚合server的配置和和建立。主要就是将原生的apiserver和扩展的apiserver的访问进行整合,添加后续的一些处理接口。调用方法为createAggregatorConfig
和createAggregatorServer
;
五、建立完成,返回配置的server信息。
以上几个步骤,最核心的就是apiserver如何建立,即如何按照go-restful
的模式,添加路由和相应的处理方法,以CreateKubeAPIServer
方法为例,createAPIExtensionsServer
相似。github
CreateKubeAPIServer
方法以下web
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
return kubeAPIServer, nil
}
复制代码
经过Complete
方法完成配置的最终合法化,New
方法生成kubeAPIServer的配置,进入New
方法,数据库
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
// 经过给定的配置,返回一个新的Master实例。对于部分未配置的选项,可使用默认配置;可是对于KubeletClientConfig这样的配置,必须手动指定
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
// 1.初始化,建立go-restful的Container,初始化apiServerHandler
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
m := &Master{
GenericAPIServer: s,
}
// install legacy rest storage
// /api开头的版本api注册到Container中去,如Pod、Namespace等资源
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
}
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
// /apis开头版本的api注册到Container中
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.RESTStorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
return m, nil
}
复制代码
包含如下步骤:
一、按照go-restful
的模式,调用c.GenericConfig.New
方法初始化化Container,即gorestfulContainer
,初始方法为NewAPIServerHandler
。初始化以后,添加路由。bootstrap
func installAPI(s *GenericAPIServer, c *Config) {
// 添加"/"与"/index.html"路由
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
// 添加"/swagger-ui/"路由
if c.SwaggerConfig != nil && c.EnableSwaggerUI {
routes.SwaggerUI{}.Install(s.Handler.NonGoRestfulMux)
}
// 添加"/debug"相关路由
if c.EnableProfiling {
routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
// so far, only logging related endpoints are considered valid to add for these debug flags.
routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
// 添加"/metrics"路由
if c.EnableMetrics {
if c.EnableProfiling {
routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
} else {
routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
}
}
// 添加"/version"路由
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
if c.EnableDiscovery {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
}
复制代码
该方法中添加了包括/、/swagger-ui、/debug/*、/metrics、/version几条路由,经过访问apiserver便可看到相关的信息 后端
api开头的路由经过InstallLegacyAPI
方法添加。进入InstallLegacyAPI
方法,以下:设计模式
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
klog.Fatalf("Error building core storage: %v", err)
}
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
klog.Fatalf("Error in registering group versions: %v", err)
}
}
复制代码
经过NewLegacyRESTStorage
方法建立各个资源的RESTStorage。RESTStorage是一个结构体,具体的定义在vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
下,结构体内主要包含NewFunc
返回特定资源信息、NewListFunc
返回特定资源列表、CreateStrategy
特定资源建立时的策略、UpdateStrategy
更新时的策略以及DeleteStrategy
删除时的策略等重要方法。
在NewLegacyRESTStorage
内部,能够看到建立了多种资源的RESTStorage api
NewREST
方法构造相应的资源。待全部资源的store建立完成以后,使用
restStorageMap
的Map类型将每一个资源的路由和对应的store对应起来,方便后续去作路由的统一规划,代码以下:
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
if serviceAccountStorage.Token != nil {
restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
复制代码
最终完成以api开头的全部资源的RESTStorage操做。
建立完以后,则开始进行路由的安装,执行InstallLegacyAPIGroup
方法,主要调用链为InstallLegacyAPIGroup-->installAPIResources-->InstallREST-->Install-->registerResourceHandlers
,最终核心的路由构造在registerResourceHandlers
方法内。这是一个很是复杂的方法,整个方法的代码在700行左右。方法的主要功能是经过上一步骤构造的RESTStorage判断该资源能够执行哪些操做(如create、update等),将其对应的操做存入到action,每个action对应一个标准的rest操做,如create对应的action操做为POST、update对应的action操做为PUT。最终根据actions数组依次遍历,对每个操做添加一个handler方法,注册到route中去,route注册到webservice中去,完美匹配go-restful的设计模式。
api开头的路由主要是对基础资源的路由实现,而对于其余附加的资源,如认证相关、网络相关等各类扩展的api资源,统一以apis开头命名,实现入口为InstallAPIs
。
InstallAPIs
与InstallLegacyAPIGroup
主要的区别是获取RESTStorage的方式。对于api开头的路由来讲,都是/api/v1这种统一的格式;而对于apis开头路由则不同,它包含了多种不一样的格式(Kubernetes代码内叫groupName),如/apis/apps、/apis/certificates.k8s.io等各类无规律的groupName。为此,kubernetes提供了一种RESTStorageProvider
的工厂模式的接口
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
}
复制代码
全部以apis开头的路由的资源都须要实现该接口。GroupName()方法获取到的就是相似于/apis/apps、/apis/certificates.k8s.io这样的groupName,NewRESTStorage方法获取到的是相对应的RESTStorage封装后的信息。各类资源的NewRESTStorage接口实现如图:
经过CreateServerChain
建立完server后,继续调用GenericAPIServer
的Run方法完成最终的启动工做。首先经过PrepareRun
方法完成启动前的路由收尾工做,该方法主要完成了Swagger
和OpenAPI
路由的注册工做(Swagger
和OpenAPI
主要包含了Kubernetes API的全部细节与规范),并完成/healthz路由的注册工做。完成后,开始最终的server启动工做。
Run
方法里经过NonBlockingRun
方法启动安全的http server(非安全方式的启动在CreateServerChain
方法已经完成)
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// Run方法会建立一个安全的http server。只有在stopCh关闭或最初没法监听安全端口时返回
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun建立一个安全的http server
err := s.NonBlockingRun(stopCh)
if err != nil {
return err
}
<-stopCh
// 接收到stopCh以后的处理动做
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
return nil
}
复制代码
启动主要工做包括配置各类证书认证、时间参数、报文大小参数之类,以后经过调用net/http
库的启动方式启动,代码比较简洁,不一一列出了。
ApiServer中与权限相关的主要有三种机制,即经常使用的认证、鉴权和准入控制。对apiserver来讲,主要提供的就是rest风格的接口,因此各类权限最终仍是集中到对接口的权限判断上。
以最核心的kubeAPIServerConfig
举例,在CreateServerChain
方法中,调用了CreateKubeAPIServerConfig
的方法,该方法主要的做用是建立kubeAPIServer的配置。进入该方法,调用了buildGenericConfig
建立一些通用的配置,在NewConfig
下,返回了DefaultBuildHandlerChain
,该方法主要就是用来对apiserver rest接口的链式判断,即俗称的filter操做,先记录下,后续分析。
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
复制代码
配置文件建立完成后,再进行建立工做,进入到CreateKubeAPIServer
方法,在初始化go-restful的Container的方法内,能够看到
handlerChainBuilder
方法就是对返回的
DefaultBuildHandlerChain
方法的一种封装,并做为参数传入到
NewAPIServerHandler
方法内。进入
NewAPIServerHandler
方法,以下:
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
复制代码
配置中经过将director
做为参数传到handlerChainBuilder
的回调方法内,完成对gorestfulContainer的handler的注册工做。其实director就是一个实现了http.Handler的变量。因此,整个的处理逻辑就是将类型为http.Handler的director做为参数,传递到链式filter
的DefaultBuildHandlerChain
方法内。经过DefaultBuildHandlerChain
对每个步骤的filter
操做,完成权限控制等之类的操做。如何经过net/http
包实现filter
的功能,能够参考这篇文章。完成相似于filter
的功能以后,后续就是作启动工做,包括证书验证、TLS认证之类的工做,不作过多赘述。主要看下filter
的DefaultBuildHandlerChain
方法是如何处理接口的鉴权操做。
Kubernetes中比较重要的用的比较多的可能就是RBAC了。在DefaultBuildHandlerChain
方法内,经过调用genericapifilters.WithAuthorization
方法,实现对每一个接口的权限的filter
操做。WithAuthorization
方法以下
func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
if a == nil {
klog.Warningf("Authorization is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ae := request.AuditEventFrom(ctx)
attributes, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
authorized, reason, err := a.Authorize(attributes)
// an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
if authorized == authorizer.DecisionAllow {
audit.LogAnnotation(ae, decisionAnnotationKey, decisionAllow)
audit.LogAnnotation(ae, reasonAnnotationKey, reason)
handler.ServeHTTP(w, req)
return
}
if err != nil {
audit.LogAnnotation(ae, reasonAnnotationKey, reasonError)
responsewriters.InternalError(w, req, err)
return
}
klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
audit.LogAnnotation(ae, decisionAnnotationKey, decisionForbid)
audit.LogAnnotation(ae, reasonAnnotationKey, reason)
responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
})
}
复制代码
一、调用GetAuthorizerAttributes
方法获取配置的各类属性值;
二、调用Authorize
方法判断权限是否经过,不一样的权限实现其接口,完成鉴权任务;
handler.ServeHTTP
方法继续下一步的
filter
操做;不然,直接返回错误信息。
Authorize
方法最终调用
VisitRulesFor
方法实现权限的判断,方法在
kubernetes/pkg/registry/rbac/validation/rule.go
文件内。
VisitRulesFor
主要代码以下
func (r *DefaultRuleResolver) VisitRulesFor(user user.Info, namespace string, visitor func(source fmt.Stringer, rule *rbacv1.PolicyRule, err error) bool) {
if clusterRoleBindings, err := r.clusterRoleBindingLister.ListClusterRoleBindings(); err != nil {
if !visitor(nil, nil, err) {
return
}
} else {
sourceDescriber := &clusterRoleBindingDescriber{}
for _, clusterRoleBinding := range clusterRoleBindings {
subjectIndex, applies := appliesTo(user, clusterRoleBinding.Subjects, "")
if !applies {
continue
}
rules, err := r.GetRoleReferenceRules(clusterRoleBinding.RoleRef, "")
if err != nil {
if !visitor(nil, nil, err) {
return
}
continue
}
sourceDescriber.binding = clusterRoleBinding
sourceDescriber.subject = &clusterRoleBinding.Subjects[subjectIndex]
for i := range rules {
if !visitor(sourceDescriber, &rules[i], nil) {
return
}
}
}
}
if len(namespace) > 0 {
if roleBindings, err := r.roleBindingLister.ListRoleBindings(namespace); err != nil {
if !visitor(nil, nil, err) {
return
}
} else {
sourceDescriber := &roleBindingDescriber{}
for _, roleBinding := range roleBindings {
subjectIndex, applies := appliesTo(user, roleBinding.Subjects, namespace)
if !applies {
continue
}
rules, err := r.GetRoleReferenceRules(roleBinding.RoleRef, namespace)
if err != nil {
if !visitor(nil, nil, err) {
return
}
continue
}
sourceDescriber.binding = roleBinding
sourceDescriber.subject = &roleBinding.Subjects[subjectIndex]
for i := range rules {
if !visitor(sourceDescriber, &rules[i], nil) {
return
}
}
}
}
}
}
复制代码
主要工做就是对clusterRoleBinding
以及roleBinding
与配置的资源进行判断,比较清晰明了,这与咱们使用RBAC的思路基本一致。
ApiServer与数据库的交互主要指的是与etcd的交互。Kubernetes全部的组件不直接与etcd交互,都是经过请求apiserver,apiserver与etcd进行交互完成数据的最终落盘。
在以前的路由实现已经说过,apiserver最终实现的handler对应的后端数据是以Store的结构保存的。这里以api开头的路由举例,在NewLegacyRESTStorage
方法中,经过NewREST
或者NewStorage
会生成各类资源对应的Storage,以endpoints
为例,生成的方法以下
func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
DefaultQualifiedResource: api.Resource("endpoints"),
CreateStrategy: endpoint.Strategy,
UpdateStrategy: endpoint.Strategy,
DeleteStrategy: endpoint.Strategy,
TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}
return &REST{store}
}
复制代码
主要看CompleteWithOptions
方法,在CompleteWithOptions
方法内,调用了RESTOptions的GetRESTOptions
方法,依次调用StorageWithCacher-->NewRawStorage-->Create
方法建立最终依赖的后端存储:
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
复制代码
能够看到,经过Create
方法判断是建立etcd2或是etcd3的后端etcd版本,目前版本默认的是etcd3。
建立完成对应的存储以后,接下来要作的工做就是将对应的handler方法和最终的后台存储实现绑定起来(handler方法处理最终的数据须要落盘)。
还记着以前说的有个比较长的方法registerResourceHandlers
,用来处理具体的handler路由。再次回到该方法,
POST
方法为例,对应的是Create操做。
handler
参数的调用最终都会走到
createHandler
方法处,位于
kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/crete.go
下。最核心的步骤即调用了Create方法
kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go
下的
Create
方法。该方法主要包含
BeforeCreate
、
Storage.Create
、
AfterCreate
以及
Decorator
等主要方法。对应于
POST
操做,则最主要的方法为
Storage.Create
。因为目前使用基本都是etcd3,因此实现的方法以下
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
复制代码
主要操做为:
一、调用Encode
方法序列化;
二、调用path.Join
解析Key;
三、调用TransformToStorage
将数据类型进行转换;
四、调用客户端方法进行etcd的写入操做。
至此,完成handler处理与对应的etcd数据库操做的绑定,即完成整个路由后端的操做步骤。
对etcd操做更具体的能够参考这篇文章。
以上均为我的学习总结,若是错误欢迎指正!