K8s中的命令执行由apiserver、kubelet、cri、docker等组件共同完成, 其中最复杂的就是协议切换以及各类流拷贝相关,让咱们一块儿来看下关键实现,虽然代码比较多,可是不会开发应该也能看懂,祝你好运node
K8s中的命令执行中有不少协议相关的处理, 咱们先一块儿看下这些协议处理相关的基础概念web
HTTP/1.1中容许在同一个连接上经过Header头中的Connection配合Upgrade来实现协议的转换,简单来讲就是容许在经过HTTP创建的连接之上使用其余的协议来进行通讯,这也是k8s中命令中实现协议升级的关键docker
在HTTP协议中除了咱们常见的HTTP1.1,还支持websocket/spdy等协议,那服务端和客户端如何在http之上完成不一样协议的切换呢,首先第一个要素就是这里的101(Switching Protocal)状态码, 即服务端告知客户端咱们切换到Uprage定义的协议上来进行通讯(复用当前连接)json
SPDY协议是google开发的TCP会话层协议, SPDY协议中将Http的Request/Response称为Stream,并支持TCP的连接复用,同时多个stream之间经过Stream-id来进行标记,简单来讲就是支持在单个连接同时进行多个请求响应的处理,而且互不影响,k8s中的命令执行主要也就是经过stream来进行消息传递的后端
在Linux中进程执行一般都会包含三个FD:标准输入、标准输出、标准错误, k8s中的命令执行会将对应的FD进行重定向,从而获取容器的命令的输出,重定向到哪呢?固然是咱们上面提到过的stream了(由于对docker并不熟悉,因此这个地方并不保证Docker部分的准确性)api
在client与server之间经过101状态码、connection、upragde等完成基于当前连接的转换以后, 当前连接上传输的数据就不在是以前的http1.1协议了,此时就要将对应的http连接转成对应的协议进行转换,在k8s命令执行的过程当中,会获取将对应的request和response,都经过http的Hijacker接口获取底层的tcp连接,从而继续完成请求的转发缓存
在经过Hijacker获取到两个底层的tcp的readerwriter以后,就能够直接经过io.copy在两个流上完成对应数据的拷贝,这样就不须要在apiserver这个地方进行协议的转换,而是直接经过tcp的流对拷就能够实现请求和结果的转发微信
基础大概就介绍这些,接下来咱们一块儿去看看其底层的具体实现,咱们从kubectl部分开始来逐层分析websocket
Kubectl执行命令主要分为两部分Pod合法性检测和命令执行, Pod合法性检测主要是获取对应Pod的状态,检测是否在运行, 这里咱们重点关注下命令执行部分restful
命令执行的核心分为两个步骤:1.经过SPDY协议创建连接 2)构建Stream创建连接
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { exec, err := remotecommand.NewSPDYExecutor(config, method, url) if err != nil { return err } return exec.Stream(remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, Stderr: stderr, Tty: tty, TerminalSizeQueue: terminalSizeQueue, }) }
咱们能够看到这个地方拼接的Url /pods/{namespace}/{podName}/exec其实就是对应apiserver上面pod的subresource接口,而后咱们就能够去看apiserver端的请求处理了
// 建立一个exec req := restClient.Post(). Resource("pods"). Name(pod.Name). Namespace(pod.Namespace). SubResource("exec") req.VersionedParams(&corev1.PodExecOptions{ Container: containerName, Command: p.Command, Stdin: p.Stdin, Stdout: p.Out != nil, Stderr: p.ErrOut != nil, TTY: t.Raw, }, scheme.ParameterCodec) return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
在exec.Stream主要是经过Headers传递要创建的Stream的类型,与server端进行协商
// set up stdin stream if p.Stdin != nil { headers.Set(v1.StreamType, v1.StreamTypeStdin) p.remoteStdin, err = conn.CreateStream(headers) if err != nil { return err } } // set up stdout stream if p.Stdout != nil { headers.Set(v1.StreamType, v1.StreamTypeStdout) p.remoteStdout, err = conn.CreateStream(headers) if err != nil { return err } } // set up stderr stream if p.Stderr != nil && !p.Tty { headers.Set(v1.StreamType, v1.StreamTypeStderr) p.remoteStderr, err = conn.CreateStream(headers) if err != nil { return err } }
APIServer在命令执行的过程当中扮演了代理的角色,其负责将Kubectl和kubelet之间的请求来进行转发,注意这个转发主要是基于tcp的流对拷完成的,由于kubectl和kubelet之间的通讯,其实是spdy协议,让咱们一块儿看下关键实现吧
Exec的SPDY请求会首先发送到Connect接口, Connection接口负责跟后端的kubelet进行连接的创建,而且进行响应结果的返回,在Connection接口中,首先会经过Pod获取到对应的Node信息,而且构建Location即后端的Kubelet的连接地址和transport
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { execOpts, ok := opts.(*api.PodExecOptions) if !ok { return nil, fmt.Errorf("invalid options object: %#v", opts) } // 返回对应的地址,以及创建连接 location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts) if err != nil { return nil, err } return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil }
在获取地址主要是构建后端的location信息,这里会经过kubelet上报来的信息获取到对应的node的host和Port信息,而且拼装出pod的最终指向路径即这里的Path字段/exec/{namespace}/{podName}/{containerName}
loc := &url.URL{ Scheme: nodeInfo.Scheme, Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port), // node的端口 Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container), // 路径 RawQuery: params.Encode(), }
协议提高主要是经过UpgradeAwareHandler控制器进行实现, 该handler接收到请求以后会首先尝试进行协议提高,其主要是检测http头里面的Connection的值是否是Upragde来实现, 从以前kubelet的分析中能够知道这里确定是true
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *proxy.UpgradeAwareHandler { handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder)) handler.InterceptRedirects = interceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects) handler.RequireSameHostRedirects = utilfeature.DefaultFeatureGate.Enabled(genericfeatures.ValidateProxyRedirects) handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec return handler } func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { // 若是协议提高成功,则由该协议完成 if h.tryUpgrade(w, req) { return } // 省略N多代码 }
协议提高处理的逻辑比较多,这里分为几个小节来进行依次说明, 主要是先从HTTP连接中获取请求,并进行转发,而后同时持有两个连接,而且在连接上进行TCP流的拷贝
协议提高的第一步就是与后端的kubelet创建连接了,这里会将kubelet发过来的请求进行拷贝,而且发送给后端的kubelet, 而且这里也会获取到一个与kubelet创建的http的连接,后面进行流对拷的时候须要用到, 注意实际上这个http请求响应的状态码,是101,即kubelet上其实是构建了一个spdy协议的handler来进行通讯的
// 构建http请求 req, err := http.NewRequest(method, location.String(), body) if err != nil { return nil, nil, err } req.Header = header // 发送请求创建连接 intermediateConn, err = dialer.Dial(req) if err != nil { return nil, nil, err } // Peek at the backend response. rawResponse.Reset() respReader := bufio.NewReader(io.TeeReader( io.LimitReader(intermediateConn, maxResponseSize), // Don't read more than maxResponseSize bytes. rawResponse)) // Save the raw response. // 读取响应信息 resp, err := http.ReadResponse(respReader, nil)
这个请求其实是spdy协议的,在经过Hijack获取到底层的连接以后,须要先将上面的请求转发给kubelet从而触发kubelet发送后面的Stream请求创建连接,就是这里的Write将kubelet的结果转发
requestHijackedConn, _, err := requestHijacker.Hijack() // Forward raw response bytes back to client. if len(rawResponse) > 0 { klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse)) if _, err = requestHijackedConn.Write(rawResponse); err != nil { utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err)) } }
通过上面的两步操做,apiserver上就拥有来了两个http连接,由于协议不是http的因此apiserver不能直接进行操做,而只能采用流对拷的方式来进行请求和响应的转发
// 双向拷贝连接 go func() { var writer io.WriteCloser if h.MaxBytesPerSec > 0 { writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec) } else { writer = backendConn } _, err := io.Copy(writer, requestHijackedConn) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { klog.Errorf("Error proxying data from client to backend: %v", err) } close(writerComplete) }() go func() { var reader io.ReadCloser if h.MaxBytesPerSec > 0 { reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec) } else { reader = backendConn } _, err := io.Copy(requestHijackedConn, reader) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { klog.Errorf("Error proxying data from backend to client: %v", err) } close(readerComplete) }()
Kubelet上的命令执行主要是依赖于CRI.RuntimeService来执行的,kubelet只负责对应请求的转发,并最终构建一个转发后续请求的Stream代理,就完成了他的使命
主流程主要是获取要执行的命令,而后检测对应的Pod新,并调用host.GetExec返回一个对应的URL,而后后续的请求就由proxyStream来完成, 咱们一步步开始深刻
func (s *Server) getExec(request *restful.Request, response *restful.Response) { // 获取执行命令 params := getExecRequestParams(request) streamOpts, err := remotecommandserver.NewOptions(request.Request) // 获取pod的信息 pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) podFullName := kubecontainer.GetPodFullName(pod) url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) proxyStream(response.ResponseWriter, request.Request, url) }
host.GetExec最终会调用到runtimeService即cri.RuntimeService的Exec接口来进行请求的执行,该接口会返回一个地址即/exec/{token},此时并无执行真正的命令只是建立了一个命令执行请求而已
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) { // 省略请求构造 // 执行命令 resp, err := m.runtimeService.Exec(req) return url.Parse(resp.Url) }
最终其实就是调用cri的的exec接口, 咱们先忽略该接口具体返回的啥,将kubelet剩余的逻辑看完
func (c *runtimeServiceClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error) { err := c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/Exec", in, out, opts...) }
这里咱们能够发现,又是咱们以前见过的UpgradeAwareHandler,不过此次的url是后端exec执行返回的url了,而后剩下部分就跟apiserver里面的差很少,在两个http连接之间进行流对拷
咱们想一下这个地方Request和Response,实际上是对应的apiserver与kubelet创建的连接,这个连接上是spdy的头,记住这个地方, 则此时又跟后端继续创建连接,后端其实也是一个spdy协议的server, 至此咱们还差最后一个部分就是返回的那个连接究竟是啥,对应的控制器又是谁,进行下一节cri部分
// proxyStream proxies stream to url. func proxyStream(w http.ResponseWriter, r *http.Request, url *url.URL) { // TODO(random-liu): Set MaxBytesPerSec to throttle the stream. handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, true /*upgradeRequired*/, &responder{}) handler.ServeHTTP(w, r) }
CRI.RuntimeService负责最终的命令执行,也是命令执行真正执行的位置,其中也涉及到不少的协议处理相关的操做,让咱们一块儿来看下关键实现吧
在上面咱们调用了RuntimeService的Exec接口,在kubelet中最终发现以下代码,建立了一个DockerServer并启动
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds) if err := dockerServer.Start(); err != nil { return err }
其中在Start函数里面,注册了下面两个RuntimeService,写过grpc的朋友都知道,这个其实就是注册对应rpc接口的实现,其实最终咱们调用的是DockerService的接口
runtimeapi.RegisterRuntimeServiceServer(s.server, s.service) runtimeapi.RegisterImageServiceServer(s.server, s.service)
Exec最终的实现能够发现其实是调用streamingServer的GetExec接口,返回了一个/exec/{token}的接口
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { // 执行Exec请求 return ds.streamingServer.GetExec(req) }
咱们继续追踪streamingServer能够看到GetExec接口实现以下, 最终build了一个url=/exec/{token},注意这里实际上存储了当前的Request请求在cache中
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { // 生成token token, err := s.cache.Insert(req) return &runtimeapi.ExecResponse{ Url: s.buildURL("exec", token), }, nil }
首先经过token来获取以前缓存的Request,而后经过exec请求命令,构建StreamOpts,并最终调用ServeExec进行执行,接下来就是最不容易看懂的部分了,前方高能
func (s *server) serveExec(req *restful.Request, resp *restful.Response) { // 获取token token := req.PathParameter("token") // 缓存请求 cachedRequest, ok := s.cache.Consume(token) // 构建exec参数s exec, ok := cachedRequest.(*runtimeapi.ExecRequest) streamOpts := &remotecommandserver.Options{ Stdin: exec.Stdin, Stdout: exec.Stdout, Stderr: exec.Stderr, TTY: exec.Tty, } // 构建ServerExec执行请求 remotecommandserver.ServeExec( resp.ResponseWriter, req.Request, s.runtime, "", // unused: podName "", // unusued: podUID exec.ContainerId, exec.Cmd, streamOpts, s.config.StreamIdleTimeout, s.config.StreamCreationTimeout, s.config.SupportedRemoteCommandProtocols) }
ServerExec关键步骤就两个:1)建立stream 2)执行请求, 比较复杂的主要是集中在建立stream部分,咱们注意下ExecInContainer的参数部分,传入了经过建立流获取的ctx的相关文件描述符的Stream, createStreams里面的实现有两种协议websocket和https,这里咱们主要分析https(咱们使用kubectl使用的就是https协议)
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { // 建立serveExec ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) defer ctx.conn.Close() // 获取执行,这是一个阻塞的过程,err会获取当前的执行是否成功, 这里将ctx里面的信息,都传入进去,对应的其实就是各类流 err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) }
Stream的创建我将其归纳成下面几个步骤:1)进行https的握手 2)协议升级为spdy 3)等待stream的创建,咱们依次来看
1.完成https的握手
protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
2.协议提高
// 流管道 streamCh := make(chan streamAndReply) upgrader := spdy.NewResponseUpgrader() // 构建spdy连接 conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error { // 当新请求创建以后,会追加到streamch streamCh <- streamAndReply{Stream: stream, replySent: replySent} return nil })
这里有一个关键机制就是后面func回调函数的传递和streamch的传递,这里创建一个连接以后会建立一个Server,而且传入了一个控制器就是func回调函数,该函数每当创建一个连接以后,若是获取到对应的stream就追加到StreamCh中,下面就是最复杂的网络处理部分了,由于太复杂,因此仍是单独开一节吧
整体流程上看起来简单,主要是先根据请求来进行协议切换,而后返回101,而且基于当前的连接构建SPDY的请求处理,而后等待kubectl经过apiserver发送的须要创建的Stream,就完成了彼此通讯流stream的创建
首先第一步会先进行协议提高的响应,这里咱们注意几个关键部分,spdy协议,以及101状态码
// 协议 hijacker, ok := w.(http.Hijacker) if !ok { errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response") http.Error(w, errorMsg, http.StatusInternalServerError) return nil } w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade) // sydy协议 w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31) w.WriteHeader(http.StatusSwitchingProtocols)
spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)
最终会经过newConnection负责新连接的创建
func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { // 建立一个新的连接, 经过一个已经存在的网络连接 spdyConn, err := spdystream.NewConnection(conn, true) return newConnection(spdyConn, newStreamHandler), nil }
这里咱们能够看到是启动一个后台的server来进行连接请求的处理
func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { c := &connection{conn: conn, newStreamHandler: newStreamHandler} // 当创建连接后,进行syn请求建立流的时候,会调用newSpdyStream go conn.Serve(c.newSpdyStream) return c }
1.首先会启动多个goroutine来负责请求的处理,这里的worker数量是5个,队列大小是20,
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS) for i := 0; i < FRAME_WORKERS; i++ { frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE) // Ensure frame queue is drained when connection is closed go func(frameQueue *PriorityFrameQueue) { <-s.closeChan frameQueue.Drain() }(frameQueues[i]) wg.Add(1) go func(frameQueue *PriorityFrameQueue) { // let the WaitGroup know this worker is done defer wg.Done() s.frameHandler(frameQueue, newHandler) }(frameQueues[i]) }
2.监听synStreamFrame,分流frame,会按照frame的streamID来进行hash选择对应的frameQueues队列
case *spdy.SynStreamFrame: if s.checkStreamFrame(frame) { priority = frame.Priority partition = int(frame.StreamId % FRAME_WORKERS) debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId) // 添加到对应的StreamId对应的frame里面 s.addStreamFrame(frame) } else { debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId) continue // 最终会讲frame push到上面的优先级队列里面 frameQueues[partition].Push(readFrame, priority)
3.读取frame进行并把读取到的stream经过newHandler传递给上面的StreamCH
func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) { for { popFrame := frameQueue.Pop() if popFrame == nil { return } var frameErr error switch frame := popFrame.(type) { case *spdy.SynStreamFrame: frameErr = s.handleStreamFrame(frame, newHandler) } }
消费的流到下一节
Stream的等待创建主要是经过Headers里面的StreamType来实现,这里面会讲对应的stdinStream和对应的spdy里面的stream绑定,其余类型也是这样
func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { ctx := &context{} receivedStreams := 0 replyChan := make(chan struct{}) stop := make(chan struct{}) defer close(stop) WaitForStreams: for { select { case stream := <-streams: streamType := stream.Headers().Get(api.StreamType) switch streamType { case api.StreamTypeError: ctx.writeStatus = v1WriteStatusFunc(stream) go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStdin: ctx.stdinStream = stream go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStdout: ctx.stdoutStream = stream go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeStderr: ctx.stderrStream = stream go waitStreamReply(stream.replySent, replyChan, stop) case api.StreamTypeResize: ctx.resizeStream = stream go waitStreamReply(stream.replySent, replyChan, stop) default: runtime.HandleError(fmt.Errorf("unexpected stream type: %q", streamType)) } case <-replyChan: receivedStreams++ if receivedStreams == expectedStreams { break WaitForStreams } case <-expired: // TODO find a way to return the error to the user. Maybe use a separate // stream to report errors? return nil, errors.New("timed out waiting for client to create streams") } } return ctx, nil }
跟踪调用链最终能够看到以下的调用,最终指向了execHandler.ExecInContainer接口用于在容器中执行命令
func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // 执行command return a.Runtime.Exec(container, cmd, in, out, err, tty, resize) } func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { // 执行容器 return r.exec(containerID, cmd, in, out, err, tty, resize, 0) } // Internal version of Exec adds a timeout. func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // exechandler return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout) }
命令的指向的主流程主要分为两个部分:1)建立exec执行任务 2)启动exec执行任务
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { // 在容器中执行命令 done := make(chan struct{}) defer close(done) // 执行命令 createOpts := dockertypes.ExecConfig{ Cmd: cmd, AttachStdin: stdin != nil, AttachStdout: stdout != nil, AttachStderr: stderr != nil, Tty: tty, } // 建立执行命令任务 execObj, err := client.CreateExec(container.ID, createOpts) startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty} // 这里咱们能够看到咱们将前面获取到的stream的封装,都做为FD传入到容器的执行命令里面去了 streamOpts := libdocker.StreamOptions{ InputStream: stdin, OutputStream: stdout, ErrorStream: stderr, RawTerminal: tty, ExecStarted: execStarted, } // 执行命令 err = client.StartExec(execObj.ID, startOpts, streamOpts) ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 for { // 获取执行结果 inspect, err2 := client.InspectExec(execObj.ID) if !inspect.Running { if inspect.ExitCode != 0 { err = &dockerExitError{inspect} } break } <-ticker.C } return err }
Docker的命令执行接口调用
func (cli *Client) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) { resp, err := cli.post(ctx, "/containers/"+container+"/exec", nil, config, nil) return response, err }
命令执行的核心实现主要是两个步骤:1)首先发送exec执行请求 2)启动对应的exec并获取结果, 复杂的仍是SPDY相关的Stream的逻辑
func (d *kubeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error { // 启动执行命令, 获取结果 resp, err := d.client.ContainerExecAttach(ctx, startExec, dockertypes.ExecStartCheck{ Detach: opts.Detach, Tty: opts.Tty, }) // 将输入流拷贝到输出流, 这里会讲resp里面的结果拷贝到outputSTream里面 return d.holdHijackedConnection(sopts.RawTerminal || opts.Tty, sopts.InputStream, sopts.OutputStream, sopts.ErrorStream, resp) }
cli.postHijacked(ctx, "/exec/"+execID+"/start", nil, config, headers)
这里的HiHijackConn功能跟以前介绍的相似,其核心也是经过创建http连接,而后进行协议提高,其conn就是底层的tcp连接,同时还给对应的连接设置了Keepliave当前是30s, 到此咱们就又有了一个基于spdy双向通讯的连接
func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) { conn, err := cli.setupHijackConn(ctx, req, "tcp") return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err }
至此在kubelet上面咱们获取到了与后端执行命令的Stream还有与apiserver创建的Stream, 此时就只须要将两个流直接进行拷贝,就能够实现数据的传输了
func (d *kubeDockerClient) holdHijackedConnection(tty bool, inputStream io.Reader, outputStream, errorStream io.Writer, resp dockertypes.HijackedResponse) error { receiveStdout := make(chan error) if outputStream != nil || errorStream != nil { // 将响应结果拷贝到outputstream里面 go func() { receiveStdout <- d.redirectResponseToOutputStream(tty, outputStream, errorStream, resp.Reader) }() } stdinDone := make(chan struct{}) go func() { if inputStream != nil { io.Copy(resp.Conn, inputStream) } resp.CloseWrite() close(stdinDone) }() return nil }
在发生完成执行命令之后,会每隔2s钟进行一次执行状态的检查,若是发现执行完成,则就直接退出
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 for { // 获取执行结果 inspect, err2 := client.InspectExec(execObj.ID) if err2 != nil { return err2 } if !inspect.Running { if inspect.ExitCode != 0 { err = &dockerExitError{inspect} } break } <-ticker.C } return err } func (cli *Client) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) { resp, err := cli.get(ctx, "/exec/"+execID+"/json", nil, nil) return response, err }
整个命令执行的过程其实仍是蛮复杂的,主要是在于网络协议切换那部分,咱们能够看到其实在整个过程当中,都是基于SPDY协议来进行的,而且在CRI.RuntimeService那部分咱们也能够看到Stream的请求处理其实也是多goroutine并发的,仰慕一下大牛的设计,有什么写的不对的地方,欢迎一块儿讨论,谢谢大佬们能看到这里
kubernetes学习笔记地址: https://www.yuque.com/baxiaos...
微信号:baxiaoshi2020![]()
关注公告号阅读更多源码分析文章![]()
本文由博客一文多发平台 OpenWrite 发布!