概述

Go 1.7 标准库引入 Context,用于传递上下文信息,如取消信号、超时信号、k-v 键值对等

源码解析

分析基于 go 1.17.5 版本的代码

整体概览

首先,代码中定义了两个 ctx backgroundTODO,他们作为所有 ctx 的根。所有的 ctx 组成树状的结构,如下图

image-20211225170432067

树中父节点 ctx 取消则会使其所有子节点取消,例如 cancelCtx 2 取消,则 cancelCtx3timerCtx4ctx 5 均会被取消

在源码文件中,主要有如下的 interface 与 struct,以下是它们间的关系

image-20211225144416238

其中 Context interface 指明编写一个 context 应实现哪些方法;而 canceler 是用于实现可取消的 context 的,例如下面的 cancelCtxtimerCtx

而基于这两个 interface,实现了如下四个 struct。

emptyCtx

emptyCtx,顾名思义就是空的 ctx。从代码中也可以看出,DeadlintDone 等方法的实现中无任何逻辑,就是一个空壳子。

这里有个细节:emptyCtx struct 事实上是 int 类型的。因为 backgroundtodo 两个变量均为 emptyCtx 类型的,因此不能将 emptyCtx 定义为 struct{} 类型

仅对外暴露 BackgroundTODO 方法,用于获取 ctx 树的根节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {return}
func (*emptyCtx) Done() <-chan struct{} {return nil}
func (*emptyCtx) Err() error {return nil}
func (*emptyCtx) Value(key interface{}) interface{} {return nil}
func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {return background}
func TODO() Context {return todo}

cancelCtx

代码有些多,拆成两部分来说。

首先是 cancelCtx 的 struct 定义。mu 字段用户保证 done、children、err 字段的读写原子性;done 字段记录该 cancelCtx 是否被取消;children 记录基于其衍生出的 context,即其后代 ctx。

1
2
3
4
5
6
7
8
type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

然后是 cancelCtx 各方法的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
var cancelCtxKey int

// context 的 value 的获取是遍历 ctx 树的过程。如果当前 context 的 key 对不上,则检查父节点的 key。
func (c *cancelCtx) Value(key interface{}) interface{} {
  // 如果 key 为 cancelCtxKey ,则返回自身
  // 这是一段特殊逻辑,为了便于找最近的 cancelCtx
	if key == &cancelCtxKey {
		return c
	}
	return c.Context.Value(key)
}

// 返回 c.done 的值
func (c *cancelCtx) Done() <-chan struct{} {
  // 这里用了 Load 原子读,所以不需要加锁
	d := c.done.Load()
	if d != nil {
		return d.(chan struct{})
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	d = c.done.Load()
	if d == nil {
		d = make(chan struct{})
		c.done.Store(d)
	}
	return d.(chan struct{})
}

func (c *cancelCtx) Err() error {
  // 上锁防止读时被其它 goroutine 写,从而导致数据不一致
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

type stringer interface {
	String() string
}

func contextName(c Context) string {
	if s, ok := c.(stringer); ok {
		return s.String()
	}
	return reflectlite.TypeOf(c).String()
}

func (c *cancelCtx) String() string {
	return contextName(c.Context) + ".WithCancel"
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // 当前 ctx 已被取消
	}
  
  // 取消当前 ctx,同时取消其子 ctx
	c.err = err
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

  // 是否将当前 ctx 从父 ctx 的 children 列表中移除
	if removeFromParent {
		removeChild(c.Context, c)
	}
}

// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
	p, ok := parentCancelCtx(parent)
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}

接下来看看 WithCancel 方法,即生成 cancelCtx 的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
  // 以 parent 为父 ctx,创建子 ctx
	c := newCancelCtx(parent)
  // 根据 parent ctx 状态判断子 ctx 是否应该取消
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

其中的 propagateCancel 方法比较复杂,单独拎出来仔细瞧瞧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// goroutines counts the number of goroutines ever created; for testing.
var goroutines int32

// propagateCancel arranges for child to be canceled when parent is.
// propagateCancel 保证了当父 ctx 取消时,子 ctx 也会被取消
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // 父 ctx 未被取消
	}

  // 若父 ctx 已取消,则新创建的子 ctx 直接取消
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

	if p, ok := parentCancelCtx(parent); ok {
    // 如果 parent 为 cancelCtx,则其有 children 字段
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
    // 如果 parent 不为 cancelCtx,则其无 children 字段,不能通过遍历 children 的方式取消子 ctx(方法cancelCtx.cancel)
    // 只能通过启 goroutine 的方式,每个 goroutine 监听 parent.Done 然后关闭 children
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

// &cancelCtxKey is the key that a cancelCtx returns itself for.
var cancelCtxKey int

// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
// 判断 parent 是否为 cancelCtx 或者其子类
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	if done == closedchan || done == nil {
		return nil, false
	}
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
	pdone, _ := p.done.Load().(chan struct{})
	if pdone != done {
		return nil, false
	}
	return p, true
}

timerCtx

timerCtx 源码相对简单,是在 cancelCtx 的基础上,增加了过期时间控制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

func (c *timerCtx) String() string {
	return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
		c.deadline.String() + " [" +
		time.Until(c.deadline).String() + "])"
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

创建 timerCtx 时候,有不少对过期时间的判断

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
  // 如果父节点的过期时间比子节点过期时间早,则相当于子节点的过期取消逻辑不会被触发,因此这里直接返回 cancelCtx
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
  // 如果 deadline 时间已到
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
    // 设置过期触发的事件
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

valueCtx

逻辑比较清楚明了,因此只看一下 Value 方法。Value 的逻辑是在 ctx 树中回溯遍历 key,一直遍历到满足条件为止。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

思考

valueCtx

context 中获取 value 值是通过逐个遍历 key,找到对应的 value 的。相比于 map 查找性能低不少;并且我理解 context 应该是用作上下文状态同步的,value 方法感觉有点多余

cancelCtx

cancelCtx.mu 是普通的互斥锁而不是读写锁。个人理解 context 是读多写少的场景,因此使用读写锁可能开销更大。

参考资料

https://zhuanlan.zhihu.com/p/110085652

https://zhuanlan.zhihu.com/p/68792989

https://zhuanlan.zhihu.com/p/163684835

https://lailin.xyz/post/go-training-week3-context.html