Golang中channel的原理解读(推荐)
程序员文章站
2022-06-24 10:14:20
数据结构channel的数据结构在$goroot/src/runtime/chan.go文件下:type hchan struct { qcount uint // 当前队...
数据结构
channel的数据结构在$goroot/src/runtime/chan.go
文件下:
type hchan struct { qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度,即可以存放的元素个数 buf unsafe.pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标记是否关闭 elemtype *_type // 元素类型 sendx uint // 队列下标,指向元素写入时存放到队列中的位置 recvx uint // 队列下标,指向元素从队列中读出的位置 recvq waitq // 等待读消息的groutine队列 sendq waitq // 等待写消息的groutine队列 lock mutex // 互斥锁 }
chan内部实现了一个环形队列作为缓冲区,队列的长度在创建chan时指定:
等待队列(recvq/sendq)使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog
结构:
type waitq struct { first *sudog last *sudog } type sudog struct { g *g next *sudog prev *sudog elem unsafe.pointer // data element (may point to stack) acquiretime int64 releasetime int64 ticket uint32 isselect bool parent *sudog // semaroot binary tree waitlink *sudog // g.waiting list or semaroot waittail *sudog // semaroot c *hchan // channel }
创建channel
通常使用make(channel string, 0)
的方式创建无缓存的channel,使用make(channel string, 10)
创建有缓存的channel。
源码:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchansize%maxalign != 0 || elem.align > maxalign { throw("makechan: bad alignment") } mem, overflow := math.muluintptr(elem.size, uintptr(size)) if overflow || mem > maxalloc-hchansize || size < 0 { panic(plainerror("makechan: size out of range")) } var c *hchan switch { case mem == 0: // 如果当前 channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间; c = (*hchan)(mallocgc(hchansize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: // 如果当前 channel 中存储的类型不是指针类型,会为当前的 channel 和底层的数组分配一块连续的内存空间; c = (*hchan)(mallocgc(hchansize+mem, nil, true)) c.buf = add(unsafe.pointer(c), hchansize) default: //单独为 runtime.hchan 和缓冲区分配内存; c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockinit(&c.lock, lockrankhchan) // 在函数的最后会统一更新elemsize、elemtype 和 dataqsiz 几个字段; if debugchan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
channel读写
写
- 当有新数据来时,首先判断recvq中是否有groutine存在,如果recvq不为空,则说明缓冲区为空,或者没有缓冲区,因为如果缓冲区有数据会被recvq里面的groutine消费。此时从recvq中拿出一个groutine并绑定数据,唤醒该groutine执行任务,这个过程跳过了将数据写入缓冲区的过程。
- 如果缓冲区有数据并有空余位置,将数据放入缓冲区。
- 如果缓冲区有数据但没有空余位置,当前groutine绑定数据并放入sendx,进入睡眠,等待被唤醒。
源码:
func chansend(c *hchan, ep unsafe.pointer, block bool, callerpc uintptr) bool { ..... lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainerror("send on closed channel")) } // 如果channel 没有被关闭并且已经有处于读等待的 goroutine, // 那么从接收队列 recvq 中取出最先陷入等待的 goroutine 并直接向它发送数据 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 如果recvq为空且缓冲区中还有剩余空间 if c.qcount < c.dataqsiz { // 计算出下一个可以存储数据的位置, qp := chanbuf(c, c.sendx) // raceenabled: 是否启用数据竞争检测,在编译时指定,默认为false if raceenabled { // 发出数据竞争警告 raceacquire(qp) racerelease(qp) } // 将发送的数据拷贝到缓冲区中,产生内存拷贝 typedmemmove(c.elemtype, qp, ep) // 增加 sendx 索引 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 增加计数器 c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // 将channel数据绑定到当前groutine并使groutine休眠 // 获取发送数据使用的 goroutine gp := getg() // 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息, // 例如发送的 channel、是否在 select 中和待发送数据的内存地址等 mysg := acquiresudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 将刚刚创建并初始化的 mysg 加入发送等待队列,并设置到当前 goroutine的waiting上, // 表示 goroutine 正在等待该sudog准备就绪 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isselect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // 休眠groutine gopark(chanparkcommit, unsafe.pointer(&c.lock), waitreasonchansend, traceevgoblocksend, 2) // 保证传入的数据不被gc keepalive(ep) // someone woke us up. if mysg != gp.waiting { throw("g waiting list is corrupted") } gp.waiting = nil gp.activestackchans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainerror("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releasesudog(mysg) return true }
读
- 如果sendx不为空且缓冲区不为空,从缓冲区头部读出数据并在当前g执行任务,在sendx中拿出一个g,将其数据写入缓冲区尾部并唤醒该g。
- 如果sendx不为空且缓冲区为空,直接从sendx中拿出一个g,将g中数据取出并唤醒该g。
- 如果sendx为空且缓冲区不为空,则从缓冲区头部拿出一个数据。
- 如果sendx为空且缓冲区为空,将该g放入recvq,进入休眠,等待被唤醒。
源码:
func chanrecv(c *hchan, ep unsafe.pointer, block bool) (selected, received bool) { // block:这次接收是否阻塞 if debugchan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } // 从一个空 channel 接收数据时会直接让出处理器的使用权 gopark(nil, nil, waitreasonchanreceivenilchan, traceevgostop, 2) throw("unreachable") } // fast path: check for failed non-blocking operation without acquiring the lock. if !block && empty(c) { // 如果channel为空并且未关闭,直接返回 if atomic.load(&c.closed) == 0 { return } if empty(c) { // the channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { // 手动标记清楚对象 typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //如果channel为空,并且已关闭,说明对象不可达 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { // 手动标记清除 typedmemclr(c.elemtype, ep) } return true, false } // 如果sendq不为空,直接消费,避免sendq --> queue --> recvx的过程 if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 当 channel 的缓冲区中已经包含数据时,从 channel 中接收数据会直接从缓冲区中 // recvx 的索引位置中取出数据进行处理 if c.qcount > 0 { // receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } // 如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove将缓冲区中的数据拷贝到内存中 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 使用 runtime.typedmemclr清除队列中的数据并完成收尾工作 typedmemclr(c.elemtype, qp) c.recvx++ // recvx位置归零 if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- // 计数减一 unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // 当 sendq不为空 并且缓冲区中也不存在任何数据时,阻塞并休眠当前groutine gp := getg() mysg := acquiresudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // no stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isselect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.pointer(&c.lock), waitreasonchanreceive, traceevgoblockrecv, 2) // someone woke us up if mysg != gp.waiting { throw("g waiting list is corrupted") } gp.waiting = nil gp.activestackchans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releasesudog(mysg) return true, !closed }
到此这篇关于golang中channel的原理解读的文章就介绍到这了,更多相关golang channel原理内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
上一篇: 6盒一疗程
下一篇: 浅谈golang 的高效编码细节