Version: 1.9.6
Outline:
1. Find the entry point
2. Build the command-line framework
3. Create the server chain: CreateServerChain
4. Start the server: s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
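1. Command-line entry point
cmd/kube-apiserver/apiserver.go
The command-line parsing framework is cobra, which is worth getting familiar with first:
https://o-my-chenjian.com/2017/09/20/Using-Cobra-With-Golang/
Entry function: cmd/kube-apiserver/apiserver.go
The server is started through the command's Execute method.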
func main() {
	...
	command := app.NewAPIServerCommand()
	...
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
2. Build the command line
The cmd struct:
cmd := &cobra.Command{
	Use: "kube-apiserver",
	Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		verflag.PrintAndExitIfRequested()
		utilflag.PrintFlags(cmd.Flags())

		// set default options
		completedOptions, err := Complete(s)
		if err != nil {
			return err
		}

		// validate options
		if errs := completedOptions.Validate(); len(errs) != 0 {
			return utilerrors.NewAggregate(errs)
		}

		return Run(completedOptions, stopCh)
	},
}
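Every Kubernetes component uses cobra to manage and parse its command-line arguments. Under the main package there is an app package, and the app package is where the cobra command is actually built. The logic in main is therefore very simple: it mostly just calls the execute function.
app.NewAPIServerCommand(server.SetupSignalHandler()) returns a *cobra.Command. Calling command.Execute() eventually invokes the run function defined on the Command struct, which here is RunE.
In the code above, RunE means "run and return an error".
We can see that RunE returns Run(completedOptions, stopCh).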
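The RunE body follows a "fill in defaults, validate, then run" sequence. The sketch below illustrates that shape with the standard library only. The options type, the port field, its default value, and the runE helper are all hypothetical, and errors.Join (Go 1.20+) stands in for utilerrors.NewAggregate; this is not the kube-apiserver code.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// options is a hypothetical stand-in for the server run options.
type options struct {
	Port int
}

// complete fills in defaults, mirroring the role of Complete(s).
// The default value is only an illustration.
func complete(o options) options {
	if o.Port == 0 {
		o.Port = 6443
	}
	return o
}

// validate collects every problem instead of stopping at the first one.
func (o options) validate() []error {
	var errs []error
	if o.Port < 1 || o.Port > 65535 {
		errs = append(errs, fmt.Errorf("port %d is out of range", o.Port))
	}
	return errs
}

// runE mirrors the shape of the RunE callback: default, validate, then run.
func runE(o options, run func(options) error) error {
	completed := complete(o)
	if errs := completed.validate(); len(errs) != 0 {
		// errors.Join plays the role of utilerrors.NewAggregate here.
		return errors.Join(errs...)
	}
	return run(completed)
}

func main() {
	err := runE(options{}, func(o options) error {
		fmt.Println("serving on port", o.Port)
		return nil
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
```

Returning the error from runE, rather than exiting inside it, leaves the decision about the exit code to main, which is the same division of labor as between RunE and command.Execute().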
3. Create the server chain
4. Create the server
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
This creates a server. Calling PrepareRun() on it returns a preparedGenericAPIServer, and that value's Run() method is what finally gets called.
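The PrepareRun() then Run() sequence can be read as a "prepared type" pattern: Run exists only on the wrapper that PrepareRun returns, so it cannot be called on a server that has not been prepared. A minimal sketch under that reading, where the server type and its routesInstalled field are hypothetical:

```go
package main

import "fmt"

// server is a hypothetical stand-in for GenericAPIServer.
type server struct {
	routesInstalled bool
}

// preparedServer wraps a server whose setup is finished. Run is defined
// only on this type, so it cannot be called before PrepareRun.
type preparedServer struct {
	*server
}

// PrepareRun performs the last setup steps and returns the runnable wrapper.
func (s *server) PrepareRun() preparedServer {
	s.routesInstalled = true
	return preparedServer{s}
}

// Run reports whether setup had completed by the time it was called.
func (p preparedServer) Run() bool {
	return p.routesInstalled
}

func main() {
	s := &server{}
	fmt.Println(s.PrepareRun().Run())
}
```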
The GenericAPIServer struct:
// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
	...

	// admissionControl is used to build the RESTStorage that backs an API Group.
	admissionControl admission.Interface

	// "Outputs"
	// Handler holds the handlers being used by this API server
	Handler *APIServerHandler

	...

	// DiscoveryGroupManager serves /apis
	DiscoveryGroupManager discovery.GroupManager

	// Enable swagger and/or OpenAPI if these configs are non-nil.
	openAPIConfig *openapicommon.Config

	// PostStartHooks are each called after the server has started listening, in a separate go func for each
	// with no guarantee of ordering between them. The map key is a name used for error reporting.
	// It may kill the process with a panic if it wishes to by returning an error.
	postStartHookLock      sync.Mutex
	postStartHooks         map[string]postStartHookEntry
	postStartHooksCalled   bool
	disabledPostStartHooks sets.String

	preShutdownHookLock    sync.Mutex
	preShutdownHooks       map[string]preShutdownHookEntry
	preShutdownHooksCalled bool

	...

	// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
	HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
}
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	err := s.NonBlockingRun(stopCh)
	if err != nil {
		return err
	}

	<-stopCh

	err = s.RunPreShutdownHooks()
	if err != nil {
		return err
	}

	// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
	s.HandlerChainWaitGroup.Wait()

	return nil
}
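We can see that it in turn calls s.NonBlockingRun(). As the name suggests, this runs without blocking: inside, a new goroutine is started that ultimately runs the HTTP server. That server exposes the HTTP API that the other Kubernetes components call, which is the core mechanism for controlling a Kubernetes cluster. Execution then blocks at <-stopCh. Once this channel is closed, the block is released, the shutdown logic runs, and the function returns. s.NonBlockingRun() is passed stopCh for the same reason: to let the program shut down gracefully. stopCh is originally created in NewAPIServerCommand():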
stopCh := server.SetupSignalHandler()
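It is easy to see that this channel is tied to OS signals: when Ctrl+C or kill tells the program to shut down, the channel is closed. Every place that waits on <-stopCh then stops blocking and performs the cleanup needed for a graceful shutdown.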
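The control flow described above (start without blocking, block on the stop channel, run shutdown hooks, wait for outstanding work) can be sketched with the standard library. The run function, its hook parameter, and the single worker goroutine are hypothetical simplifications, and sync.WaitGroup stands in for the HandlerChainWaitGroup.

```go
package main

import (
	"fmt"
	"sync"
)

// run starts its work without blocking, waits for stopCh to be closed,
// then runs the pre-shutdown hook and waits for outstanding work.
func run(stopCh <-chan struct{}, preShutdownHook func() error) error {
	var inFlight sync.WaitGroup

	// Non-blocking start: the worker runs in its own goroutine and
	// watches the same stop channel.
	inFlight.Add(1)
	go func() {
		defer inFlight.Done()
		<-stopCh
	}()

	<-stopCh // block until shutdown is requested

	if err := preShutdownHook(); err != nil {
		return err
	}
	inFlight.Wait() // let outstanding work finish before returning
	return nil
}

func main() {
	stopCh := make(chan struct{})
	done := make(chan error, 1)
	go func() {
		done <- run(stopCh, func() error {
			fmt.Println("running pre-shutdown hook")
			return nil
		})
	}()

	close(stopCh) // stands in for Ctrl+C or kill
	fmt.Println("run returned:", <-done)
}
```

Closing the channel, rather than sending a value on it, is what lets any number of goroutines observe the same shutdown request.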
var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
func SetupSignalHandler() <-chan struct{} {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // second signal. Exit directly.
	}()

	return stop
}
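The close(onlyOneSignalHandler) line relies on a Go rule: closing an already closed channel panics. The sketch below isolates that guard so the behavior is easy to observe. The names onlyOnce, setup, and panics are hypothetical, and no signal handling is involved.

```go
package main

import "fmt"

// onlyOnce is closed by the first call to setup. Closing an already
// closed channel panics, so a second call fails loudly.
var onlyOnce = make(chan struct{})

// setup stands in for SetupSignalHandler and returns a fresh stop channel.
func setup() <-chan struct{} {
	close(onlyOnce) // panics when called twice
	return make(chan struct{})
}

// panics reports whether calling f panics.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

func main() {
	fmt.Println(panics(func() { setup() })) // first call
	fmt.Println(panics(func() { setup() })) // second call
}
```

The first call should report no panic and the second should report one, which is how the guard prevents two handlers from being registered for the same signals.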
Now let's look at how NonBlockingRun() is implemented:
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
	...

	// Use an internal stop channel to allow cleanup of the listeners on error.
	internalStopCh := make(chan struct{})

	if s.SecureServingInfo != nil && s.Handler != nil {
		if err := s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh); err != nil {
			close(internalStopCh)
			return err
		}
	}

	...

	return nil
}
We can see that it calls s.SecureServingInfo.Serve() to start the HTTP server. Let's go one level deeper:
// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
	if s.Listener == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig: &tls.Config{
			NameToCertificate: s.SNICerts,
			// Can't use SSLv3 because of POODLE and BEAST
			// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
			// Can't use TLSv1.1 because of RC4 cipher usage
			MinVersion: tls.VersionTLS12,
			// enable HTTP2 for go's 1.7 HTTP Server
			NextProtos: []string{"h2", "http/1.1"},
		},
	}

	...

	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
This step creates an http.Server and then calls RunServer:
// RunServer listens on the given port if listener is not given,
// then spawns a go-routine continuously serving until the stopCh is closed.
// It returns a stoppedCh that is closed when all non-hijacked active requests
// have been processed.
// This function does not block
// TODO: make private when insecure serving is gone from the kube-apiserver
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, error) {
	if ln == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()

		var listener net.Listener
		listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return stoppedCh, nil
}
Finally, we can see that the second new goroutine calls server.Serve(listener) to start the HTTP server. When startup succeeds, the goroutine stays blocked in that call.
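The two-goroutine structure of RunServer can be tried out with net/http alone. The sketch below is a simplified illustration, not the Kubernetes implementation: it serves plain HTTP on a loopback port, leaves out TLS and the keep-alive listener, and the runServer and demo names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// runServer serves on ln in a goroutine and returns immediately. The returned
// channel is closed once the graceful shutdown triggered by stopCh has finished.
func runServer(srv *http.Server, ln net.Listener, timeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	stoppedCh := make(chan struct{})

	// Shutdown goroutine: wait for the stop request, then drain connections.
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		srv.Shutdown(ctx)
	}()

	// Serving goroutine: Serve blocks until Shutdown makes it return.
	go func() {
		if err := srv.Serve(ln); err != http.ErrServerClosed {
			fmt.Println("server stopped unexpectedly:", err)
		}
	}()

	return stoppedCh
}

// demo starts a server on a free loopback port, makes one request,
// stops the server, and returns the response body.
func demo() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	})}
	stopCh := make(chan struct{})
	stoppedCh := runServer(srv, ln, 5*time.Second, stopCh)

	resp, err := http.Get("http://" + ln.Addr().String())
	if err != nil {
		close(stopCh)
		<-stoppedCh
		return "", err
	}
	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()

	close(stopCh) // request shutdown
	<-stoppedCh   // wait until shutdown has completed
	return string(body), err
}

func main() {
	body, err := demo()
	fmt.Println(body, err)
}
```

Creating the listener before calling runServer means a failure to bind is reported synchronously, while the serving loop itself never blocks the caller.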
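At this point we have a first outline of the main path through the kube-apiserver source. Many details remain, and we will dig into them later. To keep the thread clear, it helps to set the details aside at first and look for the logical path we actually want to understand.
It is said that learning begins with imitation. Thanks to the original author: https://cloud.tencent.com/developer/article/1326541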