再次探讨go实现无限 buffer 的 channel方法
前言
总所周知,go
里面只有两种 channel
,一种是 unbuffered channel, 其声明方式为
ch := make(chan interface{})
另一种是 buffered channel,其声明方式为
buffersize := 5 ch := make(chan interface{},buffersize)
对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 buffersize 。
jojo,buffer channel 的大小是有极限的,我不做 channel 了。
一旦 channel
满了的话,再往里面添加元素的话,将会阻塞。
so how can we make a infinite buffer channel?
本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。
实现
接口的设计
首先当然是建一个 struct
,在百度翻译的帮助下,我们将这个 struct
取名为 infinitechannel
type infinitechannel struct { }
思考一下 channel
的核心行为,实际上就两个,一个流入(fan in),一个流出(fan out),因此我们添加如下几个 method。
func (c *infinitechannel) in(val interface{}) { // todo } func (c *infinitechannel) out() interface{} { // todo }
内部实现
通过 in()
接收的数据,总得需要一个地方来存放。我们可以用一个 slice
来存放,就算用 in()
往里面添加了很多元素,也可以通过 append()
来拓展 slice
,slice
的容量可以无限拓展下去(内存足够的话),所以 channel
也是 infinite
。 infinitechannel
的第一个成员就这么敲定下来的。
type infinitechannel struct { data []interface{} }
用户调用 in()
和 out()
时,可能是并发的环境,在 go
中如何进行并发编程,最容易想到的肯定是 channel
了,因此我们在内部准备两个 channel
,一个 inchan
,一个 outchan
,用 inchan
来接收数据,用 outchan
来流出数据。
type infinitechannel struct { inchan chan interface{} outchan chan interface{} data []interface{} } func (c *infinitechannel) in(val interface{}) { c.inchan <- val } func (c *infinitechannel) out() interface{} { return <-c.outchan }
其中, inchan
和 outchan
都是 unbuffered channel。
此外,也肯定是需要一个 select
来处理来自 inchan
和 outchan
身上的事件。因此我们另起一个协程,在里面做 select
操作。
func (c *infinitechannel) background() { for true { select { case newval := <-c.inchan: c.data = append(c.data, newval) case c.outchan <- c.pop(): // pop() 将取出队列的首个元素 } } } func newinfinitechannel() *infinitechannel { c := &infinitechannel{ inchan: make(chan interface{}), outchan: make(chan interface{}), } go c.background() // 注意这里另起了一个协程 return c }
ps:感觉这也算是 go 并发编程的一个套路了。即
- 在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
- struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inchan 和 outchan),不会操作 struct 内的其他数据(在本例中,in() 和 out() 都没有直接操作 data)。
- 触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
- 如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。
pop()
的实现也非常简单。
// 取出队列的首个元素,如果队列为空,将会返回一个 nil func (c *infinitechannel) pop() interface{} { if len(c.data) == 0 { return nil } val := c.data[0] c.data = c.data[1:] return val }
测试一下
用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。
func main() { c := newinfinitechannel() go func() { for i := 0; i < 20; i++ { c.in(i) time.sleep(time.second) } }() for i := 0; i < 50; i++ { val := c.out() fmt.print(val) time.sleep(time.millisecond * 500) } }
// out <nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> process finished with the exit code 0
可以看到,将 infinitechannel
内没有数据可供消费时,调用 out()
将会返回一个 nil
,不过这也在我们的意料之中,原因是 pop()
在队列为空时,将会返回 nil。
目前 infinitechannel
的行为与标准的 channel
的行为是有出入的,go
中的 channel
,在没有数据却仍要取数据时会被阻塞,如何实现这个效果?
优化
我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。
首先把原来的 background()
摘出来
func (c *infinitechannel) background() { for true { select { case newval := <-c.inchan: c.data = append(c.data, newval) case c.outchan <- c.pop(): } } }
对 outchan
进行一个简单封装
func (c *infinitechannel) background() { for true { select { case newval := <-c.inchan: c.data = append(c.data, newval) case c.outchanwrapper() <- c.pop(): } } } func (c *infinitechannel) outchanwrapper() chan interface{} { return c.outchan }
目前为止,一切照旧。
点睛之笔来了:
func (c *infinitechannel) outchanwrapper() chan interface{} { if len(c.data) == 0 { return nil } return c.outchan }
在 c.data
为空的时候,返回一个 nil
在 background()
中,当执行到 case c.outchan <- c.pop():
时,实际上将会变成:
case nil <- nil:
在 go
中,是无法往一个 nil
的 channel
中发送元素的。例如
func main() { var c chan interface{} select { case c <- 1: } } // fatal error: all goroutines are asleep - deadlock! func main() { var c chan interface{} select { case c <- 1: default: fmt.println("hello world") } } // hello world
因此,对于
select { case newval := <-c.inchan: c.data = append(c.data, newval) case c.outchanwrapper() <- c.pop(): }
将会一直阻塞在 select
那里,直到 inchan
来了数据。
再测试一下
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
最后,程序 panic
了,因为死锁了。
补充
实际上 channel
除了 in()
和 out()
外,还有一个行为,即 close()
,如果 channel close 后,依旧从其中取元素的话,将会取出该类型的默认值。
func main() { c := make(chan interface{}) close(c) for true { v := <-c fmt.println(v) time.sleep(time.second) } } // output // <nil> // <nil> // <nil> // <nil> func main() { c := make(chan interface{}) close(c) for true { v, isopen := <-c fmt.println(v, isopen) time.sleep(time.second) } } // output // <nil> false // <nil> false // <nil> false // <nil> false
我们也需要实现相同的效果。
func (c *infinitechannel) close() { close(c.inchan) } func (c *infinitechannel) background() { for true { select { case newval, isopen := <-c.inchan: if isopen { c.data = append(c.data, newval) } else { c.isopen = false } case c.outchanwrapper() <- c.pop(): } } } func newinfinitechannel() *infinitechannel { c := &infinitechannel{ inchan: make(chan interface{}), outchan: make(chan interface{}), isopen: true, } go c.background() return c } func (c *infinitechannel) outchanwrapper() chan interface{} { // 这里添加了对 c.isopen 的判断 if c.isopen && len(c.data) == 0 { return nil } return c.outchan }
再测试一下
func main() { c := newinfinitechannel() go func() { for i := 0; i < 20; i++ { c.in(i) time.sleep(time.second) } c.close() // 这里调用了 close }() for i := 0; i < 50; i++ { val := c.out() fmt.print(val) time.sleep(time.millisecond * 500) } }
// output 012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> process finished with the exit code 0
符合预期
遗憾
目前看上去已经很完美了,但是和标准的 channel
相比,仍然有差距。因为标准的 channel
是有这种用法的
v,isopen := <- ch
可以通过 isopen
变量来获取 channel
的开闭情况。
因此 infinitechannel
也应该提供一个类似的 method
func (c *infinitechannel) outandisopen() (interface{}, bool) { // todo }
可惜的是,要想得知 infinitechannel
是否是 open
的,就必定要访问 infinitechannel
内的 isopen
成员。
type infinitechannel struct { inchan chan interface{} outchan chan interface{} data []interface{} isopen bool }
而 isopen
并非 channel
类型,根据之前的套路,这种非 channel
类型的成员只应该被 select
协程访问。一旦有多个协程访问,就会出现并发问题,除非加锁。
我不能接受!所以干脆不提供这个 method 了,嘿嘿。
完整代码
func main() { c := newinfinitechannel() go func() { for i := 0; i < 20; i++ { c.in(i) time.sleep(time.second) } c.close() }() for i := 0; i < 50; i++ { val := c.out() fmt.print(val) time.sleep(time.millisecond * 500) } } type infinitechannel struct { inchan chan interface{} outchan chan interface{} data []interface{} isopen bool } func (c *infinitechannel) in(val interface{}) { c.inchan <- val } func (c *infinitechannel) out() interface{} { return <-c.outchan } func (c *infinitechannel) close() { close(c.inchan) } func (c *infinitechannel) background() { for true { select { case newval, isopen := <-c.inchan: if isopen { c.data = append(c.data, newval) } else { c.isopen = false } case c.outchanwrapper() <- c.pop(): } } } func newinfinitechannel() *infinitechannel { c := &infinitechannel{ inchan: make(chan interface{}), outchan: make(chan interface{}), isopen: true, } go c.background() return c } // 取出队列的首个元素,如果队列为空,将会返回一个 nil func (c *infinitechannel) pop() interface{} { if len(c.data) == 0 { return nil } val := c.data[0] c.data = c.data[1:] return val } func (c *infinitechannel) outchanwrapper() chan interface{} { if c.isopen && len(c.data) == 0 { return nil } return c.outchan }
参考
以上就是再次探讨go实现无限 buffer 的 channel方法的详细内容,更多关于go无限 buffer 的 channel的资料请关注其它相关文章!