channel是用于 goroutine 之间的同步、通信的数据结构
channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率
channel的用途包括但不限于以下几点:
本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑
Go channel的数据结构如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters lock mutex } |
qcount:已经存储了多少个元素
dataqsie:最多存储多少个元素,即缓冲区容量
buf:指向缓冲区的位置,实际上是一个数组
elemsize:每个元素占多大空间
closed:channel能够关闭,这里记录其关闭状态
elemtype:保存数据的类型信息,用于go运行时使用
sendx,recvx:
recvq,sendq:
当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送
lock:channel支持协程间并发访问,因此需要一把锁来保护
创建channel会被编译器编译为调用makechan函数
1 2 3 4 |
// 无缓冲通道 ch1 := make(chan int) // 有缓冲通道 ch2 := make(chan int, 10) |
会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值
可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
func makechan(t *chantype, size int) *hchan { elem := t.elem
// mem:缓冲区大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError( "makechan: size out of range" )) }
var c *hchan switch { // 缓冲区大小为空,只申请hchanSize大小的内存 case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存 case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) // 否则就是带缓存,且有指针,分配两次内存 default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }
// 保存元素类型,元素大小,容量 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan)
return c } |
执行以下代码时:
1 |
ch <- 3 |
编译器会转化为对chansend的调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 如果channel是空 if c == nil { // 非阻塞,直接返回 if !block { return false } // 否则阻塞当前协程 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw( "unreachable" ) }
// 非阻塞,没有关闭,且容量满了,无法发送,直接返回 if !block && c.closed == 0 && full(c) { return false }
// 加锁 lock(&c.lock)
// 如果已经关闭,无法发送,直接panic if c.closed != 0 { unlock(&c.lock) panic(plainError( "send on closed channel" )) }
// 从接收队列弹出一个协程的包装结构sudog if sg := c.recvq.dequeue(); sg != nil { // 如果能弹出,即有等到接收的协程,说明: // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待 // 将要发送的数据拷贝到该协程的接收指针上 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
// 缓冲区还有空间 if c.qcount < c.dataqsiz { // qp:计算要发送到的位置的地址 qp := chanbuf(c, c.sendx) // 将数据从ep拷贝到qp typedmemmove(c.elemtype, qp, ep) // 待发送位置移动 c.sendx++ // 由于是数组模拟队列,sendx到顶了需要归零 if c.sendx == c.dataqsiz { c.sendx = 0 } // 缓冲区数量++ c.qcount++ unlock(&c.lock) return true }
// 往下就是缓冲区无数据,也没有等到接收协程的情况了
// 如果是非阻塞模式,直接返回 if !block { unlock(&c.lock) return false }
// 将当前协程包装成sudog,阻塞到channel上 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 }
mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil
// 当前协程进入发送等待队列 c.sendq.enqueue(mysg) atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 被唤醒后从这里开始执行
KeepAlive(ep)
if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) // 被唤醒后发现channel关闭了,panic if closed { if c.closed == 0 { throw( "chansend: spurious wakeup" ) } panic(plainError( "send on closed channel" )) } return true } |
整体流程为:
如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回
从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:
否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回
接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上
将协程阻塞到channel的等待队列时,将其包装成了sudog结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type sudog struct { // 协程 g *g // 前一个,后一个指针 next *sudog prev *sudog // 等到发送的数据在哪,等待从哪个位置接收数据 elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool
parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot // 在哪个channel上等待 c *hchan // channel } |
其目的是:
来看看一些子函数:
1.判断channel是否是满的
1 2 3 4 5 6 7 8 9 |
func full(c *hchan) bool { // 无缓冲 if c.dataqsiz == 0 { // 并且没有其他协程在等待 return c.recvq.first == nil } // 有缓冲,但容量装满了 return c.qcount == c.dataqsiz } |
2.send方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
/** c:要操作的channel sg:弹出的接收者协程 ep:要发送的数据在的位置 */ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒该接收者协程 goready(gp, skip+1) } |
从channel中接收数据有几种写法:
根据带不带ok,决定用下面哪个方法
1 2 3 4 5 6 7 8 |
func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return } |
根据接不接收返回值,决定elem是不是nil
最终都会调用chanrecv方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw( "unreachable" ) }
// 非阻塞,并且channel为空 if !block && empty(c) { // 如果还没关闭,直接返回 if atomic.Load(&c.closed) == 0 { return } // 否则已经关闭, // 如果为空,返回该类型的零值 if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
lock(&c.lock)
// 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
// 如果有发送者正在阻塞,说明: // 1.无缓冲 // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待 if sg := c.sendq.dequeue(); sg != nil { // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
// 如果缓存区有数据, if c.qcount > 0 { // qp为缓冲区中下一次接收的位置 qp := chanbuf(c, c.recvx) // 将数据从qp拷贝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
// 接下来就是既没有发送者在等待,也缓冲区也没数据 if !block { unlock(&c.lock) return false, false }
// 将当前协程包装成sudog,阻塞到channel中 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 记录接收地址 mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 从这里唤醒 if mysg != gp.waiting { throw( "G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success } |
接收流程如为:
如果channel为nil,根据参数中是否阻塞来决定是否阻塞
如果channel已经关闭,且缓冲区没有元素,返回该类型零值
如果有发送者正在阻塞,说明:
如果缓存区有数据, 则从缓冲区将数据复制到ep,返回
接下来就是既没有发送者在等待,也缓冲区也没数据的情况:
将当前协程包装成sudog,阻塞到channel中
来看其中的子函数recv():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
/** c:操作的channel sg:阻塞的发送协程 ep:接收者接收数据的地址 */ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } // 接下来是有缓冲,且缓冲区满的情况 } else { // qp为channel缓冲区中,接收者下一次接收的地址 qp := chanbuf(c, c.recvx) // 将数据从qp拷贝到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送者的数据从sg.elem拷贝到qp typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送者 goready(gp, skip+1) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
func closechan(c *hchan) { // 不能关闭空channel if c == nil { panic(plainError( "close of nil channel" )) }
lock(&c.lock) // 不能重复关闭 if c.closed != 0 { unlock(&c.lock) panic(plainError( "close of closed channel" )) }
// 修改关闭状态 c.closed = 1
var glist gList
// 释放所有的接收者协程,并为它们赋予零值 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) }
// 释放所有的发送者协程 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock)
// 执行唤醒操作 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } } |
关闭的流程比较简单,可以看出:
不能关闭空channel,不能重复关闭channel
先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:
接收者:会收到该类型的零值
这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑
发送者:会被唤醒,然后panic
因此不能在有多个sender的时候贸然关闭channel