golang WaitGroup源码解析
目录
上一篇学习了WaitGroup的用法,这篇学习下WaitGroup的源码。源码路径:$GOROOT/src/sync/waitgroup.go
WaitGroup的基本机制是通过计数器记录被等待goroutine的数目,当goroutine退出后计数器会清零,同时通过信号量机制通知Wait函数解除阻塞。WaitGroup内部有两个计数器,可以称为v和w(借用源码里的计数器变量名),v记录了被等待的goroutine的数目,WaitGroup的Add()和Done()方法负责更新v的值;w记录了调用Wait()的goroutine的数目,Wait()方法负责更新w的值。
WaitGroup的结构体
type WaitGroup struct {
//表示禁止复制,可以忽略
noCopy noCopy
//计数器,虽然有12个字节,但实际只使用了8个字节。定义12个字节跟64位地址对齐有关,目的是为了兼容32位编译器
state1 [12]byte
//信号量
sema uint32
}
核心成员就是state1和sema。state1用作计数器,实际只用了8个字节,高4个字节记录被等待goroutine的数目,低4个字节记录调用Wait()的goroutine的数目:
|________|________|
v w
WaitGroup的方法
state()
获取实际的8个字节作为state的值
func (wg *WaitGroup) state() *uint64 {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1))
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[4]))
}
}
如果state1的地址是8字节对齐的,直接取state1的低8字节;如果不是8字节对齐的,取state1的高8个字节。
Add()
非常重要,有两个作用,一个更新v的值,另一个当v变成0时发送信号量。(下面的代码去掉了源码里的race判断,race判断是竞争检测相关的,只有当运行程序时带-race参数才起作用,不是WaitGroup的核心功能,去掉后方便看清程序主体)
func (wg *WaitGroup) Add(delta int) {
//取出state的值
statep := wg.state()
//原子操作的方式把入参delta的值加到了state的高4字节,也就是v = v + delta
state := atomic.AddUint64(statep, uint64(delta)<<32)
//state右移32位得到v的值
v := int32(state >> 32)
//state的低32位是w的值
w := uint32(state)
//v变成负数时就panic。比如第一次调用Add就传负数,就会出现这种panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
//正常Add在Wait之前调用,w是0才对
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//正常Add之后程序返回
if v > 0 || w == 0 {
return
}
//又一个误用检查,没看懂
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//把计数器v和w都置0
*statep = 0
//发送w个信号量,也就是有多少goroutine调用了Wait()就发送多少信号量,让他们都解除阻塞
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false)
}
}
Done()
实际上封装的Add()
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
看到有人讨论Add()能不能传负数的问题,肯定是可以传负数的,只是得保证计数器v的值不能是负数。比如Add(2)后再Add(-1)是没问题的,如果第一次调用Add()就传负数,会出现panic。
Wait()
两个作用,更新w的值和阻塞调用方
func (wg *WaitGroup) Wait() {
//获取state的值
statep := wg.state()
for {
//原子操作的方式获取state的值
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
//如果等待的goroutine的数量为0就不用等待直接返回
if v == 0 {
return
}
// 调用Wait()的goroutine的数目加1,Add()发送信号量的时候知道多少goroutine在等待
if atomic.CompareAndSwapUint64(statep, state, state+1) {
//获取信号量,如果拿不到就阻塞,如果拿到了就继续执行
runtime_Semacquire(&wg.sema)
//程序运行到这里说明Add()已经发送了信号量,Wait()解除阻塞了,这个时间state的值是0才正常。非0是个异常情况,这里panic
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
总结
通过源码可以看出来使用WaitGroup时得注意各个方法的调用顺序,顺序用错会导致panic。另外如果Add()加的数量大于Done()减的数量的话,会导致程序出现deadlock。
本文地址:https://blog.csdn.net/haowunanhai/article/details/110121784