💧 Posted on 

Go 系列(一):chan 源码分析

本文主要介绍了 Go 语言 (golang) 中的 channel,并从源码层面分析其具体实现,包括创建 channel,发送数据,接收数据以及相关调度等。

以下分析基于 Go 1.17.5

1. 概述

官方对 chan 的描述如下:

A channel provides a mechanism for concurrently executing functions to communicate by sending and receivingvalues of a specified element type. The value of an uninitialized channel is nil.

chan 提供了一种并发通信机制,用于生产和消费某一指定类型数据,未初始化的 chan 的值是 nil。

Chan 是 Go 里面的一种数据结构,具有以下特性:

  • goroutine-safe,多个 goroutine 可以同时访问一个 channel 而不会出现并发问题
    • hchan mutex,通过加锁来避免数据竞争。
  • 可以用于在 goroutine 之间存储和传递值
    • copying into and out of hchan buffer
  • 其语义是先入先出(FIFO)
  • 可以导致 goroutine 的 block 和 unblock
    • 通过 sudog queues 来记录阻塞的 goroutine。
    • 通过 runtime scheduler(gopark, goready) 来实现阻塞与唤醒。

Channel 字面意义是“通道”,类似于 Linux 中的管道。声明 channel 的语法如下:

1
2
3
chan T // 声明一个双向通道
chan<- T // 声明一个只能用于发送的通道
<-chan T // 声明一个只能用于接收的通道

单向通道的声明,用 <- 来表示,它指明通道的方向。

因为 channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。

1
2
3
4
5
// 无缓冲
ch1 := make(chan int)

// 缓冲区为 3
ch2 := make(chan int, 3)

chan(即 hchan 结构体) 默认会被分配在堆上,make 返回的只是一个指向该对象的指针

无缓冲 channel

无缓冲的 channel(unbuffered channel),其缓冲区大小则默认为 0。在功能上其接受者会阻塞等待并阻塞应用程序,直至收到通信和接收到数据。

(引用 William Kennedy 的图)

缓冲 channel

有缓存的 channel(buffered channel),其缓存区大小是根据所设置的值来调整。在功能上,若缓冲区未满则不会阻塞,会源源不断的进行传输。当缓冲区满了后,发送者就会阻塞并等待。而当缓冲区为空时,接受者就会阻塞并等待,直至有新的数据:

(引用 William Kennedy 的图)

2. 数据结构

本质上 channel 在设计上就是环形队列。其包含发送方队列、接收方队列,加上互斥锁 mutex 等结构。

channel 是一个有锁的环形队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// src/runtime/chan.go
type hchan struct {
closed uint32 // channel是否关闭的标志
elemtype *_type // channel中的元素类型

// channel分为无缓冲和有缓冲两种。
// 对于有缓冲的channel存储数据,使用了 ring buffer(环形缓冲区) 来缓存写入的数据,本质是循环数组
// 为啥是循环数组?普通数组不行吗,普通数组容量固定更适合指定的空间,弹出元素时,普通数组需要全部都前移
// 当下标超过数组容量后会回到第一个位置,所以需要有两个字段记录当前读和写的下标位置
buf unsafe.Pointer // 指向底层循环数组的指针(环形缓冲区)
qcount uint // 循环数组中的元素数量
dataqsiz uint // 循环数组的长度
elemsize uint16 // 元素的大小
sendx uint // 下一次写下标的位置
recvx uint // 下一次读下标的位置

// 尝试读取channel或向channel写入数据而被阻塞的goroutine
recvq waitq // 读等待队列
sendq waitq // 写等待队列

lock mutex //互斥锁,保证读写channel时不存在并发竞争问题
}

type waitq struct {
first *sudog
last *sudog
}

type sudog struct {
g *g // 指向当前的 goroutine。
next *sudog // 指向下一个 g。
prev *sudog // 指向上一个 g。
elem unsafe.Pointer // 数据元素,可能会指向堆栈。
c *hchan
...
}

3. 实现原理

3.1 创建 chan

在源码中通道的创建由 makechan 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 通用创建方法
func makechan(t *chantype, size int) *hchan

// 类型为 int64 的进行特殊处理
func makechan64(t *chantype, size int64) *hchan

//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
return makechan(t, size)
}

func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}

return makechan(t, int(size))
}

内部都是调用的 makechan 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// 编译器检查 typesize 和 align
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 计算存放数据元素的内存大小以及是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

var c *hchan
switch {
case mem == 0:
// chan的size为0,或者每个元素占用的大小为0(比如struct{}大小就是0,不占空间)
// 这种情况就不需要单独为buf分配空间
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 如果队列中不存在指针,那么每个元素都需要被存储并占用空间,占用大小为前面乘法算出来的mem
// 同时还要加上hchan本身占用的空间大小,加起来就是整个hchan占用的空间大小
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 把buf指针指向空的hchan占用空间大小的末尾
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 如果chan中的元素是指针类型的数据,为buf单独开辟mem大小的空间,用来保存所有的数据
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小、类型以及缓冲区大小赋值
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
// 初始化锁
lockInit(&c.lock, lockRankHchan)

return c
}

具体流程如下:

  • 1)首先是编译器检查,包括通道元素类型的 size 以及通道和元素的对齐,然后计算存放数据元素的内存大小以及是否溢出
  • 2)然后根据不同条件进行内存分配
    • 总体的原则是:总内存大小 = hchan 需要的内存大小 + 元素需要的内存大小
    • 队列为空或元素大小为 0:只需要开辟的内存空间为 hchan 本身的大小
    • 元素不是指针类型:需要开辟的内存空间 = hchan 本身大小 + 每个元素的大小 * 申请的队列长度
    • 元素是指针类型:这种情况下 buf 需要单独开辟空间,buf 占用内存大小为每个元素的大小 * 申请的队列长度
      3)最后则对 chan 的其他字段赋值

3.2 发送数据

发送数据到 channel 时,直观的理解是将数据放到 chan 的环形队列中,不过 go 做了一些优化:

  • 先判断是否有等待接收数据的 groutine,如果有,直接将数据发给 Groutine,唤醒 groutine,就不放入队列中了。
    • 这样省去了两次内存拷贝和加锁的开销
  • 当然还有另外一种情况就是:队列如果满了,那就只能放到队列中等待,直到有数据被取走才能发送。

chan 的发送逻辑涉及到 5 个方法:

1
2
3
4
5
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {}
func chansend1(c *hchan, elem unsafe.Pointer) {…}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {…}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {…}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {…}

chansend1 方法是 go 编译代码中c <- x这种写法的入口点,即当我们编写代码 c <- x其实就是调用此方法。 这四个方法的调用关系:chansend1 -> chansend -> send -> sendDirect

具体发送逻辑在chansend这个方法里,然后真正使用的方法其实是对该方法的一层包装。

1
2
3
4
5
6
7
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 判断 channel 是否为 nil
if c == nil {
if !block {// 如果非阻塞,直接返回 false
return false
}
// 当向 nil channel 发送数据时,会调用 gopark
// 而 gopark 会将当前的 goroutine 休眠,并用过第一个参数的 unlockf 来回调唤醒
// 但此处传递的参数为 nil,因此向 channel 发送数据的 goroutine 和接收数据的 goroutine 都会阻塞,进而死锁
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}

// 对于不阻塞的 send,快速检测失败场景
// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
// 2. channel 是缓冲型的,但循环数组已经装满了元素
// 主要用于 select 语句中,涉及到指令重排队+可观测性
if !block && c.closed == 0 && full(c) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁,避免竞争
lock(&c.lock)
// 检查 channel 是否已关闭,不允许向关闭的 channel 发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel")) // 直接panic
}
// 从 recvq 队首取出一个接收者,如果存在接收者,就绕过环形队列(buf)直接把 ep 拷贝给 sg,并释放锁
// 这就是前面提到的,官方做的一个优化,如果有goroutine在等待就直接把数据给该goroutine,没必要在写到buf,然后接收者又从buf中拷贝出来
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 到这里说明当前没有等待状态的接收者
// 如果环形队列还未满
if c.qcount < c.dataqsiz {
// 拿到 sendx 索引的位置
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 直接把数据从 qp 拷贝到 qp,就是把数据拷贝到环形队列中
typedmemmove(c.elemtype, qp, ep)
// 维护 snedx 的值,因为是环形队列,所以到最大值时就重置为0
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//qcount即当前chan中的元素个数
c.qcount++
unlock(&c.lock)
return true
}
// 到这里说明环形队列已经满了
// 如果还是要非阻塞的方式发送,就只能返回错误了
if !block {
unlock(&c.lock)
return false
}
// 到这里说明缓存队列满了,然后调用法指定是阻塞方式进行发送
// channel 满了,发送方会被阻塞。接下来会构造一个 sudog
gp := getg() // 获取当前 goroutine
mysg := acquireSudog()// 从对象池获取 sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 把发送的数据(ep)、当前g(gp)、已经当前这个chan(c)都存到sudog中
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
// 保存当前 sudog,下面要用到做校验
gp.waiting = mysg
gp.param = nil
// 把这个sudog存入sendq队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 调用gopark,挂起当前的 g,将当前的 g 移出调度器的队列
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 等到有接收者从chan中取值的时候,这个发送的g又会被重新调度,然后从这里开始继续执行
KeepAlive(ep)

// 检验是否为当前的 sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
// 这里sudog中的success表示的是当前这个通道上是否进行过通信
// 为 true 则说明是真正的唤醒,chan上有活动(有数据写进来,或者有数据被读取出去)
// 为 false 则说明是假的唤醒,即当前唤醒是否关闭chan导致的
// 这里主要根据这个值判断chan是否被关闭了
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// 将 sudog 放回对象池
releaseSudog(mysg)
if closed {
// 如果chan被关闭了也是直接panic
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}

核心逻辑

  • 如果 recvq 不为空,从 recvq 中取出一个等待接收数据的 Groutine,直接将数据发送给该 Groutine
  • 如果 recvq 为空,才将数据放入 buf 中
  • 如果 buf 已满,则将要发送的数据和当前的 Groutine 打包成 Sudog 对象放入 sendq,并将 groutine 置为等待状态
  • 等 goroutine 再次被调度时程序继续执行

然后追踪一下 send 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 忽略 race 检查..
if sg.elem != nil {
// 直接拷贝到接受者内存,使用写屏障
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g // 取出sudog中记录的g,这里的g就是被阻塞接收者
unlockf()
gp.param = unsafe.Pointer(sg) // 更新接收者g的param字段,在recv方法中会用到
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 最后把被阻塞的接收者g唤醒
goready(gp, skip+1)
}

继续看 sendDirect 方法:

1
2
3
4
5
6
7
8
9
10
11
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
// 直接进行内存"搬迁"
// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在读和写之前加上一个屏障
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 拷贝内存
memmove(dst, src, t.size)
}

这里涉及到一个 goroutine 直接写另一个 goroutine 栈的操作,一般而言,不同 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,没有中间商赚差价,效率得以提高,完美。

3.3 接收数据

从 channel 读取数据的流程和发送的类似,基本是发送操作的逆操作。

这里同样存在和 send 一样的优化:从 channel 读取数据时,不是直接去环形队列中去数据,而是先判断是否有等待发送数据的 groutine。如果有,直接将 groutine 出队列,取出数据返回,并唤醒 groutine。如果没有等待发送数据的 groutine,再从环形队列中取数据。

chan 的接收涉及到 7 个方法:

1
2
3
4
5
6
7
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {}
func chanrecv1(c *hchan, elem unsafe.Pointer) {…},
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {…}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {…}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {…}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {…}

按照发送时的套路可知,只有 chanrecv 是具体逻辑,上面几个都是包装方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
return chanrecv(c, elem, !nb)
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

接收操作有两种写法,一种带 “ok”,反应 channel 是否关闭;

一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。

两种写法,都有各自的应用场景。

经过编译器的处理后,这两种写法最后对应源码里的就是不带okchanrecv1和带okchanrecv2这两个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。比如 <-ch 这样,没有接收取到的值
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 如果是一个 nil 的 channel
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
// 否则,接收一个 nil 的 channel,调用gopark将goroutine 挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable") // 被挂起之后不会执行到这一句
}
// 这块主要用在 select 语句中,先大概了解下,比较难懂。。。
// 快速路径: 在不需要锁的情况下检查失败的非阻塞操作
// 注意到 channel 不能由已关闭转换为未关闭,则失败的条件是:
// 1. channel 是非缓冲型的,recvq 队列为空
// 2. channel 是缓冲型的,buf 为空
if !block && empty(c) {
// 此处的 c.closed 必须在条件判断之后进行验证,
// 因为指令重排后,如果先判断 c.closed,得出 channel 未关闭,无法判断失败条件中channel 是已关闭还是未关闭(从而需要 atomic 操作)
if atomic.Load(&c.closed) == 0 {
return
}
// 再次检查 channel 是否为空
if empty(c) {
// 接收者不为 nil 时返回该类型的零值
if ep != nil {
// typedmemclr 逻辑是根据类型清理相应地址的内存
typedmemclr(c.elemtype, ep)
}
// 返回(true,fasle)
// 返回值1--true:表示被 select case 选中,
// 返回值2--fasle 表示是否正常收到数据
return true, false
}
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加锁,保证并发安全
lock(&c.lock)
// channel 已关闭,并且循环数组 buf 里没有元素
// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
// 也就是说即使是关闭状态,但在缓冲型的 channel,
// buf 里有元素的情况下还能接收到元素
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 等待发送队列里有 goroutine 存在,说明 buf 是满的
// 这有可能是:
// 1. 非缓冲型的 channel
// 2. 缓冲型的 channel,但 buf 满了
// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// chan的buf 里有元素,可以正常接收
if c.qcount > 0 {
// 直接从循环数组里找到要接收的元素
qp := chanbuf(c, c.recvx)
// ep != nil表示代码里,没有忽略要接收的值
// 即接收的代码不是 "<- ch",而是 "val <- ch"这种,ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应位置的值
typedmemclr(c.elemtype, qp)
// 维护接收游标
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 数组里的元素个数减 1
c.qcount--
// 处理完成,解锁返回
unlock(&c.lock)
return true, true
}

// 到这里说明chan的buf里没有数据了,如果是非阻塞接收就直接返回了
if !block {
unlock(&c.lock)
return false, false
}

// 接下来就是要被阻塞的情况了
// 和发送类似的,构造一个 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 这里需要注意一下,ep就是我们用来接收值得对象
// 这里把ep直接存到sudog.elem字段上
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg // 这个waiting同样是用来唤醒后做校验的
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 加入到chan的recvq队列里
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 将当前 goroutine 挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// 唤醒后,继续往下执行

// 同样是进行数据校验
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 又是mysg.success,如果chan活动过就是true,否则是false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)// 将 sudog 放回对象池
// 到这里如果goroutine被正常唤醒肯定是可以取到数据的
// 因为recvq的数据是由发送的时候直接copy过来了
return true, success
}

继续追踪一下 recv 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 非缓冲型的 channel
if c.dataqsiz == 0 {
// 并且需要接收值
if ep != nil {
// 直接进行内存拷贝
recvDirect(c.elemtype, sg, ep)
}
} else {
// 需要注意:进入recv方法说明sendq队列里是有值的
// 那么对缓冲型的 channel来说,sendq有值就意味着buf满了
// 也就是 recvx和sendx重合了都
// 这里要做的就是先从buf中读一个数据出来,然后再把发送者发送的数据写入buf
qp := chanbuf(c, c.recvx)
// 将接收游标处的数据拷贝给接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 从发送者把数据写入 recvx
typedmemmove(c.elemtype, qp, sg.elem)
// 然后修改 recvx和sendx 的位置
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 最后唤醒发送的 goroutine
goready(gp, skip+1)
}

再看一下 recvDirect:

1
2
3
4
5
6
7
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// 如果是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈。
// 和sendDirect一样的需要加内存屏障
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}

看了接收部分代码后,整个流程就更新清晰了。

根据前面的发送逻辑可以知道,不管是接收还是发送只要被阻塞了,加入到了 sendq 或者 recvq 之后,那么后续的发送或者接收都是由对方进行处理了。

比如接收被阻塞了,当前 g 构成成一个 sudog 然后加入到 recvq,接着调用了 gopark 就已经阻塞了,啥也干不了了。

只能等到有发送者来的时候直接从 recvq 里把这个 sudog 取出来,并且直接把要他发送的值拷贝到这个 sudog.elem 字段上,也就是调用 chan 接收方法是传进来的哪个值. 最后发送方再调用 goready 把这个 g 给唤醒,这样再把剩下的逻辑走完,这个被阻塞了一会的接收者就可以拿着数据返回了。

核心逻辑:

  • 1)如果有等待发送数据的 groutine,从 sendq 中取出一个等待发送数据的 Groutine,取出数据
  • 2)如果没有等待的 groutine,且环形队列中有数据,从队列中取出数据
  • 3)如果没有等待的 groutine,且环形队列中也没有数据,则阻塞该 Groutine,并将 groutine 打包为 sudogo 加入到 recevq 等待队列中

3.4 关闭 chan

close 就比较简单了,相关方法就两个:

1
2
3
4
5
//go:linkname reflect_chanclose
func reflect_chanclose(c *hchan) {
closechan(c)
}
func closechan(c *hchan){}

其中一个还是包装方法,真正逻辑就在 clsoechan 里。

每个逻辑都有一个 reflect_xxx 的方法,根据名字猜测是反射的时候用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func closechan(c *hchan) {
// 关闭一个nil的chan直接panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 同样是先加锁
lock(&c.lock)
// 判断一下是否被关闭过了,关闭一个已经关闭的chan也是直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// 修改closed标记为,表示chan已经被关闭了
c.closed = 1
// gList 是通过 g.schedlink 链接 G 的列表,一个 G 只能是一次在一个 gQueue 或 gList 上
// gList 模拟的是栈操作(FILO)
// gQueue 模拟的是队列操作(FIFO)
var glist gList

// 释放所有的接收者
for {
sg := c.recvq.dequeue()
// sg == nil,表示接收队列已为空,跳出循环
if sg == nil {
break
}
// 如果 elem 不为空说明未忽略接收值,赋值为该类型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

// release all writers (they will panic)
// 释放所有的发送者,抛出异常
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// 循环读取 glist 里面的数据,挨个唤醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

核心流程:

  • 设置关闭状态
  • 唤醒所有等待读取 chanel 的协程
  • 所有等待写入 channel 的协程,抛出异常

4. 进阶

4.1 操作 chan

总结一下操作 channel 的结果:

操作 nil channel closed channel not nil, not closed channel
close panic panic 正常关闭
读 <- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

总结一下,发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。

读、写一个 nil channel 都会被阻塞。

4.2 发送和接收元素的本质

Channel 发送和接收元素的本质是什么?参考资料【深入 channel 底层】里是这样回答的:

Remember all transfer of value on the go channels happens with the copy of value.

就是说 channel 的发送和接收操作本质上都是 “值的拷贝”,无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。

这里再引用文中的一个例子,我会加上更加详细地解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type user struct {
name string
age int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
fmt.Println("modifyUser Received Vaule", pu)
pu.name = "Anand"
}

func printUser(u <-chan *user) {
time.Sleep(2 * time.Second)
fmt.Println("printUser goRoutine called", <-u)
}

func main() {
c := make(chan *user, 5)
c <- g
fmt.Println(g)
// modify g
g = &user{name: "Ankur Anand", age: 100}
go printUser(c)
go modifyUser(g)
time.Sleep(5 * time.Second)
fmt.Println(g)
}

&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

一开始构造一个结构体 u,地址是 0x56420,图中地址上方就是它的内容。接着把 &u 赋值给指针 g,g 的地址是 0x565bb0,它的内容就是一个地址,指向 u。

main 程序里,先把 g 发送到 c,根据 copy value 的本质,进入到 chan buf 里的就是 0x56420,它是指针 g 的值(不是它指向的内容),所以打印从 channel 接收到的元素时,它就是 &{Ankur 25}。因此,这里并不是将指针 g “发送” 到了 channel 里,只是拷贝它的值而已。

4.3 资源泄漏

Channel 可能会引发 goroutine 泄漏。

泄漏的原因是 goroutine 操作 channel 后,处于发送或接收阻塞状态,而 channel 处于满或空的状态,一直得不到改变。同时,垃圾回收器也不会回收此类资源,进而导致 gouroutine 会一直处于等待队列中,不见天日。