channel的关闭的经典场景
程序员文章站
2022-06-27 21:18:37
关闭原则 一个常用的使用Go通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭通 道。 通用的原则是不要关闭已关闭的通道 错误关闭 go func Write(dataCh chan int){ for{ if value:=rand.Intn(1000);value!=0{ time.Sl ......
关闭原则
一个常用的使用go通道的原则是不要在数据接收方或者在有多个发送者的情况下关闭通
道。
通用的原则是不要关闭已关闭的通道
错误关闭
ch:=make(chan int,1) go func(ch chan int) { ch<-1 log.println("ok1") ch<-2 //阻塞1秒 log.println("ok2") ch<-3 //报错 因为下面把管道关闭了 }(ch) go func(ch chan int) { log.println("进入读") time.sleep(time.second) log.println(<-ch) close(ch) }(ch) time.sleep(time.hour)
优雅关闭
do once代表只能执行一次,防止重复关闭
type mychannel struct { c chan int once sync.once } func (mychannel mychannel) safeclose(){ mychannel.once.do(func() { close(mychannel.c) }) //once代表只执行一次 } func main() { write:=mychannel{c:make(chan int)} go func() {write.c<-1;write.safeclose()}() go func() {log.println(<-write.c)}() time.sleep(time.hour) }
关闭情况
1、多个接收者,一个发送者
这里只有一个发送者,当value=0时,这个发送者关闭管道,这里只有一个写进程,所以不用担心重复关闭问题,写进程可以直接关闭管道
这时,所有的接受者for-range会全部结束
这里的结束信号由写者发出
func write(datach chan int){ for{ if value:=rand.intn(1000);value!=0{ time.sleep(time.microsecond*500) datach<-value }else{ close(datach) } } } const numreceivers =100 var wg=sync.waitgroup{} //多个接收者 func readers(datach <-chan int){ for i:=0;i<numreceivers;i++{ go func() { defer wg.done() for value:=range datach{ log.println(value) } }() } } func main() { rand.seed(time.now().unixnano()) wg.add(numreceivers) datach:=make(chan int) go write(datach) readers(datach) wg.wait() }
2、【变种】多个接收者,一个发送者,但是关闭信号是由第三方发送
这里用了两个关闭信号
closing:用来通知写携程可以停止写了
closed:用来表示closing信号已经由一个第三方发出过了,后面的第三方可以不用再发了
package main import ( "log" "math/rand" "sync" "time" ) func main() { rand.seed(time.now().unixnano()) const numreceivers = 100 const numthirdparties = 100 //第三方 wg4:=sync.waitgroup{} wg4.add(numreceivers) datach2:=make(chan int) closing:=make(chan bool) //通知写进程 关闭的信号 closed:=make(chan bool) //通道已经关闭的 信号 stop:= func() { select { case closing<-true: //通知关闭 <-closed //有一个写进程收到通知 则结束 case <-closed: //如果已经关闭 则执行这一句 所以多次调用stop是安全非阻塞的 } } //多个第三方携程 for i:=0;i<numthirdparties;i++{ go func() { time.sleep(time.second*time.duration(rand.intn(100))) stop() }() } //发送者 1 go func() { defer func() { close(closed) //通知第三方 已经关闭了 close(datach2) //关闭数据通道 }() for{ select { case <-closing: return default: } select { case <-closing: //收到第三方的关闭请求 return case datach2<-rand.intn(100000): } } }() for i:=0;i<numreceivers;i++{ go func() { defer wg4.done() for value:=range datach2{ log.println(value) } }() } wg4.wait() }
3、多个发送者,一个接收者
这里有10000个发送者,为了遵循 管道不能被读者关闭的原则,于是这里又建立了一个stopch非缓冲管道,目的只是为了通知多个发送者,可以不用发送了,发送者先检查有没有关闭信号,然后再发送。这里用了两次select,来检查 。
同时注意,为了防止重复关闭管道,这里没有让写进程关闭datach,当所有读写进程都不再引用datach的时候,就会被gc
这里的关闭信号由读者发出
const numsenders = 10000 var wg2 = sync.waitgroup{} var datach = make(chan int) var stopch = make(chan bool) func sender(){ for i:=0;i<numsenders;i++{ go func() { select { case <-stopch: //通道关闭后直接执行这个 return default: } select { case <-stopch: //当通道被关闭 则直接执行这个 return //直接返回 case datach<-rand.intn(1000): } }() } } func receive(){ defer wg2.done() for value:=range datach{ log.println(value) if value==0{ close(stopch) return } } } func main() { wg2.add(1) rand.seed(time.now().unixnano()) go sender() go receive() wg2.wait() }
4、多个接收者和多个发送者
这种情况比较复杂,必须叫一个中间人来通知双方,当读写进程需要关闭,通过tostop管道通知中间人,中间人收到通知然后关闭stopch2管道,这个管道是让读写进程结束读写的信号。
tostop的容量必须设置成**>=1** 如果设置成非缓冲管道,那么如果中间人没有准备好,要发出结束信号时会阻塞,就会转到default,从而让结束信号丢失。
这里的结束信号读写进程都可以发出
var numsends = 100 var numrecives = 50 var wg3 = sync.waitgroup{} func main() { rand.seed(time.now().unixnano()) wg3.add(numrecives) datach2 := make(chan int) stopch2 := make(chan bool) tostop := make(chan bool, 1) //通知中间者可以关闭了 至少为1 //中间人 go func() { _ = <-tostop close(stopch2) }() //发送者 for i := 0; i < numsends; i++ { go func() { for { v := rand.intn(1000) if v == 0 { //写进程通知关闭 select { case tostop <- true: datach2 <- v log.println("===========================", 0, "-") default: } return } //尝试关闭 select { case <-stopch2: return default: } select { case <-stopch2: return case datach2 <- v: log.println(v) } } }() } //接收者 for i := 0; i < numrecives; i++ { go func() { defer wg3.done() for { select { case <-stopch2: return default: } select { case <-stopch2: return case val := <-datach2: log.println(val) if val == 888 { //读进程通知关闭 select { case tostop <- true: log.println("==========================", 888, "-") default: } return } } } }() } wg3.wait() }
上一篇: 浅谈浅拷贝与深拷贝