Julia并行计算笔记(二)
本文是《Julia语言程序设计》第14章的读书笔记,包含个人理解,内容可能不准确。建议参考原著。
四、远程调用之一
上一节讲了Julia的协程(Task)级并行,本节讲的是进程(Worker)级并行。协程只能在单台计算机上并行,而进程可以在多台计算机上并行。一切开始前,首先仍要using Distributed
,并且要addprocs(n
)或julia -p n
来开启多个Worker。
远程调用,指通过本地Worker在远程Worker中启动某一函数或表达式。对于函数,用remotecall()
实现远程调用:
remotecall(函数, 远程Worker的PID, 函数参数)
例如:在本地Worker上调用rand()
函数创建一个2x3的随机数组,表达式为rand(2,3)
。如果要在PID=4的远程Worker上做这件事,那么应写成:
remotecall(rand,4,2,3)
远程调用后不会立即返回结果到本地,需要使用fetch()
提取结果,例如:
julia> r = remotecall(rand,4,2,3)
Future(4, 1, 5, nothing)
julia> fetch(r)
2×3 Array{Float64,2}:
0.466937 0.761268 0.975553
0.754082 0.025674 0.824383
注意这里的fetch()
跟之前Task并行不一样,会移除远程Worker上的数据。如果提取不到结果,它会阻塞直到有结果为止。fetch()
提取的结果会缓存在本地Worker上,具体来说是存在r的内部的一个类型为Future的对象中。
我们要专门讲一下Future对象。Future(远程PID, 本地PID, Future ID, 远程结果)
是一个存储远程调用信息的对象,会在远程调用时立即返回,但这时最后一项只包含nothing
。当fetch()
从远程提取结果后,会把结果填入nothing
的位置。实际上,我们可以自己创建一个Future对象,例如:
julia> Future(100)
Future(100, 1, 34, nothing)
这个Future的远程PID是100,本地PID是1,自己的编号是34(说明它是你创建的第34个Future)。不过这个Future不属于任何一个远程调用,所以不会有数据填充nothing
位置。
像Future这样的存储远程调用信息的对象,称之为“远程引用”(所以远程引用是一个对象而不是一个操作)。另一个远程引用是RemoteChannel
,是上一节Channel
的跨进程版本,用于进程间交换数据。或者这样说,Channel
是协程间管道,RemoteChannel
是进程间管道。
现在回头来看fetch()
。提取结果后,可以在本地Worker上重复使用fetch(r)
来多次使用结果,或者干脆把结果赋值给一个新的对象:
julia> result = fetch(r);
julia> result
2×3 Array{Float64,2}:
0.466937 0.761268 0.975553
0.754082 0.025674 0.824383
自然地,又有一步到位的技巧,即remotecall_fetch()
,可以把上面的示例缩写为:
result = remotecall_fetch(rand,4,2,3)
同样地,它会一直阻塞直到成功提取结果。由于Future对象会消耗时间,如果不需要提取结果,那么可使用不含Future对象的remote_do()
代替remotecall()
。不过这里有个细节是:当多次远程调用时,remotecall()
会依次执行,而remote_do()
则是无序的。
对于表达式(强调一下,表达式即是赋参的函数),我们可用宏命令@spawnat
来实现远程调用。例如:
julia> s = @spawnat 4 rand(2,3)
Future(4, 1, 7, nothing)
julia> fetch(s)
2×3 Array{Float64,2}:
0.375975 0.844135 0.257647
0.057513 0.169291 0.0544206
当然也可以这样写:
julia> fetch(@spawnat 4 rand(2,3))
2×3 Array{Float64,2}:
0.57577 0.500889 0.228997
0.268749 0.295895 0.0822172
再做得更复杂一点:
julia> fetch(@spawnat 3 (1).+fetch(s))
2×3 Array{Float64,2}:
1.37598 1.84413 1.25765
1.05751 1.16929 1.05442
这里的表达式(1).+fetch(s)
的意思是把fetch(s)
逐个加1。注意要写成(1).+fetch(s)
而不是书中的1 .+fetch(s)
,否则会报错。貌似是1.1版本的变化之一。
另一个宏命令@spawn
不需要指定PID,用起来更方便得多。所以一般用它代替@spawnat
或remotecall()
。例如:
julia> fetch(@spawn (1).+fetch(@spawn rand(2,3)))
2×3 Array{Float64,2}:
1.14194 1.57693 1.90071
1.88392 1.31092 1.51812
小贴士:用
myid()
可以查询当前进程的PID。如果直接调用,会返回1;如果在远程调用,则会返回远程Worker的PID。
但它不能代替remote_do()
,因为后者不返回结果。此外,如果远程Worker是已知空闲的,指定PID会稍微快一点(可惜多数情况下我们并不清楚哪些Worker是空闲的)。
宏命令也有“一步到位”的技巧!我们可以把fetch(@spawn 表达式)
简写为@fetch 表达式
,把fetch(@spawnat PID 表达式)
简写为@fetchfrom PID 表达式
。例如:
julia> @fetch rand(2,3)
2×3 Array{Float64,2}:
0.0622937 0.93881 0.471734
0.576323 0.621816 0.713404
与之前的“一步到位”类似,由于把调用和提取合并为一个命令,所以本地Worker会阻塞,等待远程Worker返回结果之后才继续。这种调用方式称为“同步调用”。如果把调用和提取拆分开来,本地Worker就可以在调用之后去做别的事情,直到远程Worker通知它,再来提取结果。这种方式称为“异步调用”。
异步调用就是你 喊 你朋友吃饭 ,你朋友说知道了 ,待会忙完去找你 ,你就去做别的了。
同步调用就是你 喊 你朋友吃饭 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你们一起去。
现在我们来看一组例子加深对同步性质的理解:
# 例1
julia> @time @spawn sleep(3)
0.000288 seconds (112 allocations: 5.891 KiB)
Future(4, 1, 24, nothing)
# 例2
julia> @time @sync @spawn sleep(3)
3.014166 seconds (2.96 k allocations: 173.319 KiB)
Future(2, 1, 25, nothing)
# 例3
julia> @time @sync @fetch rand(2,3)
0.015145 seconds (152 allocations: 7.703 KiB)
2×3 Array{Float64,2}:
0.665148 0.607531 0.563096
0.506471 0.748635 0.588137
@time
是计时的宏命令。可以看到,在例1中,调用后立即返回了Future对象,但此时远程Worker仍未执行完毕。例2添加了一个@sync
宏命令,它会强制令Future对象在远程Worker执行完毕后才返回。例3表明@sync
亦可作用于@fetch
。实际上,@sync
可作用于@spawn
、@spawnat
、@fetch
、@fecthfrom
、@async
、@distributed
(后文介绍),但不适用于异步操作如remotecall()
和remote_do()
。
最后我们讨论一下数据跨进程传输的问题。在远程调用一个表达式时,表达式中的参数会自动传输到远程Worker上,例如:
julia> A = rand(1000,1000);
julia> @time @spawn A^2
0.251465 seconds (496.78 k allocations: 24.047 MiB, 6.12% gc time)
Future(3, 1, 31, nothing)
这里传输到远程Worker上的参数是A。换一种方式写:
julia> @time @spawn rand(1000,1000)^2
0.000250 seconds (123 allocations: 6.588 KiB)
Future(4, 1, 32, nothing)
这里传输的参数是1000,1000
,显然比A的数据量小,于是Future对象返回得更快了。两种写法各有好处:如果你觉得跨进程传输A的代价相比于A的运算小很多,那么选第一种,否则选第二种。写法的差异会对Julia运行效率产生显著影响。