这篇文章将介绍Golang
并发编程中经常使用到一种编程模式:context
。本文将从为何须要context
出发,深刻了解context
的实现原理,以及了解如何使用context
。golang
在并发程序中,因为超时、取消操做或者一些异常状况,每每须要进行抢占操做或者中断后续操做。熟悉channel
的朋友应该都见过使用done channel
来处理此类问题。好比如下这个例子:web
func main() {
messages := make(chan int, 10)
done := make(chan bool)
defer close(messages)
// consumer
go func() {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-done:
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-messages)
}
}
}()
// producer
for i := 0; i < 10; i++ {
messages <- i
}
time.Sleep(5 * time.Second)
close(done)
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}
复制代码
上述例子中定义了一个buffer
为0的channel done
, 子协程运行着定时任务。若是主协程须要在某个时刻发送消息通知子协程中断任务退出,那么就可让子协程监听这个done channel
,一旦主协程关闭done channel
,那么子协程就能够推出了,这样就实现了主协程通知子协程的需求。这很好,可是这也是有限的。编程
若是咱们能够在简单的通知上附加传递额外的信息来控制取消:为何取消,或者有一个它必需要完成的最终期限,更或者有多个取消选项,咱们须要根据额外的信息来判断选择执行哪一个取消选项。安全
考虑下面这种状况:假如主协程中有多个任务1, 2, …m,主协程对这些任务有超时控制;而其中任务1又有多个子任务1, 2, …n,任务1对这些子任务也有本身的超时控制,那么这些子任务既要感知主协程的取消信号,也须要感知任务1的取消信号。网络
若是仍是使用done channel
的用法,咱们须要定义两个done channel
,子任务们须要同时监听这两个done channel
。嗯,这样其实好像也还行哈。可是若是层级更深,若是这些子任务还有子任务,那么使用done channel
的方式将会变得很是繁琐且混乱。并发
咱们须要一种优雅的方案来实现这样一种机制:app
这个时候context
就派上用场了。咱们首先看看context
的结构设计和实现原理。函数
先看Context
接口结构,看起来很是简单。post
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
复制代码
Context
接口包含四个方法:测试
Deadline
返回绑定当前context
的任务被取消的截止时间;若是没有设按期限,将返回ok == false
。Done
当绑定当前context
的任务被取消时,将返回一个关闭的channel
;若是当前context
不会被取消,将返回nil
。Err
若是Done
返回的channel
没有关闭,将返回nil
;若是Done
返回的channel
已经关闭,将返回非空的值表示任务结束的缘由。若是是context
被取消,Err
将返回Canceled
;若是是context
超时,Err
将返回DeadlineExceeded
。Value
返回context
存储的键值对中当前key
对应的值,若是没有对应的key
,则返回nil
。能够看到Done
方法返回的channel
正是用来传递结束信号以抢占并中断当前任务;Deadline
方法指示一段时间后当前goroutine
是否会被取消;以及一个Err
方法,来解释goroutine
被取消的缘由;而Value
则用于获取特定于当前任务树的额外信息。而context
所包含的额外信息键值对是如何存储的呢?其实能够想象一颗树,树的每一个节点可能携带一组键值对,若是当前节点上没法找到key
所对应的值,就会向上去父节点里找,直到根节点,具体后面会说到。
再来看看context
包中的其余关键内容。
emptyCtx
是一个int
类型的变量,但实现了context
的接口。emptyCtx
没有超时时间,不能取消,也不能存储任何额外信息,因此emptyCtx
用来做为context
树的根节点。
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
复制代码
但咱们通常不会直接使用emptyCtx
,而是使用由emptyCtx
实例化的两个变量,分别能够经过调用Background
和TODO
方法获得,但这两个context
在实现上是同样的。那么Background
和TODO
方法获得的context
有什么区别呢?能够看一下官方的解释:
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
复制代码
Background
和TODO
只是用于不一样场景下:Background
一般被用于主函数、初始化以及测试中,做为一个顶层的context
,也就是说通常咱们建立的context
都是基于Background
;而TODO
是在不肯定使用什么context
的时候才会使用。
下面将介绍两种不一样功能的基础context
类型:valueCtx
和cancelCtx
。
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
复制代码
valueCtx
利用一个Context
类型的变量来表示父节点context
,因此当前context
继承了父context
的全部信息;valueCtx
类型还携带一组键值对,也就是说这种context
能够携带额外的信息。valueCtx
实现了Value
方法,用以在context
链路上获取key
对应的值,若是当前context
上不存在须要的key
,会沿着context
链向上寻找key
对应的值,直到根节点。
WithValue
用以向context
添加键值对:
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
复制代码
这里添加键值对不是在原context
结构体上直接添加,而是以此context
做为父节点,从新建立一个新的valueCtx
子节点,将键值对添加在子节点上,由此造成一条context
链。获取value
的过程就是在这条context
链上由尾部上前搜寻:
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
复制代码
跟valueCtx
相似,cancelCtx
中也有一个context
变量做为父节点;变量done
表示一个channel
,用来表示传递关闭信号;children
表示一个map
,存储了当前context
节点下的子节点;err
用于存储错误信息表示任务结束的缘由。
再来看一下cancelCtx
实现的方法:
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
// 设置取消缘由
c.err = err
设置一个关闭的channel或者将done channel关闭,用以发送关闭信号
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
// 将子节点context依次取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
// 将当前context节点从父节点上移除
removeChild(c.Context, c)
}
}
复制代码
能够发现cancelCtx
类型变量其实也是canceler
类型,由于cancelCtx
实现了canceler
接口。Done
方法和Err
方法不必说了,cancelCtx
类型的context
在调用cancel
方法时会设置取消缘由,将done channel
设置为一个关闭channel
或者关闭channel
,而后将子节点context
依次取消,若是有须要还会将当前节点从父节点上移除。
WithCancel
函数用来建立一个可取消的context
,即cancelCtx
类型的context
。WithCancel
返回一个context
和一个CancelFunc
,调用CancelFunc
便可触发cancel
操做。直接看源码:
type CancelFunc func()
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
// 将parent做为父节点context生成一个新的子节点
return cancelCtx{Context: parent}
}
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
// parent.Done()返回nil代表父节点以上的路径上没有可取消的context
return // parent is never canceled
}
// 获取最近的类型为cancelCtx的祖先节点
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
// 将当前子节点加入最近cancelCtx祖先节点的children中
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
复制代码
以前说到cancelCtx
取消时,会将后代节点中全部的cancelCtx
都取消,propagateCancel
即用来创建当前节点与祖先节点这个取消关联逻辑。
parent.Done()
返回nil
,代表父节点以上的路径上没有可取消的context
,不须要处理;context
链上找到到cancelCtx
类型的祖先节点,则判断这个祖先节点是否已经取消,若是已经取消就取消当前节点;不然将当前节点加入到祖先节点的children
列表。parent.Done()
和child.Done()
,一旦parent.Done()
返回的channel
关闭,即context
链中某个祖先节点context
被取消,则将当前context
也取消。这里或许有个疑问,为何是祖先节点而不是父节点?这是由于当前context
链多是这样的:
当前cancelCtx
的父节点context
并非一个可取消的context
,也就无法记录children
。
timerCtx
是一种基于cancelCtx
的context
类型,从字面上就能看出,这是一种能够定时取消的context
。
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
将内部的cancelCtx取消
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
取消计时器
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
复制代码
timerCtx
内部使用cancelCtx
实现取消,另外使用定时器timer
和过时时间deadline
实现定时取消的功能。timerCtx
在调用cancel
方法,会先将内部的cancelCtx
取消,若是须要则将本身从cancelCtx
祖先节点上移除,最后取消计时器。
WithDeadline
返回一个基于parent
的可取消的context
,而且其过时时间deadline
不晚于所设置时间d
。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 创建新建context与可取消context祖先节点的取消关联关系
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
复制代码
parent
有过时时间而且过时时间早于给定时间d
,那么新建的子节点context
无需设置过时时间,使用WithCancel
建立一个可取消的context
便可;parent
和过时时间d
建立一个定时取消的timerCtx
,并创建新建context
与可取消context
祖先节点的取消关联关系,接下来判断当前时间距离过时时间d
的时长dur
:dur
小于0,即当前已通过了过时时间,则直接取消新建的timerCtx
,缘由为DeadlineExceeded
;timerCtx
设置定时器,一旦到达过时时间即取消当前timerCtx
。与WithDeadline
相似,WithTimeout
也是建立一个定时取消的context
,只不过WithDeadline
是接收一个过时时间点,而WithTimeout
接收一个相对当前时间的过时时长timeout
:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
复制代码
首先使用context
实现文章开头done channel
的例子来示范一下如何更优雅实现协程间取消信号的同步:
func main() {
messages := make(chan int, 10)
// producer
for i := 0; i < 10; i++ {
messages <- i
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// consumer
go func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
for _ = range ticker.C {
select {
case <-ctx.Done():
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-messages)
}
}
}(ctx)
defer close(messages)
defer cancel()
select {
case <-ctx.Done():
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}
}
复制代码
这个例子中,只要让子线程监听主线程传入的ctx
,一旦ctx.Done()
返回空channel
,子线程便可取消执行任务。但这个例子还没法展示context
的传递取消信息的强大优点。
阅读过net/http
包源码的朋友可能注意到在实现http server
时就用到了context
, 下面简单分析一下。
一、首先Server
在开启服务时会建立一个valueCtx
,存储了server
的相关信息,以后每创建一条链接就会开启一个协程,并携带此valueCtx
。
func (srv *Server) Serve(l net.Listener) error {
...
var tempDelay time.Duration // how long to sleep on accept failure
baseCtx := context.Background() // base is always background, per Issue 16220
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
...
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx)
}
}
复制代码
二、创建链接以后会基于传入的context
建立一个valueCtx
用于存储本地地址信息,以后在此基础上又建立了一个cancelCtx
,而后开始从当前链接中读取网络请求,每当读取到一个请求则会将该cancelCtx
传入,用以传递取消信号。一旦链接断开,便可发送取消信号,取消全部进行中的网络请求。
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
...
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
...
for {
w, err := c.readRequest(ctx)
...
serverHandler{c.server}.ServeHTTP(w, w.req)
...
}
}
复制代码
三、读取到请求以后,会再次基于传入的context
建立新的cancelCtx
,并设置到当前请求对象req
上,同时生成的response
对象中cancelCtx
保存了当前context
取消方法。
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
...
req, err := readRequest(c.bufr, keepHostHeader)
...
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
...
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose: req.wantsClose(),
}
...
return w, nil
}
复制代码
这样处理的目的主要有如下几点:
一旦请求超时,便可中断当前请求;
在处理构建response
过程当中若是发生错误,可直接调用response
对象的cancelCtx
方法结束当前请求;
在处理构建response
完成以后,调用response
对象的cancelCtx
方法结束当前请求。
在整个server
处理流程中,使用了一条context
链贯穿Server
、Connection
、Request
,不只将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消全部下游任务,而下游任务自行取消不会影响上游任务。
context
主要用于父子任务之间的同步取消信号,本质上是一种协程调度的方式。另外在使用context
时有两点值得注意:上游任务仅仅使用context
通知下游任务再也不须要,但不会直接干涉和中断下游任务的执行,由下游任务自行决定后续的处理操做,也就是说context
的取消操做是无侵入的;context
是线程安全的,由于context
自己是不可变的(immutable
),所以能够放心地在多个协程中传递使用。