深刻 Go 并发模型:Context

介绍

在Go服务器中,每一个传入请求都在其本身的goroutine中处理。 请求处理程序一般会启动其余goroutine来访问后端,例如数据库和RPC服务。 处理请求的goroutine集合一般须要访问特定于请求的值,例如最终用户的身份,受权令牌和请求的截止日期。 当请求被取消或超时时,处理该请求的全部goroutine都应该快速退出,以便系统能够回收他们正在使用的任何资源。html

Context是专门用来简化对于处理单个请求,多个goroutine之间数据共享、取消信号、超时处理等相关操做。翻译自 Go Concurrency Patterns: Contextgit

特性

  • Context是gorountine并发安全的。
  • 支持树状的上级控制一个或者多个下级,不支持反向控制和平级控制。
  • gorountine传递cancel信号,结束子gorountine 生命。
  • gorountine初始化启动子gorountine服务时,传入截止时刻或者超时时间来控制子gorountine。
  • gorountine结束,对应的全部子gorountine生命周期结束。

使用场景

  • 并发多服务调用状况下,好比一个请求进来,启动3个goroutine进行 RpcA 、RpcB 、RpcC三个服务的调用。这时候只要有其中一个服务错误,就返回错误,同时取消另外两个Rpc服务。能够经过 WithCancel 方法来实现。
  • 超时请求,好比对Http、Rpc进行超时限制,能够经过 WithDeadline 和 WithTimeout 来实现。
  • 携带数据,好比一个请求的用户信息,通常业务场景,咱们会有一个专门的中间件来校验用户信息,而后把用户信息注入到context中,或者共享给派生出来的多个goroutine使用,能够经过 WithValue 方法实现。

官方示例

package main
import (
	"context"
	"fmt"
)

func main() {
	gen := func(ctx context.Context) <-chan int {
		dst := make(chan int)
		n := 1
		go func() {
			for {
				select {
				case <-ctx.Done():
					return // returning not to leak the goroutine
				case dst <- n:
					n++
				}
			}
		}()
		return dst
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel when we are finished consuming integers

	for n := range gen(ctx) {
		fmt.Println(n)
		if n == 5 {
			break
		}
	}
}
复制代码

Context

context 是一个接口,定义以下:源码github

type Context interface {
	Deadline() (deadline time.Time, ok bool)

	Done() <-chan struct{}

	Err() error

	Value(key interface{}) interface{}
}
复制代码

包含了以下3个功能:golang

  • 生存时间
  • 取消信号
  • request 派生的gorouting之间共享value

emptyCtx是对Context实现,分别实现了Deadline、Done、Err、Value、String方法,数据库

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return 
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}
复制代码

cancelCtx

从示例中,咱们看到使用 context 第一个须要作的初始化操做后端

ctx, cancel := context.WithCancel(context.Background())
复制代码

这里,context.Background() 返回的就是 emptyCtx 类型的指针。安全

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}
复制代码

咱们再来看看,WithCancel 函数接收了 background 做为参数,建立了一个cancelCtx实例。同时将Context 做为它的一个匿名字段,这样,它就能够被当作一个 Context。bash

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) } //什么意思,看下文您就明白了
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}
复制代码

看下图,WithCancel 的主要职责是建立一个 cancelCtx,把本身挂载到父节点,而后返回cancelCtx和cancel()函数,用来触发取消事件。 服务器

cancelCtx 实现了本身的Done()、Err()、String()接口。值得关注的是,done 字段采用懒建立的方式, 在Done()第一次被调用的时候被建立,并且返回的是一个只读的Channel。

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

func (c *cancelCtx) String() string {
	return fmt.Sprintf("%v.WithCancel", c.Context)
}
复制代码

不只如此,cancelCtx最重要的是实现了 cancel() 方法。主工做流程以下:并发

  1. 设置取消的错误提示信息
  2. 关闭 channel:c.done
  3. 递归 关闭全部子节点
  4. 从父节点中删除本身
  5. 最终经过关闭channel把取消信号,传递给全部子节点
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	// 已经被取消
	if c.err != nil {
		c.mu.Unlock()
		return 
	}
	// 设置 cancelCtx 错误信息
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	//  递归地取消全部子节点
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	// 清空子节点
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		// 从父节点中移除本身 
		removeChild(c.Context, c)
	}
}
复制代码

还有一个重点函数 propagateCancel须要重点关注

func propagateCancel(parent Context, child canceler) {
	// 父节点是一个空节点,能够理解为本节点为根节点,不须要挂载
	if parent.Done() == nil {
		return // parent is never canceled
	}
	// 父节点是可取消类型的
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			// 父节点被取消了,本节点也须要取消
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			// 挂载到父节点
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
		// 为了兼容,Context 内嵌到一个类型里的状况发生
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}
复制代码

这里说明下上述代码中 else 的状况,为何须要开一个goroutine来监控取消信号,先看下第一个case:

case <-parent.Done():
复制代码

此处主要了为了不cancelCtx被内嵌的一个类型中,作为匿名字段的状况,好比:

type CancelContext struct {
    Context
}
复制代码

这时候 parentCancelCtx 函数 是没法正常识别出CancelContext是可取消类型的ctx。
再看第二个 case:

case <-child.Done():
复制代码

主要做用是在子节点取消的时候,可让select语句正常退出,避免goroutine泄露。

经过以下parentCancelCtx源码,咱们肯定一旦入参的parent是通过包装的类型,parentCancelCtx就没法正确的识别出parent的类型。

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	for {
		switch c := parent.(type) {
		case *cancelCtx:
			return c, true
		case *timerCtx:
			return &c.cancelCtx, true
		case *valueCtx:
			parent = c.Context
		default:
			return nil, false
		}
	}
}
复制代码

timerCtx

从定义中咱们能够看出timerCtx基于cancelCtx实现,多出了timer和deadline两个字段。并且timerCtx实现了本身的Deadline方法。

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}
复制代码

所以咱们直接看核心的函数WithDeadline

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	// 判断父节点是否超时,(非timeCtx类型的Deadline()直接return的)
	// 若是父节点的超时时间比当前节点早,直接返回cancalCtx便可
	// 由于父节点超时会自动调用cancel,子节点随之取消,因此不须要单独处理子节点的超时问题
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	// 直接取消
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	// 核心代码在这,若是当前节点没被取消,则经过time.AfterFunc在dur时间后调用cancel函数,自动取消。
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}
// 基于WithDeadline封装实现
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}
复制代码

valueCtx

源码比较简单,将Context 做为匿名字段来实现类型链表的接口,一层层传递,获取值主要查看Value方法,它会一层层判断是否有值,直到找到顶层的Context。 所以这里也有个要注意的地方,就是子节点的key值是会覆盖父节点的值,所以在命名key值得时候须要特别注意。

func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	if !reflect.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) String() string {
	return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}
复制代码

解析

Done 方法返回 <-chan struct{} ,用来goroutine间进行消息通讯。

结束

欢迎关注个人Github

参考

Go Concurrency Patterns: Context
Go context
深度解密Go语言之context

相关文章
相关标签/搜索