| 
                            
                                  channel是用于 goroutine 之间的同步、通信的数据结构 channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率 channel的用途包括但不限于以下几点: 
	协程间通信,同步定时任务:和timer结合解耦生产方和消费方,实现阻塞队列控制并发数 本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑 整体结构Go channel的数据结构如下所示: 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 | type hchan struct {     qcount   uint           // total data in the queue     dataqsiz uint           // size of the circular queue     buf      unsafe.Pointer // points to an array of dataqsiz elements     elemsize uint16     closed   uint32     elemtype *_type // element type     sendx    uint   // send index     recvx    uint   // receive index     recvq    waitq  // list of recv waiters     sendq    waitq  // list of send waiters     lock mutex } |  qcount:已经存储了多少个元素 dataqsie:最多存储多少个元素,即缓冲区容量 buf:指向缓冲区的位置,实际上是一个数组 elemsize:每个元素占多大空间 closed:channel能够关闭,这里记录其关闭状态 elemtype:保存数据的类型信息,用于go运行时使用 sendx,recvx: 
	记录下一个要发送到的位置,下一次从哪里还是接收这里用数组模拟队列,这两个变量即表示队列的队头,队尾因此channel的缓冲也被称为环形缓冲区 recvq,sendq: 当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送 lock:channel支持协程间并发访问,因此需要一把锁来保护 创建创建channel会被编译器编译为调用makechan函数 
	
		
			| 1 2 3 4 | // 无缓冲通道 ch1 := make(chan int) // 有缓冲通道 ch2 := make(chan int, 10) |  会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值 可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | func makechan(t *chantype, size int) *hchan {    elem := t.elem          // mem:缓冲区大小    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0 {       panic(plainError( "makechan: size out of range" ))    }      var c *hchan    switch {    // 缓冲区大小为空,只申请hchanSize大小的内存    case mem == 0:        c = (*hchan)(mallocgc(hchanSize, nil, true))        c.buf = c.raceaddr()    // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存    case elem.ptrdata == 0:        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    // 否则就是带缓存,且有指针,分配两次内存    default:       // Elements contain pointers.        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }          // 保存元素类型,元素大小,容量    c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    lockInit(&c.lock, lockRankHchan)         return c } |  发送执行以下代码时: 编译器会转化为对chansend的调用 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    // 如果channel是空    if c == nil {       // 非阻塞,直接返回       if !block {          return  false       }       // 否则阻塞当前协程       gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)       throw( "unreachable" )    }      // 非阻塞,没有关闭,且容量满了,无法发送,直接返回    if !block && c.closed == 0 && full(c) {       return  false    }      // 加锁    lock(&c.lock)      // 如果已经关闭,无法发送,直接panic    if c.closed != 0 {       unlock(&c.lock)       panic(plainError( "send on closed channel" ))    }      // 从接收队列弹出一个协程的包装结构sudog    if sg := c.recvq.dequeue(); sg != nil {       // 如果能弹出,即有等到接收的协程,说明:       // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待       // 将要发送的数据拷贝到该协程的接收指针上       send(c, sg, ep, func() { unlock(&c.lock) }, 3)       return  true }      // 缓冲区还有空间    if c.qcount < c.dataqsiz {       // qp:计算要发送到的位置的地址       qp := chanbuf(c, c.sendx)       // 将数据从ep拷贝到qp       typedmemmove(c.elemtype, qp, ep)       // 待发送位置移动       c.sendx++       // 由于是数组模拟队列,sendx到顶了需要归零       if c.sendx == c.dataqsiz {          c.sendx = 0       }       // 缓冲区数量++       c.qcount++       unlock(&c.lock)       return  true }      // 往下就是缓冲区无数据,也没有等到接收协程的情况了         // 如果是非阻塞模式,直接返回    if !block {       unlock(&c.lock)       return  false     }      // 将当前协程包装成sudog,阻塞到channel上    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {       mysg.releasetime = -1    }        mysg.elem = ep    mysg.waitlink = nil    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.waiting = mysg    gp.param = nil         // 当前协程进入发送等待队列    c.sendq.enqueue(mysg)    atomic.Store8(&gp.parkingOnChan, 1)    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)       // 被唤醒后从这里开始执行         KeepAlive(ep)      if mysg != gp.waiting {       throw( "G waiting list is corrupted" )    }    gp.waiting = nil    gp.activeStackChans = false    closed := !mysg.success    gp.param = nil    if mysg.releasetime > 0 {       blockevent(mysg.releasetime-t0, 2)    }    mysg.c = nil    releaseSudog(mysg)    // 被唤醒后发现channel关闭了,panic    if closed {       if c.closed == 0 {          throw( "chansend: spurious wakeup" )       }       panic(plainError( "send on closed channel" ))    }    return  true } |  整体流程为: 如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回 从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明: 
	该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待将要发送的数据拷贝到该协程的接收指针上,返回这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝 否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回 接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上 将协程阻塞到channel的等待队列时,将其包装成了sudog结构: 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | type sudog struct {    // 协程    g *g    // 前一个,后一个指针    next *sudog    prev *sudog    // 等到发送的数据在哪,等待从哪个位置接收数据    elem unsafe.Pointer    acquiretime int64    releasetime int64    ticket      uint32    isSelect bool    success bool      parent   *sudog // semaRoot binary tree    waitlink *sudog // g.waiting list or semaRoot    waittail *sudog // semaRoot    // 在哪个channel上等待    c        *hchan // channel } |  其目的是: 
	g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝 来看看一些子函数: 1.判断channel是否是满的 
	
		
			| 1 2 3 4 5 6 7 8 9 | func full(c *hchan) bool {    // 无缓冲    if c.dataqsiz == 0 {       // 并且没有其他协程在等待       return c.recvq.first == nil    }    // 有缓冲,但容量装满了    return c.qcount == c.dataqsiz } |  2.send方法: 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | /** c:要操作的channel sg:弹出的接收者协程 ep:要发送的数据在的位置 */ func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem    if sg.elem != nil {       sendDirect(c.elemtype, sg, ep)       sg.elem = nil    }    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    sg.success = true    if sg.releasetime != 0 {       sg.releasetime = cputicks()    }    // 唤醒该接收者协程    goready(gp, skip+1) } |  接收从channel中接收数据有几种写法: 根据带不带ok,决定用下面哪个方法 
	
		
			| 1 2 3 4 5 6 7 8 | func chanrecv1(c *hchan, elem unsafe.Pointer) {         chanrecv(c, elem, true) }   func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {         _, received = chanrecv(c, elem, true)         return } |  根据接不接收返回值,决定elem是不是nil 最终都会调用chanrecv方法: 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {     // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞    if c == nil {       if !block {          return    }       gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)       throw( "unreachable" )    }      // 非阻塞,并且channel为空    if !block && empty(c) {       // 如果还没关闭,直接返回    if atomic.Load(&c.closed) == 0 {       return    }       // 否则已经关闭,       // 如果为空,返回该类型的零值    if empty(c) {      if ep != nil {         typedmemclr(c.elemtype, ep)      }      return  true, false        }    }      lock(&c.lock)         // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值    if c.closed != 0 && c.qcount == 0 {       unlock(&c.lock)       if ep != nil {          typedmemclr(c.elemtype, ep)       }       return  true, false }          // 如果有发送者正在阻塞,说明:    // 1.无缓冲    // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待    if sg := c.sendq.dequeue(); sg != nil {       // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文       recv(c, sg, ep, func() { unlock(&c.lock) }, 3)       return  true, true }          // 如果缓存区有数据,    if c.qcount > 0 {       // qp为缓冲区中下一次接收的位置       qp := chanbuf(c, c.recvx)       // 将数据从qp拷贝到ep       if ep != nil {          typedmemmove(c.elemtype, ep, qp)       }       typedmemclr(c.elemtype, qp)       c.recvx++       if c.recvx == c.dataqsiz {          c.recvx = 0       }       c.qcount--       unlock(&c.lock)       return  true, true }      // 接下来就是既没有发送者在等待,也缓冲区也没数据    if !block {       unlock(&c.lock)       return  false, false }      // 将当前协程包装成sudog,阻塞到channel中    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {       mysg.releasetime = -1    }    // 记录接收地址    mysg.elem = ep    mysg.waitlink = nil    gp.waiting = mysg    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.param = nil    c.recvq.enqueue(mysg)      atomic.Store8(&gp.parkingOnChan, 1)    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)      // 从这里唤醒    if mysg != gp.waiting {       throw( "G waiting list is corrupted" )    }    gp.waiting = nil    gp.activeStackChans = false    if mysg.releasetime > 0 {       blockevent(mysg.releasetime-t0, 2)    }    success := mysg.success    gp.param = nil    mysg.c = nil    releaseSudog(mysg)    return  true, success } |  接收流程如为: 如果channel为nil,根据参数中是否阻塞来决定是否阻塞 如果channel已经关闭,且缓冲区没有元素,返回该类型零值 如果有发送者正在阻塞,说明: 
	要么是无缓冲有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者 如果缓存区有数据, 则从缓冲区将数据复制到ep,返回 接下来就是既没有发送者在等待,也缓冲区也没数据的情况: 将当前协程包装成sudog,阻塞到channel中 来看其中的子函数recv(): 
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | /** c:操作的channel sg:阻塞的发送协程 ep:接收者接收数据的地址 */ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep    if c.dataqsiz == 0 {       if ep != nil {          recvDirect(c.elemtype, sg, ep)       }    // 接下来是有缓冲,且缓冲区满的情况      } else {       // qp为channel缓冲区中,接收者下一次接收的地址    qp := chanbuf(c, c.recvx)       // 将数据从qp拷贝到ep    if ep != nil {          typedmemmove(c.elemtype, ep, qp)     }     // 将发送者的数据从sg.elem拷贝到qp     typedmemmove(c.elemtype, qp, sg.elem)     c.recvx++     if c.recvx == c.dataqsiz {        c.recvx = 0     }     // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx     c.sendx = c.recvx }    sg.elem = nil    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    sg.success = true    if sg.releasetime != 0 {       sg.releasetime = cputicks()    }    // 唤醒发送者    goready(gp, skip+1) } |  关闭
	
		
			| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | func closechan(c *hchan) {    // 不能关闭空channel    if c == nil {       panic(plainError( "close of nil channel" ))    }      lock(&c.lock)    // 不能重复关闭    if c.closed != 0 {       unlock(&c.lock)       panic(plainError( "close of closed channel" ))    }      // 修改关闭状态    c.closed = 1      var glist gList      // 释放所有的接收者协程,并为它们赋予零值  for {       sg := c.recvq.dequeue()       if sg == nil {          break       }       if sg.elem != nil {          typedmemclr(c.elemtype, sg.elem)          sg.elem = nil       }       if sg.releasetime != 0 {          sg.releasetime = cputicks()       }       gp := sg.g       gp.param = unsafe.Pointer(sg)       sg.success = false       glist.push(gp)    }      // 释放所有的发送者协程  for {       sg := c.sendq.dequeue()       if sg == nil {          break      }       sg.elem = nil       if sg.releasetime != 0 {          sg.releasetime = cputicks()       }       gp := sg.g       gp.param = unsafe.Pointer(sg)       sg.success = false       glist.push(gp)    }    unlock(&c.lock)      // 执行唤醒操作  for !glist.empty() {       gp := glist.pop()       gp.schedlink = 0       goready(gp, 3)    } } |  关闭的流程比较简单,可以看出: 不能关闭空channel,不能重复关闭channel 先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒: 接收者:会收到该类型的零值 这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑 发送者:会被唤醒,然后panic 因此不能在有多个sender的时候贸然关闭channel 
 |