对于常常和 Kubernetes
打交道的 YAML 工程师来讲,最经常使用的命令就是 kubectl exec
了,经过它能够直接在容器内执行命令来调试应用程序。若是你不知足于只是用用而已,想了解 kubectl exec
的工做原理,那么本文值得你仔细读一读。本文将经过参考 kubectl
、API Server
、Kubelet
和容器运行时接口(CRI)Docker API 中的相关代码来了解该命令是如何工做的。node
kubectl exec 的工做原理用一张图就能够表示:linux
先来看一个例子:nginx
🐳 → kubectl version --short Client Version: v1.15.0 Server Version: v1.15.3 🐳 → kubectl run nginx --image=nginx --port=80 --generator=run-pod/v1 pod/nginx created 🐳 → kubectl get po NAME READY STATUS RESTARTS AGE nginx 1/1 Running 0 6s 🐳 → kubectl exec nginx -- date Sat Jan 25 18:47:52 UTC 2020 🐳 → kubectl exec -it nginx -- /bin/bash root@nginx:/#
第一个 kubectl exec 在容器内执行了 date
命令,第二个 kubectl exec 使用 -i
和 -t
参数进入了容器的交互式 shell。git
重复第二个 kubectl exec 命令,打印更详细的日志:github
🐳 → kubectl -v=7 exec -it nginx -- /bin/bash I0125 10:51:55.434043 28053 loader.go:359] Config loaded from file: /home/isim/.kube/kind-config-linkerd I0125 10:51:55.438595 28053 round_trippers.go:416] GET https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx I0125 10:51:55.438607 28053 round_trippers.go:423] Request Headers: I0125 10:51:55.438611 28053 round_trippers.go:426] Accept: application/json, */* I0125 10:51:55.438615 28053 round_trippers.go:426] User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5 I0125 10:51:55.445942 28053 round_trippers.go:441] Response Status: 200 OK in 7 milliseconds I0125 10:51:55.451050 28053 round_trippers.go:416] POST https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx/exec?command=%2Fbin%2Fbash&container=nginx&stdin=true&stdout=true&tty=true I0125 10:51:55.451063 28053 round_trippers.go:423] Request Headers: I0125 10:51:55.451067 28053 round_trippers.go:426] X-Stream-Protocol-Version: v4.channel.k8s.io I0125 10:51:55.451090 28053 round_trippers.go:426] X-Stream-Protocol-Version: v3.channel.k8s.io I0125 10:51:55.451096 28053 round_trippers.go:426] X-Stream-Protocol-Version: v2.channel.k8s.io I0125 10:51:55.451100 28053 round_trippers.go:426] X-Stream-Protocol-Version: channel.k8s.ioI0125 10:51:55.451121 28053 round_trippers.go:426] User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5 I0125 10:51:55.465690 28053 round_trippers.go:441] Response Status: 101 Switching Protocols in 14 milliseconds root@nginx:/#
这里有两个重要的 HTTP 请求:docker
GET
请求用来获取 Pod 信息。exec
在容器内执行命令。子资源(subresource)隶属于某个 K8S 资源,表示为父资源下方的子路径,例如
/logs
、/status
、/scale
、/exec
等。其中每一个子资源支持的操做根据对象的不一样而改变。shell
最后 API Server 返回了 101 Ugrade
响应,向客户端表示已切换到 SPDY
协议。json
SPDY 容许在单个 TCP 链接上复用独立的 stdin/stdout/stderr/spdy-error 流。api
请求首先会到底 API Server,先来看看 API Server 是如何注册 rest.ExecRest
处理器来处理子资源请求 /exec
的。这个处理器用来肯定 exec
要进入的节点。缓存
API Server 启动过程当中作的第一件事就是指挥内嵌的 GenericAPIServer
加载早期的遗留 API(legacy API):
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) { // ... if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil { return nil, err } }
在 API 加载过程当中,会将类型 LegacyRESTStorage
实例化,建立一个 storage.PodStorage
实例:
podStorage, err := podstore.NewStorage( restOptionsGetter, nodeStorage.KubeletConnectionInfo, c.ProxyTransport, podDisruptionClient, ) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err }
随后 storeage.PodStorage
实例会被添加到 map restStorageMap
中。注意,该 map 将路径 pods/exec
映射到了 podStorage
的 rest.ExecRest
处理器。
restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, "pods/attach": podStorage.Attach, "pods/status": podStorage.Status, "pods/log": podStorage.Log, "pods/exec": podStorage.Exec, "pods/portforward": podStorage.PortForward, "pods/proxy": podStorage.Proxy, "pods/binding": podStorage.Binding, "bindings": podStorage.LegacyBinding,
podstorage
为 pod 和子资源提供了CURD
逻辑和策略的抽象。更多详细信息请查看内嵌的 genericregistry.Store
map restStorageMap
会成为实例 apiGroupInfo
的一部分,添加到 GenericAPIServer
中:
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil { return err } // Install the version handler. // Add a handler at /<apiPrefix> to enumerate the supported api versions. s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
其中 GoRestfulContainer.ServeMux 会将传入的请求 URL 映射到不一样的处理器。
接下来重点观察处理器 therest.ExecRest
的工做原理,它的 Connect()
方法会调用函数 pod.ExecLocation() 来肯定 pod 中容器的 exec
子资源的 URL
:
// Connect returns a handler for the pod exec proxy func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { execOpts, ok := opts.(*api.PodExecOptions) if !ok { return nil, fmt.Errorf("invalid options object: %#v", opts) } location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts) if err != nil { return nil, err } return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil }
函数 pod.ExecLocation()
返回的 URL 被 API Server 用来决定链接到哪一个节点。
下面接着分析节点上的 Kubelet
源码。
到了 Kubelet
这边,咱们须要关心两点:
exec
处理器的?Docker API
如何交互?Kubelet 的初始化过程很是复杂,主要涉及到两个函数:
dockershim
包来初始化 CRI
。当 Kubelet 启动时,它的 RunKubelet() 函数会调用私有函数 startKubelet()
来启动 kubelet.Kubelet
实例的 ListenAndServe()
方法,而后该方法会调用函数 ListenAndServeKubeletServer()
,使用构造函数 NewServer()
来安装 『debugging』处理器:
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests. func NewServer( // ... criHandler http.Handler) Server { // ... if enableDebuggingHandlers { server.InstallDebuggingHandlers(criHandler) if enableContentionProfiling { goruntime.SetBlockProfileRate(1) } } else { server.InstallDebuggingDisabledHandlers() } return server }
InstallDebuggingHandlers()
函数使用 getExec()
处理器来注册 HTTP 请求模式:
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) { // ... ws = new(restful.WebService) ws. Path("/exec") ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}"). To(s.getExec). Operation("getExec")) ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}"). To(s.getExec). Operation("getExec")) ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}"). To(s.getExec). Operation("getExec")) ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}"). To(s.getExec). Operation("getExec")) s.restfulCont.Add(ws)
其中 getExec()
处理器又会调用 s.host
实例中的 GetExec()
方法:
// getExec handles requests to run a command inside a container. func (s *Server) getExec(request *restful.Request, response *restful.Response) { // ... podFullName := kubecontainer.GetPodFullName(pod) url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) if err != nil { streaming.WriteError(err, response.ResponseWriter) return } // ... }
s.host
被实例化为 kubelet.Kubelet
类型的一个实例,它嵌套引用了 StreamingRuntime
接口,该接口又被实例化为 kubeGenericRuntimeManager
的实例,即运行时管理器。该运行时管理器是 Kubelet 与 Docker API
交互的关键组件,GetExec()
方法就是由它实现的:
// GetExec gets the endpoint the runtime will serve the exec request from. func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) { // ... resp, err := m.runtimeService.Exec(req) if err != nil { return nil, err } return url.Parse(resp.Url) }
GetExec()
又会调用 runtimeService.Exec()
方法,进一步挖掘你会发现 runtimeService
是 CRI 包中定义的接口。kuberuntime.kubeGenericRuntimeManager
的 runtimeService
被实例化为 kuberuntime.instrumentedRuntimeService
类型,由它来实现 runtimeService.Exec()
方法:
func (in instrumentedRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { const operation = "exec" defer recordOperation(operation, time.Now()) resp, err := in.service.Exec(req) recordError(operation, err) return resp, err }
instrumentedRuntimeService 实例的嵌套服务对象被实例化为 theremote.RemoteRuntimeService
类型的实例。该类型实现了 Exec()
方法:
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() resp, err := r.runtimeClient.Exec(ctx, req) if err != nil { klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err) return nil, err } if resp.Url == "" { errorMessage := "URL is not set" klog.Errorf("Exec failed: %s", errorMessage) return nil, errors.New(errorMessage) } return resp, nil }
Exec()
方法会向 /runtime.v1alpha2.RuntimeService/Exec
发起一个 gRPC
调用来让运行时端准备一个流式通讯的端点,该端点用于在容器中执行命令(关于如何将 Docker shim
设置为 gRPC 服务端的更多信息请参考下一小节)。
gRPC 服务端经过调用 RuntimeServiceServer.Exec()
方法来处理请求,该方法由 dockershim.dockerService
结构体实现:
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { if ds.streamingServer == nil { return nil, streaming.NewErrorStreamingDisabled("exec") } _, err := checkContainerStatus(ds.client, req.ContainerId) if err != nil { return nil, err } return ds.streamingServer.GetExec(req) }
第 10 行的 ThestreamingServer
是一个 streaming.Server 接口,它在构造函数 dockershim.NewDockerService()
中被实例化:
// create streaming server if configured. if streamingConfig != nil { var err error ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime) if err != nil { return nil, err } }
来看一下 GetExec()
方法的实现方式:
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { if err := validateExecRequest(req); err != nil { return nil, err } token, err := s.cache.Insert(req) if err != nil { return nil, err } return &runtimeapi.ExecResponse{ Url: s.buildURL("exec", token), }, nil }
能够看到这里只是向客户端返回一个简单的 token 组合成的 URL, 之因此生成一个 token 是由于用户的命令中可能包含各类各样的字符,各类长度的字符,须要格式化为一个简单的 token。 该 token 会缓存在本地,后面真正的 exec 请求会携带这个 token,经过该 token 找到以前的具体请求。其中 restful.WebService
实例会将 pod exec
请求路由到这个端点:
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) { // ... ws = new(restful.WebService) ws. Path("/exec") ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}"). To(s.getExec). Operation("getExec")) ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}"). To(s.getExec). Operation("getExec")) ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}"). To(s.getExec). Operation("getExec")) ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}"). To(s.getExec). Operation("getExec")) s.restfulCont.Add(ws)
PreInitRuntimeService()
函数做为 gRPC 服务端,负责建立并启动 Docker shim。在将dockershim.dockerService
类型实例化时,让其嵌套的 streamingRuntime
实例引用 dockershim.NativeExecHandler
的实例(该实例实现了 dockershim.ExecHandler 接口)。
ds := &dockerService{ // ... streamingRuntime: &streamingRuntime{ client: client, execHandler: &NativeExecHandler{}, }, // ... }
使用 Docker 的 exec
API 在容器中执行命令的核心实现就是 NativeExecHandler.ExecInContainer()
方法:
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // ... startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty} streamOpts := libdocker.StreamOptions{ InputStream: stdin, OutputStream: stdout, ErrorStream: stderr, RawTerminal: tty, ExecStarted: execStarted, } err = client.StartExec(execObj.ID, startOpts, streamOpts) if err != nil { return err } // ...
这里就是最终 Kubelet
调用 Docker exec
API 的地方。
最后须要搞清楚的是 streamingServer
处理器如何处理 exec
请求。首先须要找到它的 exec
处理器,咱们直接从构造函数 streaming.NewServer()
开始往下找,由于这是将 /exec/{token}
路径绑定到 serveExec
处理器的地方:
ws := &restful.WebService{} endpoints := []struct { path string handler restful.RouteFunction }{ {"/exec/{token}", s.serveExec}, {"/attach/{token}", s.serveAttach}, {"/portforward/{token}", s.servePortForward}, }
全部发送到 dockershim.dockerService
实例的请求最终都会在 streamingServer
处理器上完成,由于 dockerService.ServeHTTP() 方法会调用 streamingServer
实例的 ServeHTTP()
方法。
serveExec
处理器会调用 remoteCommand.ServeExec() 函数,这个函数又是干吗的呢?它会调用前面提到的 Executor.ExecInContainer()
方法,而 ExecInContainer()
方法是知道如何与 Docker exec
API 通讯的:
// ServeExec handles requests to execute a command in a container. After // creating/receiving the required streams, it delegates the actual execution // to the executor. func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { // ... err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) if err != nil { // ... } else { // ... } }
本文经过解读 kubectl
、API Server
和 CRI
的源码,帮助你们理解 kubectl exec
命令的工做原理,固然,这里并无涉及到 Docker exec
API 的细节,也没有涉及到 docker exec
的工做原理。
首先,kubectl 向 API Server 发出了 GET
和 POST
请求,API Server 返回了 101 Ugrade
响应,向客户端表示已切换到 SPDY
协议。
随后 API Server 使用 storage.PodStorage
和 rest.ExecRest
来提供处理器的映射和执行逻辑,其中 rest.ExecRest
处理器决定 exec
要进入的节点。
最后 Kubelet 向 Docker shim
请求一个流式端点 URL,并将 exec
请求转发到 Docker exec
API。kubelet 再将这个 URL 以 Redirect
的方式返回给 API Server,请求就会重定向到到对应 Streaming Server 上发起的 exec
请求,并维护长链。
虽然本文只关注了 kubectl exec 命令,但其余的子命令(例如 attach
、port-forward
、log
等等)也遵循了相似的实现模式:
Kubernetes 1.18.2 1.17.5 1.16.9 1.15.12离线安装包发布地址http://store.lameleg.com ,欢迎体验。 使用了最新的sealos v3.3.6版本。 做了主机名解析配置优化,lvscare 挂载/lib/module解决开机启动ipvs加载问题, 修复lvscare社区netlink与3.10内核不兼容问题,sealos生成百年证书等特性。更多特性 https://github.com/fanux/sealos 。欢迎扫描下方的二维码加入钉钉群 ,钉钉群已经集成sealos的机器人实时能够看到sealos的动态。