Go 系列(一):chan 源码分析
本文主要介绍了 Go 语言 (golang) 中的 channel,并从源码层面分析其具体实现,包括创建 channel,发送数据,接收数据以及相关调度等。
以下分析基于 Go 1.17.5
1. 概述 官方对 chan 的描述如下:
A channel provides a mechanism for concurrently executing functions to communicate by sending and receivingvalues of a specified element type. The value of an uninitialized channel is nil.
chan 提供了一种并发通信机制,用于生产和消费某一指定类型数据,未初始化的 chan 的值是 nil。
Chan 是 Go 里面的一种数据结构,具有以下特性:
goroutine-safe,多个 goroutine 可以同时访问一个 channel 而不会出现并发问题
可以用于在 goroutine 之间存储和传递值
copying into and out of hchan buffer
其语义是先入先出(FIFO)
可以导致 goroutine 的 block 和 unblock
通过 sudog queues 来记录阻塞的 goroutine。
通过 runtime scheduler (gopark, goready) 来实现阻塞与唤醒。
Channel 字面意义是“通道”,类似于 Linux 中的管道。声明 channel 的语法如下:
1 2 3 chan T chan <- T <-chan T
单向通道的声明,用 <- 来表示,它指明通道的方向。
因为 channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。可以向它传递一个 int 值,代表 channel 缓冲区的大小(容量),构造出来的是一个缓冲型的 channel;不传或传 0 的,构造的就是一个非缓冲型的 channel。
1 2 3 4 5 ch1 := make (chan int ) ch2 := make (chan int , 3 )
chan(即 hchan 结构体) 默认会被分配在堆上,make 返回的只是一个指向该对象的指针 。
无缓冲 channel
无缓冲的 channel(unbuffered channel),其缓冲区大小则默认为 0。在功能上其接受者会阻塞等待并阻塞应用程序,直至收到通信和接收到数据。
(引用 William Kennedy 的图)
缓冲 channel
有缓存的 channel(buffered channel),其缓存区大小是根据所设置的值来调整。在功能上,若缓冲区未满则不会阻塞,会源源不断的进行传输。当缓冲区满了后,发送者就会阻塞并等待。而当缓冲区为空时,接受者就会阻塞并等待,直至有新的数据:
(引用 William Kennedy 的图)
2. 数据结构 本质上 channel 在设计上就是环形队列。其包含发送方队列、接收方队列,加上互斥锁 mutex 等结构。
channel 是一个有锁的环形队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 type hchan struct { closed uint32 elemtype *_type buf unsafe.Pointer qcount uint dataqsiz uint elemsize uint16 sendx uint recvx uint recvq waitq sendq waitq lock mutex } type waitq struct { first *sudog last *sudog } type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer c *hchan ... }
3. 实现原理 3.1 创建 chan 在源码中通道的创建由 makechan 方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func makechan (t *chantype, size int ) *hchanfunc makechan64 (t *chantype, size int64 ) *hchanfunc reflect_makechan (t *chantype, size int ) *hchan { return makechan(t, size) } func makechan64 (t *chantype, size int64 ) *hchan { if int64 (int (size)) != size { panic (plainError("makechan: size out of range" )) } return makechan(t, int (size)) }
内部都是调用的 makechan 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func makechan (t *chantype, size int ) *hchan { elem := t.elem if elem.size >= 1 <<16 { throw("makechan: invalid channel element type" ) } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment" ) } mem, overflow := math.MulUintptr(elem.size, uintptr (size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic (plainError("makechan: size out of range" )) } var c *hchan switch { case mem == 0 : c = (*hchan)(mallocgc(hchanSize, nil , true )) c.buf = c.raceaddr() case elem.ptrdata == 0 : c = (*hchan)(mallocgc(hchanSize+mem, nil , true )) c.buf = add(unsafe.Pointer(c), hchanSize) default : c = new (hchan) c.buf = mallocgc(mem, elem, true ) } c.elemsize = uint16 (elem.size) c.elemtype = elem c.dataqsiz = uint (size) lockInit(&c.lock, lockRankHchan) return c }
具体流程如下:
1)首先是编译器检查,包括通道元素类型的 size 以及通道和元素的对齐,然后计算存放数据元素的内存大小以及是否溢出
2)然后根据不同条件进行内存分配
总体的原则是:总内存大小 = hchan 需要的内存大小 + 元素需要的内存大小
队列为空或元素大小为 0:只需要开辟的内存空间为 hchan 本身的大小
元素不是指针类型:需要开辟的内存空间 = hchan 本身大小 + 每个元素的大小 * 申请的队列长度
元素是指针类型:这种情况下 buf 需要单独开辟空间,buf 占用内存大小为每个元素的大小 * 申请的队列长度 3)最后则对 chan 的其他字段赋值
3.2 发送数据 发送数据到 channel 时,直观的理解是将数据放到 chan 的环形队列中,不过 go 做了一些优化:
先判断是否有等待接收数据的 groutine,如果有,直接将数据发给 Groutine,唤醒 groutine,就不放入队列中了。
当然还有另外一种情况就是:队列如果满了,那就只能放到队列中等待,直到有数据被取走才能发送。
chan 的发送逻辑涉及到 5 个方法:
1 2 3 4 5 func selectnbsend (c *hchan, elem unsafe.Pointer) (selected bool ) {}func chansend1 (c *hchan, elem unsafe.Pointer) {…}func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool {…}func send (c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func () , skip int ) {…}func sendDirect (t *_type, sg *sudog, src unsafe.Pointer) {…}
chansend1 方法是 go 编译代码中c <- x
这种写法的入口点,即当我们编写代码 c <- x
其实就是调用此方法。 这四个方法的调用关系:chansend1 -> chansend -> send -> sendDirect
具体发送逻辑在chansend
这个方法里,然后真正使用的方法其实是对该方法的一层包装。
1 2 3 4 5 6 7 func chansend1 (c *hchan, elem unsafe.Pointer) { chansend(c, elem, true , getcallerpc()) } func selectnbsend (c *hchan, elem unsafe.Pointer) (selected bool ) { return chansend(c, elem, false , getcallerpc()) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool { if c == nil { if !block { return false } gopark(nil , nil , waitReasonChanSendNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("send on closed channel" )) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil ) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1 ) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2 ) KeepAlive(ep) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup" ) } panic (plainError("send on closed channel" )) } return true }
核心逻辑
如果 recvq 不为空,从 recvq 中取出一个等待接收数据的 Groutine,直接将数据发送给该 Groutine
如果 recvq 为空,才将数据放入 buf 中
如果 buf 已满,则将要发送的数据和当前的 Groutine 打包成 Sudog 对象放入 sendq,并将 groutine 置为等待状态
等 goroutine 再次被调度时程序继续执行
然后追踪一下 send 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func send (c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func () , skip int ) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1 ) }
继续看 sendDirect 方法:
1 2 3 4 5 6 7 8 9 10 11 func sendDirect (t *_type, sg *sudog, src unsafe.Pointer) { dst := sg.elem typeBitsBulkBarrier(t, uintptr (dst), uintptr (src), t.size) memmove(dst, src, t.size) }
这里涉及到一个 goroutine 直接写另一个 goroutine 栈的操作,一般而言,不同 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,没有中间商赚差价,效率得以提高,完美。
3.3 接收数据 从 channel 读取数据的流程和发送的类似,基本是发送操作的逆操作。
这里同样存在和 send 一样的优化:从 channel 读取数据时,不是直接去环形队列中去数据,而是先判断是否有等待发送数据的 groutine。如果有,直接将 groutine 出队列,取出数据返回,并唤醒 groutine。如果没有等待发送数据的 groutine,再从环形队列中取数据。
chan 的接收涉及到 7 个方法:
1 2 3 4 5 6 7 func reflect_chanrecv (c *hchan, nb bool , elem unsafe.Pointer) (selected bool , received bool ) {}func selectnbrecv (elem unsafe.Pointer, c *hchan) (selected, received bool ) {}func chanrecv1 (c *hchan, elem unsafe.Pointer) {…},func chanrecv2 (c *hchan, elem unsafe.Pointer) (received bool ) {…}func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) {…}func recv (c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func () , skip int ) {…}func recvDirect (t *_type, sg *sudog, dst unsafe.Pointer) {…}
按照发送时的套路可知,只有 chanrecv 是具体逻辑,上面几个都是包装方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func reflect_chanrecv (c *hchan, nb bool , elem unsafe.Pointer) (selected bool , received bool ) { return chanrecv(c, elem, !nb) } func selectnbrecv (elem unsafe.Pointer, c *hchan) (selected, received bool ) { return chanrecv(c, elem, false ) } func chanrecv1 (c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true ) } func chanrecv2 (c *hchan, elem unsafe.Pointer) (received bool ) { _, received = chanrecv(c, elem, true ) return }
接收操作有两种写法,一种带 “ok”,反应 channel 是否关闭;
一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。
两种写法,都有各自的应用场景。
经过编译器的处理后,这两种写法最后对应源码里的就是不带ok
的chanrecv1
和带ok
的chanrecv2
这两个函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) { if c == nil { if !block { return } gopark(nil , nil , waitReasonChanReceiveNilChan, traceEvGoStop, 2 ) throw("unreachable" ) } if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true , true } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true , true } if !block { unlock(&c.lock) return false , false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1 ) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2 ) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true , success }
继续追踪一下 recv 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func recv (c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func () , skip int ) { if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1 ) }
再看一下 recvDirect:
1 2 3 4 5 6 7 func recvDirect (t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr (dst), uintptr (src), t.size) memmove(dst, src, t.size) }
看了接收部分代码后,整个流程就更新清晰了。
根据前面的发送逻辑可以知道,不管是接收还是发送只要被阻塞了,加入到了 sendq 或者 recvq 之后,那么后续的发送或者接收都是由对方进行处理了。
比如接收被阻塞了,当前 g 构成成一个 sudog 然后加入到 recvq,接着调用了 gopark 就已经阻塞了,啥也干不了了。
只能等到有发送者来的时候直接从 recvq 里把这个 sudog 取出来,并且直接把要他发送的值拷贝到这个 sudog.elem 字段上,也就是调用 chan 接收方法是传进来的哪个值. 最后发送方再调用 goready 把这个 g 给唤醒,这样再把剩下的逻辑走完,这个被阻塞了一会的接收者就可以拿着数据返回了。
核心逻辑:
1)如果有等待发送数据的 groutine,从 sendq 中取出一个等待发送数据的 Groutine,取出数据
2)如果没有等待的 groutine,且环形队列中有数据,从队列中取出数据
3)如果没有等待的 groutine,且环形队列中也没有数据,则阻塞该 Groutine,并将 groutine 打包为 sudogo 加入到 recevq 等待队列中
3.4 关闭 chan close 就比较简单了,相关方法就两个:
1 2 3 4 5 func reflect_chanclose (c *hchan) { closechan(c) } func closechan (c *hchan) {}
其中一个还是包装方法,真正逻辑就在 clsoechan 里。
每个逻辑都有一个 reflect_xxx 的方法,根据名字猜测是反射的时候用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 func closechan (c *hchan) { if c == nil { panic (plainError("close of nil channel" )) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("close of closed channel" )) } c.closed = 1 var glist gList for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3 ) } }
核心流程:
设置关闭状态
唤醒所有等待读取 chanel 的协程
所有等待写入 channel 的协程,抛出异常
4. 进阶 4.1 操作 chan 总结一下操作 channel 的结果:
操作
nil channel
closed channel
not nil, not closed channel
close
panic
panic
正常关闭
读 <- ch
阻塞
读到对应类型的零值
阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <-
阻塞
panic
阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞
总结一下,发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。
读、写一个 nil channel 都会被阻塞。
4.2 发送和接收元素的本质 Channel 发送和接收元素的本质是什么?参考资料【深入 channel 底层】里是这样回答的:
Remember all transfer of value on the go channels happens with the copy of value.
就是说 channel 的发送和接收操作本质上都是 “值的拷贝”,无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。
这里再引用文中的一个例子,我会加上更加详细地解释。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 type user struct { name string age int8 } var u = user{name: "Ankur" , age: 25 }var g = &ufunc modifyUser (pu *user) { fmt.Println("modifyUser Received Vaule" , pu) pu.name = "Anand" } func printUser (u <-chan *user) { time.Sleep(2 * time.Second) fmt.Println("printUser goRoutine called" , <-u) } func main () { c := make (chan *user, 5 ) c <- g fmt.Println(g) g = &user{name: "Ankur Anand" , age: 100 } go printUser(c) go modifyUser(g) time.Sleep(5 * time.Second) fmt.Println(g) } &{Ankur 25 } modifyUser Received Value &{Ankur Anand 100 } printUser goRoutine called &{Ankur 25 } &{Anand 100 }
一开始构造一个结构体 u,地址是 0x56420,图中地址上方就是它的内容。接着把 &u 赋值给指针 g,g 的地址是 0x565bb0,它的内容就是一个地址,指向 u。
main 程序里,先把 g 发送到 c,根据 copy value 的本质,进入到 chan buf 里的就是 0x56420,它是指针 g 的值(不是它指向的内容),所以打印从 channel 接收到的元素时,它就是 &{Ankur 25}。因此,这里并不是将指针 g “发送” 到了 channel 里,只是拷贝它的值而已。
4.3 资源泄漏 Channel 可能会引发 goroutine 泄漏。
泄漏的原因是 goroutine 操作 channel 后,处于发送或接收阻塞状态,而 channel 处于满或空的状态,一直得不到改变。同时,垃圾回收器也不会回收此类资源,进而导致 gouroutine 会一直处于等待队列中,不见天日。