使用golang net/http库发送http请求,最后都是调用 transport的 RoundTrip方法golang
type RoundTripper interface { RoundTrip(*Request) (*Response, error) }
RoundTrip executes a single HTTP transaction, returning the Response for the request req.
(RoundTrip 表明一个http事务,给一个请求返回一个响应)
说白了,就是你给它一个request,它给你一个response并发
下面咱们来看一下他的实现,对应源文件net/http/transport.go
,我感受这里是http package里面的精髓所在,go里面一个struct就跟一个类同样,transport这个类长这样的app
type Transport struct { idleMu sync.Mutex wantIdle bool // user has requested to close all idle conns idleConn map[connectMethodKey][]*persistConn idleConnCh map[connectMethodKey]chan *persistConn reqMu sync.Mutex reqCanceler map[*Request]func() altMu sync.RWMutex altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper //Dial获取一个tcp 链接,也就是net.Conn结构,你就记住能够往里面写request //而后从里面搞到response就好了 Dial func(network, addr string) (net.Conn, error) }
篇幅所限, https和代理相关的我就忽略了, 两个 map
为 idleConn
、idleConnCh
,idleConn
是保存从 connectMethodKey (表明着不一样的协议 不一样的host,也就是不一样的请求)到 persistConn 的映射, idleConnCh
用来在并发http请求的时候在多个 goroutine 里面相互发送持久链接,也就是说, 这些持久链接是能够重复利用的, 你的http请求用某个persistConn
用完了,经过这个channel
发送给其余http请求使用这个persistConn
,而后咱们找到transport
的RoundTrip
方法tcp
func (t *Transport) RoundTrip(req *Request) (resp *Response, err error) { ... pconn, err := t.getConn(req, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } return pconn.roundTrip(treq) }
前面对输入的错误处理部分咱们忽略, 其实就2步,先获取一个TCP长链接,所谓TCP长链接就是三次握手创建链接后不close
而是一直保持重复使用(节约环保) 而后调用这个持久链接persistConn 这个struct的roundTrip方法ide
咱们跟踪第一步高并发
func (t *Transport) getConn(req *Request, cm connectMethod) (*persistConn, error) { if pc := t.getIdleConn(cm); pc != nil { // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(req, func() {}) return pc, nil } type dialRes struct { pc *persistConn err error } dialc := make(chan dialRes) //定义了一个发送 persistConn的channel prePendingDial := prePendingDial postPendingDial := postPendingDial handlePendingDial := func() { if prePendingDial != nil { prePendingDial() } go func() { if v := <-dialc; v.err == nil { t.putIdleConn(v.pc) } if postPendingDial != nil { postPendingDial() } }() } cancelc := make(chan struct{}) t.setReqCanceler(req, func() { close(cancelc) }) // 启动了一个goroutine, 这个goroutine 获取里面调用dialConn搞到 // persistConn, 而后发送到上面创建的channel dialc里面, go func() { pc, err := t.dialConn(cm) dialc <- dialRes{pc, err} }() idleConnCh := t.getIdleConnCh(cm) select { case v := <-dialc: // dialc 咱们的 dial 方法先搞到经过 dialc通道发过来了 return v.pc, v.err case pc := <-idleConnCh: // 这里表明其余的http请求用完了归还的persistConn经过idleConnCh这个 // channel发送来的 handlePendingDial() return pc, nil case <-req.Cancel: handlePendingDial() return nil, errors.New("net/http: request canceled while waiting for connection") case <-cancelc: handlePendingDial() return nil, errors.New("net/http: request canceled while waiting for connection") } }
这里面的代码写的颇有讲究 , 上面代码里面我也注释了, 定义了一个发送 persistConn
的channel dialc
, 启动了一个goroutine
, 这个goroutine
获取里面调用dialConn
搞到persistConn
, 而后发送到dialc
里面,主协程goroutine
在 select
里面监听多个channel
,看看哪一个通道里面先发过来 persistConn
,就用哪一个,而后return
。oop
这里要注意的是 idleConnCh
这个通道里面发送来的是其余的http请求用完了归还的persistConn
, 若是从这个通道里面搞到了,dialc
这个通道也等着发呢,不能浪费,就经过handlePendingDial
这个方法把dialc
通道里面的persistConn
也发到idleConnCh
,等待后续给其余http请求使用。 post
还有就是,读者能够翻一下代码,每一个新建的persistConn的时候都把tcp链接里地输入流,和输出流用br(br *bufio.Reader
),和bw(bw *bufio.Writer
)包装了一下,往bw写就写到tcp输入流里面了,读输出流也是经过br读,并启动了读循环和写循环学习
pconn.br = bufio.NewReader(noteEOFReader{pconn.conn, &pconn.sawEOF}) pconn.bw = bufio.NewWriter(pconn.conn) go pconn.readLoop() go pconn.writeLoop()
咱们跟踪第二步pconn.roundTrip
调用这个持久链接persistConn 这个struct的roundTrip
方法。
先瞄一下 persistConn
这个structthis
type persistConn struct { t *Transport cacheKey connectMethodKey conn net.Conn tlsState *tls.ConnectionState br *bufio.Reader // 从tcp输出流里面读 sawEOF bool // whether we've seen EOF from conn; owned by readLoop bw *bufio.Writer // 写到tcp输入流 reqch chan requestAndChan // 主goroutine 往channnel里面写,读循环从 // channnel里面接受 writech chan writeRequest // 主goroutine 往channnel里面写 // 写循环从channel里面接受 closech chan struct{} // 通知关闭tcp链接的channel writeErrCh chan error lk sync.Mutex // guards following fields numExpectedResponses int closed bool // whether conn has been closed broken bool // an error has happened on this connection; marked broken so it's not reused. canceled bool // whether this conn was broken due a CancelRequest // mutateHeaderFunc is an optional func to modify extra // headers on each outbound request before it's written. (the // original Request given to RoundTrip is not modified) mutateHeaderFunc func(Header) }
里面是各类channel, 用的是出神入化, 各位要好好理解一下, 我这里画一下
这里有三个goroutine,分别用三个圆圈表示, channel用箭头表示
有两个channel writeRequest
和 requestAndChan
type writeRequest struct { req *transportRequest ch chan<- error }
主goroutine 往writeRequest里面写,写循环从writeRequest里面接受
type responseAndError struct { res *Response err error } type requestAndChan struct { req *Request ch chan responseAndError addedGzip bool }
主goroutine 往requestAndChan里面写,读循环从requestAndChan里面接受。
注意这里的channel都是双向channel,也就是channel 的struct里面有一个chan类型的字段, 好比 reqch chan requestAndChan
这里的 requestAndChan 里面的 ch chan responseAndError
。
这个是很牛叉,主 goroutine 经过 reqch 发送requestAndChan 给读循环,而后读循环搞到response后经过 requestAndChan 里面的通道responseAndError把response返给主goroutine,因此我画了一个双向箭头。
咱们研究一下代码,我理解下来其实就是三个goroutine经过channel互相协做的过程。
主循环:
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { ... 忽略 // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh} //把request发送给写循环 resc := make(chan responseAndError, 1) pc.reqch <- requestAndChan{req.Request, resc, requestedGzip} //发送给读循环 var re responseAndError var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel WaitResponse: for { select { case err := <-writeErrCh: if isNetWriteError(err) { //写循环经过这个channel报告错误 select { case re = <-resc: pc.close() break WaitResponse case <-time.After(50 * time.Millisecond): // Fall through. } } if err != nil { re = responseAndError{nil, err} pc.close() break WaitResponse } if d := pc.t.ResponseHeaderTimeout; d > 0 { timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C } case <-pc.closech: // 若是长链接挂了, 这里的channel有数据, 进入这个case, 进行处理 select { case re = <-resc: if fn := testHookPersistConnClosedGotRes; fn != nil { fn() } default: re = responseAndError{err: errClosed} if pc.isCanceled() { re = responseAndError{err: errRequestCanceled} } } break WaitResponse case <-respHeaderTimer: pc.close() re = responseAndError{err: errTimeout} break WaitResponse // 若是timeout,这里的channel有数据, break掉for循环 case re = <-resc: break WaitResponse // 获取到读循环的response, break掉 for循环 case <-cancelChan: pc.t.CancelRequest(req.Request) cancelChan = nil } } if re.err != nil { pc.t.setReqCanceler(req.Request, nil) } return re.res, re.err }
这段代码主要就干了三件事
主goroutine ->requestAndChan -> 读循环goroutine
主goroutine ->writeRequest-> 写循环goroutine
主goroutine 经过select 监听各个channel上的数据, 好比请求取消, timeout,长链接挂了,写流出错,读流出错, 都是其余goroutine 发送过来的, 跟中断同样,而后相应处理,上面也提到了,有些channel是主goroutine经过channel发送给其余goroutine的struct里面包含的channel, 好比 case err := <-writeErrCh:
case re = <-resc:
读循环代码:
func (pc *persistConn) readLoop() { ... 忽略 alive := true for alive { ... 忽略 rc := <-pc.reqch var resp *Response if err == nil { resp, err = ReadResponse(pc.br, rc.req) if err == nil && resp.StatusCode == 100 { //100 Continue 初始的请求已经接受,客户应当继续发送请求的其 // 余部分 resp, err = ReadResponse(pc.br, rc.req) // 读pc.br(tcp输出流)中的数据,这里的代码在response里面 //解析statusCode,头字段, 转成标准的内存中的response 类型 // http在tcp数据流里面,head和body以 /r/n/r/n分开, 各个头 // 字段 以/r/n分开 } } if resp != nil { resp.TLS = pc.tlsState } ...忽略 //上面处理一些http协议的一些逻辑行为, rc.ch <- responseAndError{resp, err} //把读到的response返回给 //主goroutine .. 忽略 //忽略部分, 处理cancel req中断, 发送idleConnCh归还pc(持久链接)到持久链接池中(map) pc.close() }
无关代码忽略,这段代码主要干了一件事情
读循环goroutine 经过channel requestAndChan 接受主goroutine发送的request(
rc := <-pc.reqch
), 并从tcp输出流中读取response, 而后反序列化到结构体中, 最后经过channel 返给主goroutine (rc.ch <- responseAndError{resp, err}
)
func (pc *persistConn) writeLoop() { for { select { case wr := <-pc.writech: //接受主goroutine的 request if pc.isBroken() { wr.ch <- errors.New("http: can't write HTTP request on broken connection") continue } err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra) //写入tcp输入流 if err == nil { err = pc.bw.Flush() } if err != nil { pc.markBroken() wr.req.Request.closeBody() } pc.writeErrCh <- err wr.ch <- err // 出错的时候返给主goroutineto case <-pc.closech: return } } }
写循环就更简单了,select channel中主gouroutine的request,而后写入tcp输入流,若是出错了,channel 通知调用者。
总体看下来,过程都很简单,可是代码中有不少值得咱们学习的地方,好比高并发请求如何复用tcp链接,这里是链接池的作法,若是使用多个 goroutine相互协做完成一个http请求,出现错误的时候如何通知调用者中断错误,代码风格也有不少能够借鉴的地方。
我打算写一个系列,全面剖析go标准库里面的精彩之处,分享给你们。