欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

golang WaitGroup源码解析

程序员文章站 2022-06-24 12:19:42
上一篇学习了WaitGroup的用法,这篇学习下WaitGroup的机制。源码路径:$GOROOT/src/sync/waitgroup.goWaitGroup的总体机制是通过计数器记录被等待goroutine的数目,当goroutine退出后计数器会清零,同时通过信号量机制通知Wait函数解除阻塞。WaitGroup内部有两个计数器,可以称为v和w(借用源码里的计数器变量名),v记录了被等待的goroutine的数目,WaitGroup的Add()和Done()方法更新v的值;w记录了调用Wait()的...

  目录

WaitGroup的结构体

WaitGroup的方法

state()

Add()

Done()

Wait()


 

     上一篇学习了WaitGroup的用法,这篇学习下WaitGroup的源码。源码路径:$GOROOT/src/sync/waitgroup.go


     WaitGroup的基本机制是通过计数器记录被等待goroutine的数目,当goroutine退出后计数器会清零,同时通过信号量机制通知Wait函数解除阻塞。WaitGroup内部有两个计数器,可以称为v和w(借用源码里的计数器变量名),v记录了被等待的goroutine的数目,WaitGroup的Add()和Done()方法负责更新v的值;w记录了调用Wait()的goroutine的数目,Wait()方法负责更新w的值。

WaitGroup的结构体

type WaitGroup struct {
        //表示禁止复制,可以忽略
	noCopy noCopy

        //计数器,虽然有12个字节,但实际只使用了8个字节。定义12个字节跟64位地址对齐有关,目的是为了兼容32位编译器
	state1 [12]byte

        //信号量
	sema   uint32
}

核心成员就是state1和sema。state1用作计数器,实际只用了8个字节,高4个字节记录被等待goroutine的数目,低4个字节记录调用Wait()的goroutine的数目:

|________|________|

      v                w

WaitGroup的方法

state()

获取实际的8个字节作为state的值

func (wg *WaitGroup) state() *uint64 {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1))
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[4]))
	}
}

如果state1的地址是8字节对齐的,直接取state1的低8字节;如果不是8字节对齐的,取state1的高8个字节。

Add()

非常重要,有两个作用,一个更新v的值,另一个当v变成0时发送信号量(下面的代码去掉了源码里的race判断,race判断是竞争检测相关的,只有当运行程序时带-race参数才起作用,不是WaitGroup的核心功能,去掉后方便看清程序主体)

func (wg *WaitGroup) Add(delta int) {
        //取出state的值
	statep := wg.state()

        //原子操作的方式把入参delta的值加到了state的高4字节,也就是v = v + delta
	state := atomic.AddUint64(statep, uint64(delta)<<32)

        //state右移32位得到v的值
	v := int32(state >> 32)

        //state的低32位是w的值
	w := uint32(state)

        //v变成负数时就panic。比如第一次调用Add就传负数,就会出现这种panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}

        //正常Add在Wait之前调用,w是0才对
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
        
        //正常Add之后程序返回
	if v > 0 || w == 0 {
		return
	}
	
        //又一个误用检查,没看懂
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	//把计数器v和w都置0
	*statep = 0

        //发送w个信号量,也就是有多少goroutine调用了Wait()就发送多少信号量,让他们都解除阻塞
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false)
	}
}

Done()

实际上封装的Add()

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

看到有人讨论Add()能不能传负数的问题,肯定是可以传负数的,只是得保证计数器v的值不能是负数。比如Add(2)后再Add(-1)是没问题的如果第一次调用Add()就传负数,会出现panic。

Wait()

两个作用,更新w的值和阻塞调用方

func (wg *WaitGroup) Wait() {
        //获取state的值
	statep := wg.state()

	for {
                //原子操作的方式获取state的值
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
                
                //如果等待的goroutine的数量为0就不用等待直接返回
		if v == 0 {
			return
		}

		// 调用Wait()的goroutine的数目加1,Add()发送信号量的时候知道多少goroutine在等待
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
                        //获取信号量,如果拿不到就阻塞,如果拿到了就继续执行
			runtime_Semacquire(&wg.sema)
                        
                        //程序运行到这里说明Add()已经发送了信号量,Wait()解除阻塞了,这个时间state的值是0才正常。非0是个异常情况,这里panic
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			return
		}
	}
}

总结

通过源码可以看出来使用WaitGroup时得注意各个方法的调用顺序,顺序用错会导致panic。另外如果Add()加的数量大于Done()减的数量的话,会导致程序出现deadlock。

 

 

本文地址:https://blog.csdn.net/haowunanhai/article/details/110121784