欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

再次探讨go实现无限 buffer 的 channel方法

程序员文章站 2022-03-17 16:04:10
前言总所周知,go 里面只有两种 channel,一种是 unbuffered channel, 其声明方式为ch := make(chan interface{})另一种是 buffered cha...

前言

总所周知,go 里面只有两种 channel,一种是 unbuffered channel, 其声明方式为

ch := make(chan interface{})

另一种是 buffered channel,其声明方式为

buffersize := 5
ch := make(chan interface{},buffersize)

对于一个 buffered channel,无论它的 buffer 有多大,它终究是有极限的。这个极限就是该 channel 最初被 make 时,所指定的 buffersize 。

jojo,buffer channel 的大小是有极限的,我不做 channel 了。

一旦 channel 满了的话,再往里面添加元素的话,将会阻塞。

so how can we make a infinite buffer channel?

本文参考了 medinum 上面的一篇文章,有兴趣的同学可以直接阅读原文。

实现

接口的设计

首先当然是建一个 struct,在百度翻译的帮助下,我们将这个 struct 取名为 infinitechannel

type infinitechannel struct {
}

思考一下 channel 的核心行为,实际上就两个,一个流入(fan in),一个流出(fan out),因此我们添加如下几个 method。

func (c *infinitechannel) in(val interface{}) {
	// todo
}

func (c *infinitechannel) out() interface{} {
	// todo
}

内部实现

通过 in() 接收的数据,总得需要一个地方来存放。我们可以用一个 slice 来存放,就算用 in() 往里面添加了很多元素,也可以通过 append() 来拓展 sliceslice 的容量可以无限拓展下去(内存足够的话),所以 channel 也是 infiniteinfinitechannel 的第一个成员就这么敲定下来的。

type infinitechannel struct {
	data    []interface{}
}

用户调用 in()out() 时,可能是并发的环境,在 go 中如何进行并发编程,最容易想到的肯定是 channel 了,因此我们在内部准备两个 channel,一个 inchan,一个 outchan,用 inchan 来接收数据,用 outchan 来流出数据。

type infinitechannel struct {
	inchan  chan interface{}
	outchan chan interface{}
	data    []interface{}
}

func (c *infinitechannel) in(val interface{}) {
	c.inchan <- val
}

func (c *infinitechannel) out() interface{} {
	return <-c.outchan
}

其中, inchanoutchan 都是 unbuffered channel。

此外,也肯定是需要一个 select 来处理来自 inchanoutchan 身上的事件。因此我们另起一个协程,在里面做 select 操作。

func (c *infinitechannel) background() {
	for true {
		select {
		case newval := <-c.inchan:
			c.data = append(c.data, newval)
        case c.outchan <- c.pop():		// pop() 将取出队列的首个元素
		}
	}
}
func newinfinitechannel() *infinitechannel {
	c := &infinitechannel{
		inchan:  make(chan interface{}),
		outchan: make(chan interface{}),
	}
	go c.background()	// 注意这里另起了一个协程
	return c
}

ps:感觉这也算是 go 并发编程的一个套路了。即

  1. 在 new struct 的时候,顺手 go 一个 select 协程,select 协程内执行一个 for 循环,不停的 select,监听一个或者多个 channel 的事件。
  2. struct 对外提供的 method,只会操作 struct 内的 channel(在本例中就是 inchan 和 outchan),不会操作 struct 内的其他数据(在本例中,in() 和 out() 都没有直接操作 data)。
  3. 触发 channel 的事件后,由 select 协程进行数据的更新(在本例中就是 data )。因为只有 select 协程对除 channel 外的数据成员进行读写操作,且 go 保证了对于 channel 的并发读写是安全的,所以代码是并发安全的。
  4. 如果 struct 是 exported ,用户或许会越过 new ,直接手动 make 一个 struct,可以考虑将 struct 设置为 unexported,把它的首字母小写即可。

pop() 的实现也非常简单。

// 取出队列的首个元素,如果队列为空,将会返回一个 nil
func (c *infinitechannel) pop() interface{} {
	if len(c.data) == 0 {
		return nil
	}
	val := c.data[0]
	c.data = c.data[1:]
	return val
}

测试一下

用一个协程每秒钟生产一条数据,另一个协程每半秒消费一条数据,并打印。

func main() {
	c := newinfinitechannel()
	go func() {
		for i := 0; i < 20; i++ {
			c.in(i)
			time.sleep(time.second)
		}
	}()

	for i := 0; i < 50; i++ {
		val := c.out()
		fmt.print(val)
		time.sleep(time.millisecond * 500)
	}
}
// out
<nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
process finished with the exit code 0

可以看到,将 infinitechannel 内没有数据可供消费时,调用 out() 将会返回一个 nil,不过这也在我们的意料之中,原因是 pop() 在队列为空时,将会返回 nil。

目前 infinitechannel 的行为与标准的 channel 的行为是有出入的,go 中的 channel,在没有数据却仍要取数据时会被阻塞,如何实现这个效果?

优化

我认为此处是是整篇文章最有技巧的地方,我第一次看到时忍不住拍案叫绝。

首先把原来的 background() 摘出来

func (c *infinitechannel) background() {
	for true {
		select {
		case newval := <-c.inchan:
			c.data = append(c.data, newval)
		case c.outchan <- c.pop():
		}
	}
}

outchan 进行一个简单封装

func (c *infinitechannel) background() {
	for true {
		select {
		case newval := <-c.inchan:
			c.data = append(c.data, newval)
		case c.outchanwrapper() <- c.pop():
		}
	}
}
func (c *infinitechannel) outchanwrapper() chan interface{} {
	return c.outchan
}

目前为止,一切照旧。

点睛之笔来了:

func (c *infinitechannel) outchanwrapper() chan interface{} {
	if len(c.data) == 0 {
		return nil
	}
	return c.outchan
}

c.data 为空的时候,返回一个 nil

background() 中,当执行到 case c.outchan <- c.pop(): 时,实际上将会变成:

case nil <- nil:

go 中,是无法往一个 nilchannel 中发送元素的。例如

func main() {
	var c chan interface{}
	select {
	case c <- 1:
	}
}
// fatal error: all goroutines are asleep - deadlock!
func main() {
	var c chan interface{}
	select {
	case c <- 1:
	default:
		fmt.println("hello world")
	}
}
// hello world

因此,对于

select {
case newval := <-c.inchan:
	c.data = append(c.data, newval)
case c.outchanwrapper() <- c.pop():
}

将会一直阻塞在 select 那里,直到 inchan 来了数据。

再测试一下

012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!

最后,程序 panic 了,因为死锁了。

补充

实际上 channel 除了 in()out() 外,还有一个行为,即 close(),如果 channel close 后,依旧从其中取元素的话,将会取出该类型的默认值。

func main() {
	c := make(chan interface{})
	close(c)
	for true {
		v := <-c
		fmt.println(v)
		time.sleep(time.second)
	}
}
// output
// <nil>
// <nil>
// <nil>
// <nil>
func main() {
	c := make(chan interface{})
	close(c)
	for true {
		v, isopen := <-c
		fmt.println(v, isopen)
		time.sleep(time.second)
	}
}
// output
// <nil> false
// <nil> false
// <nil> false
// <nil> false

我们也需要实现相同的效果。

func (c *infinitechannel) close() {
	close(c.inchan)
}

func (c *infinitechannel) background() {
	for true {
		select {
		case newval, isopen := <-c.inchan:
			if isopen {
				c.data = append(c.data, newval)
			} else {
				c.isopen = false
			}
		case c.outchanwrapper() <- c.pop():
		}
	}
}

func newinfinitechannel() *infinitechannel {
	c := &infinitechannel{
		inchan:  make(chan interface{}),
		outchan: make(chan interface{}),
		isopen:  true,
	}
	go c.background()
	return c
}

func (c *infinitechannel) outchanwrapper() chan interface{} {
    // 这里添加了对 c.isopen 的判断
	if c.isopen && len(c.data) == 0 {
		return nil
	}
	return c.outchan
}

再测试一下

func main() {
	c := newinfinitechannel()
	go func() {
		for i := 0; i < 20; i++ {
			c.in(i)
			time.sleep(time.second)
		}
		c.close()		// 这里调用了 close
	}()

	for i := 0; i < 50; i++ {
		val := c.out()
		fmt.print(val)
		time.sleep(time.millisecond * 500)
	}
}
// output
012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil>
process finished with the exit code 0

符合预期

遗憾

目前看上去已经很完美了,但是和标准的 channel 相比,仍然有差距。因为标准的 channel 是有这种用法的

v,isopen := <- ch

可以通过 isopen 变量来获取 channel 的开闭情况。

因此 infinitechannel 也应该提供一个类似的 method

func (c *infinitechannel) outandisopen() (interface{}, bool) {
	// todo
}

可惜的是,要想得知 infinitechannel 是否是 open 的,就必定要访问 infinitechannel 内的 isopen 成员。

type infinitechannel struct {
	inchan  chan interface{}
	outchan chan interface{}
	data    []interface{}
	isopen  bool
}

isopen 并非 channel 类型,根据之前的套路,这种非 channel 类型的成员只应该被 select 协程访问。一旦有多个协程访问,就会出现并发问题,除非加锁。

我不能接受!所以干脆不提供这个 method 了,嘿嘿。

完整代码

func main() {
	c := newinfinitechannel()
	go func() {
		for i := 0; i < 20; i++ {
			c.in(i)
			time.sleep(time.second)
		}
		c.close()
	}()

	for i := 0; i < 50; i++ {
		val := c.out()
		fmt.print(val)
		time.sleep(time.millisecond * 500)
	}
}

type infinitechannel struct {
	inchan  chan interface{}
	outchan chan interface{}
	data    []interface{}
	isopen  bool
}

func (c *infinitechannel) in(val interface{}) {
	c.inchan <- val
}

func (c *infinitechannel) out() interface{} {
	return <-c.outchan
}

func (c *infinitechannel) close() {
	close(c.inchan)
}

func (c *infinitechannel) background() {
	for true {
		select {
		case newval, isopen := <-c.inchan:
			if isopen {
				c.data = append(c.data, newval)
			} else {
				c.isopen = false
			}
		case c.outchanwrapper() <- c.pop():
		}
	}
}

func newinfinitechannel() *infinitechannel {
	c := &infinitechannel{
		inchan:  make(chan interface{}),
		outchan: make(chan interface{}),
		isopen:  true,
	}
	go c.background()
	return c
}

// 取出队列的首个元素,如果队列为空,将会返回一个 nil
func (c *infinitechannel) pop() interface{} {
	if len(c.data) == 0 {
		return nil
	}
	val := c.data[0]
	c.data = c.data[1:]
	return val
}

func (c *infinitechannel) outchanwrapper() chan interface{} {
	if c.isopen && len(c.data) == 0 {
		return nil
	}
	return c.outchan
}

参考

以上就是再次探讨go实现无限 buffer 的 channel方法的详细内容,更多关于go无限 buffer 的 channel的资料请关注其它相关文章!

相关标签: go buffer channel