// newcond returns a new cond with locker l. func newcond(l locker) *cond { return &cond{l: l} } // a locker represents an object that can be locked and unlocked. type locker interface { lock() unlock() }
通过使用 newcond 函数可以返回 *sync.cond 类型的结果, *sync.cond 我们主要操作其三个方法,分别是:
// wait atomically unlocks c.l and suspends execution // of the calling goroutine. after later resuming execution, // wait locks c.l before returning. unlike in other systems, // wait cannot return unless awoken by broadcast or signal. // // because c.l is not locked when wait first resumes, the caller // typically cannot assume that the condition is true when // wait returns. instead, the caller should wait in a loop: // // c.l.lock() // for !condition() { // c.wait() // } // ... make use of condition ... // c.l.unlock() // func (c *cond) wait() { c.checker.check() t := runtime_notifylistadd(&c.notify) c.l.unlock() runtime_notifylistwait(&c.notify, t) c.l.lock() } // signal wakes one goroutine waiting on c, if there is any. // // it is allowed but not required for the caller to hold c.l // during the call. func (c *cond) signal() { c.checker.check() runtime_notifylistnotifyone(&c.notify) } // broadcast wakes all goroutines waiting on c. // // it is allowed but not required for the caller to hold c.l // during the call. func (c *cond) broadcast() { c.checker.check() runtime_notifylistnotifyall(&c.notify) }
sync.cond 主要实现一个条件变量,假如 goroutine a 执行前需要等待另外的goroutine b 的通知,那边处于等待的goroutine a 会保存在一个通知列表,也就是说需要某种变量状态的goroutine a 将会等待/wait在那里,当某个时刻状态改变时负责通知的goroutine b 通过对条件变量通知的方式(broadcast,signal)来通知处于等待条件变量的goroutine a, 这样便可首先一种“消息通知”的同步机制。
以go的http处理为例,在go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自*tls.conn的情况下,会进行相关的客户端与服务端的“握手”处理handshake(), 入口代码如下:
if tlsconn, ok := c.rwc.(*tls.conn); ok { if d := c.server.readtimeout; d != 0 { c.rwc.setreaddeadline(time.now().add(d)) } if d := c.server.writetimeout; d != 0 { c.rwc.setwritedeadline(time.now().add(d)) } if err := tlsconn.handshake(); err != nil { c.server.logf("http: tls handshake error from %s: %v", c.rwc.remoteaddr(), err) return } c.tlsstate = new(tls.connectionstate) *c.tlsstate = tlsconn.connectionstate() if proto := c.tlsstate.negotiatedprotocol; validnpn(proto) { if fn := c.server.tlsnextproto[proto]; fn != nil { h := initnpnrequest{tlsconn, serverhandler{c.server}} fn(c.server, tlsconn, h) } return } }
func (c *conn) handshake() error { c.handshakemutex.lock() defer c.handshakemutex.unlock() for { if err := c.handshakeerr; err != nil { return err } if c.handshakecomplete { return nil } if c.handshakecond == nil { break } c.handshakecond.wait() } c.handshakecond = sync.newcond(&c.handshakemutex) c.handshakemutex.unlock() c.in.lock() defer c.in.unlock() c.handshakemutex.lock() if c.handshakeerr != nil || c.handshakecomplete { panic("handshake should not have been able to complete after handshakecond was set") } if c.isclient { c.handshakeerr = c.clienthandshake() } else { c.handshakeerr = c.serverhandshake() } if c.handshakeerr == nil { c.handshakes++ } else { c.flush() } if c.handshakeerr == nil && !c.handshakecomplete { panic("handshake should have had a result.") } c.handshakecond.broadcast() c.handshakecond = nil return c.hand
写入器每次完成写入数据后,它都需要某种通知机制广播给处于阻塞状态的读取器,告诉它们可以对数据进行访问,这其实跟sync.cond 的 广播机制是不是很像? 有了这个广播机制,我们可以通过sync.cond来实现这个例子了:
package main import ( "bytes" "fmt" "io" "sync" "time" ) type mydatabucket struct { br *bytes.buffer gmutex *sync.rwmutex rcond *sync.cond //读操作需要用到的条件变量 } func newdatabucket() *mydatabucket { buf := make([]byte, 0) db := &mydatabucket{ br: bytes.newbuffer(buf), gmutex: new(sync.rwmutex), } db.rcond = sync.newcond(db.gmutex.rlocker()) return db } func (db *mydatabucket) read(i int) { db.gmutex.rlock() defer db.gmutex.runlock() var data []byte var d byte var err error for { //读取一个字节 if d, err = db.br.readbyte(); err != nil { if err == io.eof { if string(data) != "" { fmt.printf("reader-%d: %s\n", i, data) } db.rcond.wait() data = data[:0] continue } } data = append(data, d) } } func (db *mydatabucket) put(d []byte) (int, error) { db.gmutex.lock() defer db.gmutex.unlock() //写入一个数据块 n, err := db.br.write(d) db.rcond.broadcast() return n, err } func main() { db := newdatabucket() go db.read(1) go db.read(2) for i := 0; i < 10; i++ { go func(i int) { d := fmt.sprintf("data-%d", i) db.put([]byte(d)) }(i) time.sleep(100 * time.millisecond) } }
- 一定要在调用cond.wait方法前,锁定与之关联的读写锁
- 一定不要忘记在cond.wait后,若数据已经处理完毕,在返回前要对与之关联的读写锁进行解锁。
如下面 wait() 的源码所示,cond.wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的wait将继续往下执行并从新上锁
func (c *cond) wait() { c.checker.check() t := runtime_notifylistadd(&c.notify) c.l.unlock() runtime_notifylistwait(&c.notify, t) c.l.lock() }
如果不释放锁, 其它收到信号的gouroutine将阻塞无法继续执行。