Kotlin协程通信机制: Channel
coroutines channels
java中的多线程通信, 总会涉及到共享状态(shared mutable state)的读写, 有同步, 死锁等问题要处理.
协程中的channel用于协程间的通信, 它的宗旨是:
do not communicate by sharing memory; instead, share memory by communicating.
channel basics
channels用于协程间的通信, 允许我们在不同的协程间传递数据(a stream of values).
生产者-消费者模式
发送数据到channel的协程被称为producer
, 从channel接受数据的协程被称为consumer
.
生产: send, produce.
消费: receive, consume.
当需要的时候, 多个协程可以向同一个channel发送数据, 一个channel的数据也可以被多个协程接收.
当多个协程从同一个channel接收数据的时候, 每个元素仅被其中一个consumer消费一次. 处理元素会自动将其从channel里删除.
channel的特点
channel
在概念上有点类似于blockingqueue
, 元素从一端被加入, 从另一端被消费. 关键的区别在于, 读写的方法不是blocking的, 而是suspending的.
在为空或为满时. channel可以suspend它的send
和receive
操作.
channel的关闭和迭代
channel可以被关闭, 说明没有更多的元素了.
取消producer协程也会关闭channel.
在receiver端有一种方便的方式来接收: 用for
迭代.
看这个例子:
fun main() = runblocking<unit> { val channel = channel<int>() launch { for (x in 1..5) channel.send(x) channel.close() // we're done sending } // here we print received values using `for` loop (until the channel is closed) for (y in channel) println(y) println("done!") }
运行后会输出:
1 2 3 4 5 done! process finished with exit code 0
如果注释掉channel.close()
就会变成:
1 2 3 4 5
done没有被输出, 程序也没有退出, 这是因为接受者协程还在一直等待.
不同的channel类型
库中定义了多个channel类型, 它们的主要区别在于:
- 内部可以存储的元素数量;
-
send
是否可以被挂起.
所有channel类型的receive
方法都是同样的行为: 如果channel不为空, 接收一个元素, 否则挂起.
channel的不同类型:
- rendezvous channel: 0尺寸buffer,
send
和receive
要meet on time, 否则挂起. (默认类型). - unlimited channel: 无限元素,
send
不被挂起. - buffered channel: 指定大小, 满了之后
send
挂起. - conflated channel: 新元素会覆盖旧元素, receiver只会得到最新元素,
send
永不挂起.
创建channel:
val rendezvouschannel = channel<string>() val bufferedchannel = channel<string>(10) val conflatedchannel = channel<string>(conflated) val unlimitedchannel = channel<string>(unlimited)
默认是rendezvous channel.
练习: 分析代码输出
看这段代码:
fun main() = runblocking<unit> { val channel = channel<string>() launch { channel.send("a1") channel.send("a2") log("a done") } launch { channel.send("b1") log("b done") } launch { repeat(3) { val x = channel.receive() log(x) } } } fun log(message: any?) { println("[${thread.currentthread().name}] $message") }
这段代码创建了一个channel, 传递string类型的元素.
两个producder协程, 分别向channel发送不同的字符串, 发送完毕后打印各自的"done".
一个receiver协程, 接收channel中的3个元素并打印.
程序的运行输出结果会是怎样呢?
记得在configurations中加上vm options: -dkotlinx.coroutines.debug
. 可以看到协程信息.
答案揭晓:
[main @coroutine#4] a1 [main @coroutine#4] b1 [main @coroutine#2] a done [main @coroutine#3] b done [main @coroutine#4] a2
答对了吗?
为什么会是这样呢? 原因主要有两点:
- 这里创建的channel是默认的rendezvous类型, 没有buffer, send和receive必须要meet, 否则挂起.
- 两个producer和receiver协程都运行在同一个线程上, ready to be resumed也只是加入了一个等待队列, resume要按顺序来.
这个例子在introduction to coroutines and channels中有一个视频解说.
另外, 官方文档中还有一个ping-pang的例子, 为了说明channels are fair.
参考
推荐阅读
-
使用kotlin协程提高app性能(译)
-
Kotlin+MVVM+Retrofit+协程+ViewBinding+EventBus
-
一文彻底搞懂Kotlin中的协程
-
Kotlin协程到底是如何切换线程的
-
Kotlin协程通信机制: Channel
-
go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器
-
Kotlin 协程 + Spring webflux 开发后端
-
Android MVVM架构Demo(kotlin+databinding+livedata+lifecycle+协程+retrofit)
-
kotlin使用timerTask{}执行协程任务失效问题
-
go 利用chan的阻塞机制,实现协程的开始、阻塞、返回控制器