Distributed Computing in Python
Introduction to dispy
Although dispy's implementation is actually fairly involved, its careful design and encapsulation make it a tool that is very easy to pick up and to use in depth. dispy is suitable both for parallel computation on a single multi-processor (SMP) machine and for parallel computation across a cluster of machines. dispy provides no communication mechanism between individual subtasks; if that is needed, the asyncoro library can be used directly for communication and synchronization.
dispy supports Python 2.7+ and Python 3.1+, and can be installed directly with pip: sudo pip install dispy.
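A quick way to verify the installation is to import the module from a Python prompt (the __version__ attribute is assumed to be present in the installed release; a plain successful import is already a good sign):

import dispy
print(dispy.__version__)  # assumed attribute; a successful import confirms the install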
Using dispy
The clearest expression of dispy's minimalism is its extremely flat structure: it consists of only four components, dispy.py, dispynode.py, dispyscheduler.py and dispynetrelay.py.
Taking a cluster as an example, every host can be regarded as a node, and whatever submits jobs can be regarded as a client. The client submits computation jobs, and the nodes do the actual computing. There are two kinds of clients, JobCluster and SharedJobCluster, which are functionally almost identical; the difference is that only one JobCluster instance may be using the nodes at a time, whereas multiple SharedJobCluster instances can run concurrently and share the nodes. On the server side, dispynode.py runs on every compute node, while dispyscheduler.py is the shared scheduler through which SharedJobCluster clients submit their jobs.
dispynetrelay exists to support distributed computing across different networks.
Creating a node
Creating a node requires no code at all: once dispy is installed, simply run dispynode.py from the command line (--daemon runs it as a daemon process). As soon as a node is up, jobs can be submitted to it for computation. Options can also be passed when starting a node, for example the maximum number of local CPUs to use, the maximum size of files exchanged with clients, the ports to listen on, and so on.
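For example, the following invocation (option names as reported by dispynode.py --help; they may vary slightly between dispy versions) starts a node as a daemon that uses at most two CPUs:

dispynode.py --daemon --cpus 2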
Creating a client and submitting jobs
As mentioned above, dispy offers two types of clients: JobCluster and SharedJobCluster. JobCluster is defined as follows:
class dispy.JobCluster(computation, nodes=['*'], depends=[], callback=None, cluster_status=None, ip_addr=None, ext_ip_addr=None, port=51347, node_port=51348, recover_file=None, dest_path=None, loglevel=logging.WARNING, setup=None, cleanup=True, pulse_interval=None, ping_interval=None, reentrant=False, secret='', keyfile=None, certfile=None)
The key parameters are:
computation: the computation to distribute; either a Python function (called directly on the nodes) or a standalone executable (given as a string);
nodes: IP addresses or host names of the nodes in the cluster to use;
depends: modules, files, or classes the computation depends on; if a module available in the client's environment is not installed on the nodes, it can be shipped to them this way;
callback: a function invoked when a submitted job returns intermediate results or finishes (a short sketch follows this list);
cluster_status: a user-provided function called whenever the status of a node or job changes;
ip_addr, ext_ip_addr: NAT-related; specify the external IP address to use;
port, node_port: ports used for communication with other clients and nodes;
recover_file: name of the file in which the client's running state is saved, which can later be used to recover the client;
dest_path: directory under which files transferred for the computation are stored on the nodes;
loglevel: logging level;
setup: a function each node runs once to initialize itself before executing any jobs;
cleanup: whether to remove the files generated by distributed jobs when the cluster is closed;
pulse_interval, ping_interval, reentrant: heartbeat/keep-alive settings; reentrant controls what happens to the jobs of a node that dies: if reentrant is True, its jobs are handed over to other nodes, while if it is False, the dead node's jobs are treated as cancelled;
secret, keyfile, certfile: a shared secret and SSL key/certificate files for securing communication.
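As a sketch of how the callback parameter can be used (compute and job_callback are hypothetical names; dispy invokes the callback with the DispyJob instance as its only argument):

import dispy

def compute(n):
    # hypothetical computation executed on the nodes
    return n * n

def job_callback(job):
    # dispy calls this with the DispyJob whenever the job finishes,
    # is terminated/cancelled, or reports a provisional result
    if job.status == dispy.DispyJob.Finished:
        print('job %s finished: %s' % (job.id, job.result))
    elif job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled):
        print('job %s failed: %s' % (job.id, job.exception))

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, callback=job_callback)
    for n in range(5):
        job = cluster.submit(n)
        job.id = n            # id is free for application use
    cluster.wait()            # block until all submitted jobs are done
    cluster.close()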
SharedJobCluster is almost identical to JobCluster, so its parameters are not described again:
class dispy.SharedJobCluster(computation, nodes=['*'], depends=[], ip_addr=None, port=51347, scheduler_node=None, scheduler_port=None, ext_ip_addr=None, dest_path=None, loglevel=logging.WARNING, cleanup=True, reentrant=False, exclusive=False, secret='', keyfile=None, certfile=None)
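Apart from the extra scheduler_node/scheduler_port and exclusive parameters, the main practical difference is that SharedJobCluster submits jobs through a shared scheduler, so dispyscheduler.py must already be running somewhere in the cluster. A minimal sketch (scheduler_host and compute are placeholder names):

import dispy

def compute(n):
    # placeholder computation
    return n * n

if __name__ == '__main__':
    # dispyscheduler.py is assumed to be running on 'scheduler_host', so that
    # several SharedJobCluster clients can share the same nodes through it
    cluster = dispy.SharedJobCluster(compute, scheduler_node='scheduler_host')
    job = cluster.submit(7)
    print(job())   # wait for the job to finish and print its result
    cluster.close()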
Once a cluster has been created, jobs are submitted with its submit method, which returns a Job object; through it you can read the job's id, status (Created, Running, Finished, Cancelled or Terminated) and ip_addr. Each submitted unit of work is a Job, and once it completes it carries result, stdout, stderr, exception, start_time and end_time.
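Putting these pieces together, a minimal end-to-end example looks roughly like this (modeled on the canonical dispy usage pattern; the random sleep just simulates work):

import dispy, random

def compute(n):
    # runs on a node: sleep for n seconds and report which host did the work
    import time, socket
    time.sleep(n)
    return (socket.gethostname(), n)

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)
    jobs = []
    for i in range(10):
        job = cluster.submit(random.randint(3, 8))
        job.id = i              # associate an id with the job for bookkeeping
        jobs.append(job)
    for job in jobs:
        host, n = job()         # job() waits for the job and returns its result
        print('%s executed job %s with argument %s' % (host, job.id, n))
    cluster.print_status()
    cluster.close()

The longer examples below demonstrate more advanced patterns: per-node setup/cleanup, scheduling through a cluster_status callback, and a simple MapReduce-style computation.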
The first example uses setup and cleanup functions to load a data file and the hashlib module into each node's global scope once, and then computes different digests of the data in parallel:

# executed on each node before any jobs are scheduled
def setup(data_file):
    # read data in file to global variable
    global data, algorithms

    # if running under Windows, modules can't be global, as they are not
    # serializable; instead, they must be loaded in 'compute' (jobs); under
    # Posix (Linux, OS X and other Unix variants), modules declared global in
    # 'setup' will be available in 'compute'

    # 'os' module is already available (loaded by dispynode)
    if os.name != 'nt':  # if not Windows, load hashlib module in global scope
        global hashlib
        import hashlib
    data = open(data_file).read()  # read file in to memory; data_file can now be deleted
    if sys.version_info.major > 2:
        data = data.encode()  # convert to bytes
        algorithms = list(hashlib.algorithms_guaranteed)
    else:
        algorithms = hashlib.algorithms
    return 0  # successful initialization should return 0

def cleanup():
    global data
    del data
    if os.name != 'nt':
        global hashlib
        del hashlib

def compute(n):
    if os.name == 'nt':  # under Windows modules must be loaded in jobs
        import hashlib
    # 'data' and 'algorithms' global variables are initialized in 'setup'
    alg = algorithms[n % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    return (alg, csum.hexdigest())

if __name__ == '__main__':
    import dispy, sys, functools
    # if no data file name is given, use this file as data file
    data_file = sys.argv[1] if len(sys.argv) > 1 else sys.argv[0]
    cluster = dispy.JobCluster(compute, depends=[data_file],
                               setup=functools.partial(setup, data_file), cleanup=cleanup)
    jobs = []
    for n in range(10):
        job = cluster.submit(n)
        job.id = n
        jobs.append(job)
    for job in jobs:
        job()  # wait for the job to finish
        if job.status == dispy.DispyJob.Finished:
            print('%s: %s : %s' % (job.id, job.result[0], job.result[1]))
        else:
            print(job.exception)
    cluster.print_status()
    cluster.close()
The second example uses a cluster_status callback to react to node and job status changes; as nodes become available, one data file is submitted per node with submit_node:

# job computation runs at dispynode servers
def compute(path):
    import hashlib, time, os
    csum = hashlib.sha1()
    with open(os.path.basename(path), 'rb') as fd:
        while True:
            data = fd.read(1024000)
            if not data:
                break
            csum.update(data)
    time.sleep(5)
    return csum.hexdigest()

# 'cluster_status' callback function. It is called by dispy (client)
# to indicate node / job status changes. Here node initialization and
# job done status are used to schedule jobs, so at most one job is
# running on a node (even if a node has more than one processor). Data
# files are assumed to be 'data000', 'data001' etc.
def status_cb(status, node, job):
    if status == dispy.DispyJob.Finished:
        print('sha1sum for %s: %s' % (job.id, job.result))
    elif status == dispy.DispyJob.Terminated:
        print('sha1sum for %s failed: %s' % (job.id, job.exception))
    elif status == dispy.DispyNode.Initialized:
        print('node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
    else:  # ignore other status messages
        return

    global submitted
    data_file = 'data%03d' % submitted
    if os.path.isfile(data_file):
        submitted += 1
        # 'node' and 'dispy_job_depends' are consumed by dispy;
        # 'compute' is called with only 'data_file' as argument(s)
        job = cluster.submit_node(node, data_file, dispy_job_depends=[data_file])
        job.id = data_file

if __name__ == '__main__':
    import dispy, sys, os
    cluster = dispy.JobCluster(compute, cluster_status=status_cb)
    submitted = 0
    while True:
        try:
            cmd = sys.stdin.readline().strip().lower()
        except KeyboardInterrupt:
            break
        if cmd == 'quit' or cmd == 'exit':
            break
    cluster.wait()
    cluster.print_status()
The last example is a simple MapReduce-style word-frequency count built from two clusters, one for the map phase and one for the reduce phase:

# a version of word frequency example from mapreduce tutorial
def mapper(doc):
    # input reader and map function are combined
    import os
    words = []
    with open(os.path.join('/tmp', doc)) as fd:
        for line in fd:
            words.extend((word.lower(), 1) for word in line.split()
                         if len(word) > 3 and word.isalpha())
    return words

def reducer(words):
    # we should generate sorted lists which are then merged,
    # but to keep things simple, we use dicts
    word_count = {}
    for word, count in words:
        if word not in word_count:
            word_count[word] = 0
        word_count[word] += count
    # print('reducer: %s to %s' % (len(words), len(word_count)))
    return word_count

if __name__ == '__main__':
    import dispy
    # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
    # local storage, so no need to transfer them
    map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
    # any node can work on reduce
    reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
    map_jobs = []
    for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
        job = map_cluster.submit(f)
        map_jobs.append(job)
    reduce_jobs = []
    for map_job in map_jobs:
        words = map_job()
        if not words:
            print(map_job.exception)
            continue
        # simple partition
        n = 0
        while n < len(words):
            m = min(len(words) - n, 1000)
            reduce_job = reduce_cluster.submit(words[n:n+m])
            reduce_jobs.append(reduce_job)
            n += m
    # reduce
    word_count = {}
    for reduce_job in reduce_jobs:
        words = reduce_job()
        if not words:
            print(reduce_job.exception)
            continue
        for word, count in words.items():  # items() works on both Python 2 and 3
            if word not in word_count:
                word_count[word] = 0
            word_count[word] += count
    # sort words by frequency and print
    for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
        count = word_count[word]
        print(word, count)
    reduce_cluster.print_status()