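Python3 Series: Parallel Programming
Processes and Threads
A process is a running instance of a program and the basic unit the operating system uses to run programs and allocate resources. A process can contain multiple threads, and all threads in the same process share that process's resources, such as its memory space and open files. A thread is an execution unit inside a process that can be scheduled and run independently. Because threads have many characteristics in common with processes, they are also called lightweight processes. A thread always runs inside a process, and every process has at least one thread (the main thread).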
Appetizer
A simple example of creating a thread in Python 3
```python
from threading import Thread
from time import sleep


class Cookbook(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.message = "Hello Parallel Python CookBook!!\n"

    def print_message(self):
        print(self.message)

    def run(self):
        print("Thread Starting\n")
        x = 0
        while x < 10:
            self.print_message()
            sleep(2)
            x += 1
        print("Thread Ended!\n")


print("Process Started")
hello_python = Cookbook()
hello_python.start()
# Wait for the thread to finish before the main thread reports completion
hello_python.join()
print("Process Ended")
```
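Note: never leave a thread running unattended in the background; once it has finished its work, release its resources promptly, for example by calling join() on it.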
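Thread-Based Parallelism
Multithreaded programs usually communicate through a shared memory space, which makes managing that shared memory the key problem in multithreaded programming. Python manages threads through the standard-library threading module, which provides the following components:
- Thread objects
- Lock objects
- RLock objects
- Semaphore objects
- Condition objects
- Event objects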
Defining a Thread
Basic syntax
Sample code:
```python
import threading


def function(i):
    print("function called by thread: {0}".format(i))


threads = []
for i in range(5):
    t = threading.Thread(target=function, args=(i,))
    threads.append(t)
    t.start()

# Wait for every thread to finish
for t in threads:
    t.join()
```
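Note that a newly created thread does not run until you call its start() method. Calling t.join() blocks the calling thread until thread t terminates. (Tip: setting t.daemon = True before start() makes t a daemon thread, and the interpreter does not wait for daemon threads when the main thread exits. The older t.setDaemon(True) spelling is deprecated since Python 3.10.)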
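The start()/join()/daemon behavior described above can be seen in a minimal sketch. It passes positional and keyword arguments through the args and kwargs parameters of threading.Thread, sets daemon=True in the constructor, and uses join(timeout) so the caller is not blocked indefinitely. The function names run_workers, worker, and start_daemon are hypothetical, chosen for illustration.

```python
import threading
import time


def run_workers(n, scale):
    results = {}
    lock = threading.Lock()

    def worker(index, *, scale=1):
        # The lock guards the shared dict while each thread writes its entry
        with lock:
            results[index] = index * scale

    threads = [
        threading.Thread(target=worker, args=(i,), kwargs={"scale": scale})
        for i in range(n)
    ]
    for t in threads:
        t.start()  # a thread does nothing until start() is called
    for t in threads:
        t.join()   # block the caller until t has terminated
    return results


def start_daemon():
    # daemon=True: the interpreter will not wait for this thread at exit
    t = threading.Thread(target=time.sleep, args=(0.5,), daemon=True)
    t.start()
    t.join(timeout=0.05)  # stop waiting after 50 ms; t may still be running
    return t.daemon, t.is_alive()


if __name__ == "__main__":
    print(run_workers(3, scale=10))
    print(start_daemon())
```

A join() with a timeout never raises; check is_alive() afterwards to learn whether the thread actually finished.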
Identifying Threads
Sample code:
```python
import threading
import time


def first_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is exiting".format(threading.current_thread().name))


def second_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is exiting".format(threading.current_thread().name))


def third_function():
    print("{0} is starting".format(threading.current_thread().name))
    time.sleep(2)
    print("{0} is exiting".format(threading.current_thread().name))


if __name__ == "__main__":
    t1 = threading.Thread(target=first_function, name="first")
    t2 = threading.Thread(target=second_function, name="second")
    t3 = threading.Thread(target=third_function, name="third")
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
```
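Set a thread's name with the name argument of threading.Thread(), and read the current thread's name with threading.current_thread().name (the older currentThread().getName() spelling is deprecated since Python 3.10). If no name is given, a thread gets a default name of the form Thread-N; since Python 3.10 the target function's name is appended in parentheses, e.g. Thread-1 (worker).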
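A small sketch of reading thread names from inside the threads themselves, using threading.current_thread().name. The helper collect_names is hypothetical; it records each thread's name, and the script also shows that threading.main_thread() identifies the main thread.

```python
import threading


def collect_names(names):
    seen = []
    lock = threading.Lock()

    def record():
        # current_thread() returns the Thread object running this code
        with lock:
            seen.append(threading.current_thread().name)

    threads = [threading.Thread(target=record, name=n) for n in names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(seen)


if __name__ == "__main__":
    print(collect_names(["first", "second", "third"]))  # ['first', 'second', 'third']
    print(threading.current_thread() is threading.main_thread())  # True
    print(threading.main_thread().name)  # MainThread
```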
Subclassing Thread
Sample code:
```python
import threading
import time

exit_flag = 0


class MyThread(threading.Thread):
    def __init__(self, thread_id, name, counter):
        super().__init__()
        self.thread_id = thread_id
        self.name = name
        self.counter = counter

    def run(self):
        print("Starting: {0}".format(self.name))
        print_time(self.name, self.counter, 5)
        print("Exiting: {0}".format(self.name))


def print_time(thread_name, delay, counter):
    while counter:
        if exit_flag:
            # Returning lets run() finish, which ends the thread
            return
        time.sleep(delay)
        print("{0} {1}".format(thread_name, time.ctime(time.time())))
        counter -= 1


t1 = MyThread(1, "Thread-1", 1)
t2 = MyThread(2, "Thread-2", 1)
t1.start()
t2.start()
t1.join()
t2.join()
print("Exiting Main Thread.")
```
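To define a custom thread object, write a subclass of threading.Thread, implement its constructor (calling the base-class constructor first), and override the run() method. Calling start() on an instance then executes run() in a new thread.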
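Because run() cannot return a value to the caller, a common pattern when subclassing threading.Thread is to store the result on the instance and read it after join(). The class name SumThread and its result attribute are hypothetical; this is a sketch of that pattern, not part of the threading API.

```python
import threading


class SumThread(threading.Thread):
    """Sums a range of integers in a separate thread."""

    def __init__(self, start, stop, name=None):
        super().__init__(name=name)  # always initialize the base Thread first
        self.range_start = start
        self.range_stop = stop
        self.result = None

    def run(self):
        # run() executes in the new thread once start() is called
        self.result = sum(range(self.range_start, self.range_stop))


if __name__ == "__main__":
    workers = [SumThread(0, 50, name="low"), SumThread(50, 100, name="high")]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # result is only safe to read after the thread has finished
    print(sum(w.result for w in workers))  # 4950
```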
Thread Synchronization
Lock
Sample code:
```python
import threading

shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
count = 100000
shared_resource_lock = threading.Lock()


# With lock management
def increment_with_lock():
    global shared_resource_with_lock
    for i in range(count):
        shared_resource_lock.acquire()
        shared_resource_with_lock += 1
        shared_resource_lock.release()


def decrement_with_lock():
    global shared_resource_with_lock
    for i in range(count):
        shared_resource_lock.acquire()
        shared_resource_with_lock -= 1
        shared_resource_lock.release()


# Without lock management: += and -= are not atomic
def increment_without_lock():
    global shared_resource_with_no_lock
    for i in range(count):
        shared_resource_with_no_lock += 1


def decrement_without_lock():
    global shared_resource_with_no_lock
    for i in range(count):
        shared_resource_with_no_lock -= 1


if __name__ == "__main__":
    t1 = threading.Thread(target=increment_with_lock)
    t2 = threading.Thread(target=decrement_with_lock)
    t3 = threading.Thread(target=increment_without_lock)
    t4 = threading.Thread(target=decrement_without_lock)
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    print("The value of shared variable with lock management is: {0}".format(
        shared_resource_with_lock))
    print("The value of shared variable with race condition is: {0}".format(
        shared_resource_with_no_lock))
```
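threading.Lock() creates a lock. It has two basic operations, acquire() and release(); the code between them runs while the lock is held, so only one thread at a time can execute it. Calling release() on a lock that is not locked raises RuntimeError. Without the lock, += on the shared variable is not atomic, so updates from the two threads can be lost and the final value is not guaranteed to be 0.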
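A short sketch of Lock's basic behavior, assuming only the standard threading.Lock API: acquire(blocking=False) returns False immediately instead of waiting when the lock is already held, and calling release() on an unlocked lock raises RuntimeError. The function name demo_lock is hypothetical.

```python
import threading


def demo_lock():
    lock = threading.Lock()
    first = lock.acquire()                 # lock was free: returns True
    second = lock.acquire(blocking=False)  # already held: returns False at once
    lock.release()
    try:
        lock.release()  # releasing an unlocked lock is an error
    except RuntimeError as exc:
        error = type(exc).__name__
    else:
        error = None
    return first, second, lock.locked(), error


if __name__ == "__main__":
    print(demo_lock())  # (True, False, False, 'RuntimeError')
```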
RLock
RLock, also called a reentrant (recursive) lock, is created with threading.RLock(). Unlike Lock, it is owned by the thread that acquired it: that thread can acquire it again without blocking (each acquire() must be balanced by a release()), and only the owning thread can release it.
Sample code:
```python
import threading
import time


class Box(object):
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        Box.lock.acquire()
        self.total_items += n
        Box.lock.release()

    def add(self):
        # add() holds the lock and then calls execute(), which acquires it
        # again; this only works because the lock is reentrant
        Box.lock.acquire()
        self.execute(1)
        Box.lock.release()

    def remove(self):
        Box.lock.acquire()
        self.execute(-1)
        Box.lock.release()


def adder(box, items):
    while items > 0:
        print("Adding 1 item in the box")
        box.add()
        time.sleep(1)
        items -= 1


def remover(box, items):
    while items > 0:
        print("Removing 1 item in the box")
        box.remove()
        time.sleep(1)
        items -= 1


if __name__ == "__main__":
    items = 5
    print("Putting {0} items in the box".format(items))
    box = Box()
    t1 = threading.Thread(target=adder, args=(box, items))
    t2 = threading.Thread(target=remover, args=(box, items))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("{0} items still remain in the box".format(box.total_items))
```
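The difference between Lock and RLock described above can be checked directly: the same thread can acquire an RLock twice, a plain Lock refuses a second acquire, and only the owning thread may release an RLock. The helper names reacquire and release_from_other_thread are hypothetical; this is a sketch using only the threading API.

```python
import threading


def reacquire(lock):
    """Try to acquire `lock` twice from the same thread without blocking."""
    first = lock.acquire()
    # A blocking second acquire() on a plain Lock would deadlock here
    second = lock.acquire(blocking=False)
    if second:
        lock.release()
    lock.release()
    return first, second


def release_from_other_thread():
    rlock = threading.RLock()
    rlock.acquire()
    outcome = []

    def try_release():
        try:
            rlock.release()  # this thread does not own the RLock
        except RuntimeError:
            outcome.append("RuntimeError")

    t = threading.Thread(target=try_release)
    t.start()
    t.join()
    rlock.release()  # the owner releases it normally
    return outcome


if __name__ == "__main__":
    print(reacquire(threading.Lock()))   # (True, False)
    print(reacquire(threading.RLock()))  # (True, True)
    print(release_from_other_thread())   # ['RuntimeError']
```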
Semaphore
Sample code:
```python
import threading
import time
import random

# The counter starts at 0, so the first acquire() blocks
semaphore = threading.Semaphore(0)
item = 0


def consumer():
    print("Consumer is waiting.")
    semaphore.acquire()
    print("Consumer notify: consumed item number {0}".format(item))


def producer():
    global item
    time.sleep(10)
    item = random.randint(0, 10000)
    print("Producer notify: produced item number {0}".format(item))
    semaphore.release()


if __name__ == "__main__":
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("Program terminated.")
```
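The semaphore is initialized to 0, so in each pair of threads the consumer's call to semaphore.acquire() blocks until the producer calls semaphore.release(); this simulates the producer-consumer pattern. In general, if the semaphore's counter is 0, acquire() blocks until another thread calls release(); if the counter is greater than 0, acquire() decrements it by 1 and returns immediately, granting access to the resource.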
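Besides signaling between a producer and a consumer, a semaphore with an initial value greater than 0 is commonly used to cap how many threads can enter a section at once. The sketch below, whose function name limited_run is hypothetical, uses threading.BoundedSemaphore(2) (a Semaphore that also raises ValueError if it is released more times than it was acquired) and records the peak number of threads inside the guarded section.

```python
import threading
import time


def limited_run(n_threads=6, limit=2):
    sem = threading.BoundedSemaphore(limit)
    counter_lock = threading.Lock()
    active = 0
    peak = 0

    def task():
        nonlocal active, peak
        with sem:  # acquire(): blocks while the counter is 0, else decrements it
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05)  # simulate work while holding a slot
            with counter_lock:
                active -= 1
        # leaving the with block calls release(), incrementing the counter

    threads = [threading.Thread(target=task) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print(limited_run())
```

The peak can never exceed the limit; it is usually equal to it, but that depends on scheduling.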
Synchronizing Threads with a Condition
The classic example for explaining conditions is the producer-consumer problem. In this example, the producer keeps adding items to a buffer as long as it is not full, and the consumer keeps taking items out (and discarding them) as long as it is not empty. When the buffer is no longer empty, the producer notifies the consumer; when it is no longer full, the consumer notifies the producer.
Sample code:
```python
from threading import Thread, Condition
import time

items = []
condition = Condition()


class Consumer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        condition.acquire()
        # Re-check in a loop: wait() may return while the buffer is still empty
        while len(items) == 0:
            print("Consumer notify: no item to consume")
            condition.wait()
        items.pop()
        print("Consumer notify: consumed 1 item")
        print("Consumer notify: items to consume are: {0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(2)
            self.consume()


class Producer(Thread):
    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        condition.acquire()
        while len(items) == 10:
            print("Producer notify: items produced are: {0}".format(len(items)))
            print("Producer notify: stop the production!!")
            condition.wait()
        items.append(1)
        print("Producer notify: total items produced: {0}".format(len(items)))
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 20):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = Producer()
    consumer = Consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
```
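condition.acquire() acquires the condition's underlying lock. condition.wait() releases that lock and blocks the current thread until another thread calls condition.notify(), then reacquires the lock before returning; because the shared state may have changed in the meantime, wait() should be called in a loop that re-checks the condition. The thread that calls notify() must still call condition.release() promptly, since the waiting thread cannot resume until it gets the lock back.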
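Because wait() should always be paired with a re-check of the condition, Condition also offers wait_for(predicate), which loops internally until the predicate is true. Below is a sketch of a bounded buffer built that way; the class name BoundedBuffer and the consume helper are hypothetical, and `with self.condition:` is equivalent to the acquire()/release() pair described above.

```python
import threading


class BoundedBuffer:
    """FIFO buffer that blocks producers when full and consumers when empty."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []
        self.condition = threading.Condition()

    def put(self, item):
        with self.condition:
            # wait_for() re-checks the predicate every time it wakes up
            self.condition.wait_for(lambda: len(self.items) < self.capacity)
            self.items.append(item)
            # Producers and consumers share one condition, so wake everyone
            # and let each waiter re-check its own predicate
            self.condition.notify_all()

    def get(self):
        with self.condition:
            self.condition.wait_for(lambda: len(self.items) > 0)
            item = self.items.pop(0)
            self.condition.notify_all()
            return item


def consume(buf, n, out):
    for _ in range(n):
        out.append(buf.get())


if __name__ == "__main__":
    buf = BoundedBuffer(capacity=3)
    received = []
    consumer = threading.Thread(target=consume, args=(buf, 10, received))
    consumer.start()
    for i in range(10):
        buf.put(i)
    consumer.join()
    print(received)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```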
Synchronizing Threads with an Event
An event is an object used for communication between threads: some threads wait for the signal while others send it. Internally it is a flag: set() sets it to true and wakes all waiting threads, clear() resets it to false, and wait() blocks until the flag is true.
Sample code:
```python
import time
import random
from threading import Thread, Event

items = []
event = Event()


class Consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            self.event.wait()
            item = self.items.pop()
            print('Consumer notify: {0} popped from list by {1}'.format(
                item, self.name))


class Producer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        for i in range(100):
            time.sleep(2)
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify: item n° %d appended to list by %s'
                  % (item, self.name))
            print('Producer notify: event set by %s' % self.name)
            self.event.set()
            print('Producer notify: event cleared by %s' % self.name)
            self.event.clear()


if __name__ == "__main__":
    t1 = Producer(items, event)
    t2 = Consumer(items, event)
    # The consumer loops forever, so run it as a daemon and wait only for the producer
    t2.daemon = True
    t1.start()
    t2.start()
    t1.join()
```
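The sketch below isolates the Event API used above, assuming nothing beyond the threading module: wait(timeout) returns False if the timeout expires before the flag is set and True once another thread calls set(), and is_set() reports the flag. The function name wait_with_timeout is hypothetical; threading.Timer is used only to call set() from a second thread after a short delay.

```python
import threading


def wait_with_timeout():
    event = threading.Event()
    # Nobody has called set() yet, so this times out and returns False
    timed_out = event.wait(timeout=0.05)

    # Call event.set() from another thread after a short delay
    setter = threading.Timer(0.05, event.set)
    setter.start()
    signaled = event.wait(timeout=5)  # returns True as soon as set() is called
    setter.join()

    flag_after_set = event.is_set()
    event.clear()  # reset the flag so later wait() calls block again
    return timed_out, signaled, flag_after_set, event.is_set()


if __name__ == "__main__":
    print(wait_with_timeout())  # (False, True, True, False)
```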
Simplifying Code with the with Statement
Lock, RLock, Condition, and Semaphore all support the context-manager protocol, so a with block can replace an explicit acquire()/release() pair:
```python
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s')


def threading_with(statement):
    # The with statement calls acquire() on entry and release() on exit
    with statement:
        logging.debug("%s acquired via with" % statement)


def threading_not_with(statement):
    statement.acquire()
    try:
        logging.debug("%s acquired directly" % statement)
    finally:
        statement.release()


if __name__ == "__main__":
    lock = threading.Lock()
    rlock = threading.RLock()
    condition = threading.Condition()
    mutex = threading.Semaphore(1)
    threading_synchronization_list = [lock, rlock, condition, mutex]
    for statement in threading_synchronization_list:
        t1 = threading.Thread(target=threading_with, args=(statement,))
        t2 = threading.Thread(target=threading_not_with, args=(statement,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
```
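What the with form buys over a bare acquire()/release() pair is that the lock is released even if the block raises, just like the try/finally version. A minimal check, assuming only threading.Lock and a hypothetical function name fail_inside_lock:

```python
import threading

lock = threading.Lock()


def fail_inside_lock():
    with lock:
        # Any exception here still triggers release() on the way out
        raise ValueError("something went wrong")


if __name__ == "__main__":
    try:
        fail_inside_lock()
    except ValueError:
        pass
    print(lock.locked())  # False: the lock was released despite the exception
```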
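Thread Communication with queue
The four most commonly used methods of queue.Queue are:
- put(): adds an item to the queue
- get(): removes an item from the queue and returns it (blocking while the queue is empty, by default)
- task_done(): called once for each item retrieved with get(), after that item has been processed
- join(): blocks until every item put into the queue has been processed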
```python
from threading import Thread
from queue import Queue
import time
import random


class Producer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Producer notify: item n° %d appended to queue by %s"
                  % (item, self.name))
            time.sleep(1)


class Consumer(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            print("Consumer notify: %d popped from queue by %s"
                  % (item, self.name))
            self.queue.task_done()


if __name__ == "__main__":
    queue = Queue()
    t1 = Producer(queue)
    t2 = Consumer(queue)
    t3 = Consumer(queue)
    t4 = Consumer(queue)
    # Consumers loop forever, so run them as daemon threads
    for t in (t2, t3, t4):
        t.daemon = True
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t1.join()
    # Block until task_done() has been called for every item
    queue.join()
```
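The consumer threads above loop forever, so they never return from run(). One way to let them finish cleanly is to send one sentinel value (None here) per consumer after the real work. A sketch of that pattern, with the hypothetical names consumer and run_pipeline:

```python
import threading
from queue import Queue

SENTINEL = None  # marker meaning "no more work"


def consumer(queue, results, lock):
    while True:
        item = queue.get()
        try:
            if item is SENTINEL:
                return  # leave the loop so the thread can be joined
            with lock:
                results.append(item * 2)
        finally:
            queue.task_done()  # mark every get(), including the sentinel


def run_pipeline(items, n_consumers=3):
    queue = Queue()
    results = []
    lock = threading.Lock()
    workers = [threading.Thread(target=consumer, args=(queue, results, lock))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for item in items:
        queue.put(item)
    for _ in workers:
        queue.put(SENTINEL)  # one sentinel per consumer, queued after all items
    queue.join()  # every item and sentinel has been marked done
    for w in workers:
        w.join()
    return sorted(results)


if __name__ == "__main__":
    print(run_pipeline(range(5)))  # [0, 2, 4, 6, 8]
```

Because the queue is FIFO and the sentinels are enqueued last, every real item is taken before any consumer sees its sentinel.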
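Process-Based Parallelism
multiprocessing is a standard-library module for running code in separate processes, which lets CPU-bound work use multiple cores without being limited by the GIL. Processes do not share memory by default; the module provides Queue and Pipe for passing messages, and Value and Array for shared memory.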
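A minimal sketch of both styles mentioned above, assuming the standard multiprocessing API: Pool.map spreads CPU-bound calls across worker processes, and a Value is an integer in shared memory that several processes update under its lock. The function names are hypothetical. The `if __name__ == "__main__":` guard is required on platforms that start processes with spawn (Windows, macOS), because child processes re-import the main module.

```python
from multiprocessing import Pool, Process, Value


def square(x):
    return x * x


def add_many(counter, n):
    for _ in range(n):
        # counter.value += 1 is not atomic across processes, so take the lock
        with counter.get_lock():
            counter.value += 1


def pool_squares(numbers):
    with Pool(processes=4) as pool:
        return pool.map(square, numbers)  # results keep the input order


def shared_counter(workers=4, n=1000):
    counter = Value("i", 0)  # a C int in shared memory with an associated lock
    procs = [Process(target=add_many, args=(counter, n)) for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(pool_squares(range(10)))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    print(shared_counter())         # 4000
```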
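Asynchronous Programming
Using the concurrent.futures Module
This module provides thread pools and process pools for managing parallel tasks, handling non-deterministic execution flow, and synchronizing processes and threads. It consists of the following parts:
- concurrent.futures.Executor: an abstract base class that provides methods for executing calls asynchronously:
  - submit(function, *args): schedules function(*args) for execution and returns a Future object.
  - map(function, *iterables): like the built-in map(), but the calls are executed asynchronously; results are returned in the order of the inputs.
  - shutdown(wait=True): signals the executor to free all of its resources.
- concurrent.futures.Future: encapsulates the asynchronous execution of a callable. Future instances are created by submitting a task (a function together with its arguments) to an Executor with submit().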
Sample code:
```python
import concurrent.futures
import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


def evaluate_item(x):
    # CPU-bound work
    result_item = count(x)
    return result_item


def count(number):
    for i in range(0, 1000000):
        i = i + 1
    return i * number


if __name__ == "__main__":
    # Sequential execution
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in " + str(time.time() - start_time), "seconds")

    # Thread pool execution
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in " + str(time.time() - start_time_1), "seconds")

    # Process pool execution
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Process pool execution in " + str(time.time() - start_time_2), "seconds")
```
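As a complement to submit()/as_completed() above, Executor.map() returns results in input order rather than completion order. The sketch below, with a hypothetical slow_square function, makes larger inputs finish first to show the difference. Note that on standard CPython builds the GIL means a ThreadPoolExecutor mainly speeds up I/O-bound work (like the time.sleep here); CPU-bound code such as count() above benefits from ProcessPoolExecutor instead.

```python
import concurrent.futures
import time


def slow_square(x):
    # Larger inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (5 - x))
    return x * x


def compare_orders(numbers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map(): results come back in the same order as the inputs
        in_order = list(executor.map(slow_square, numbers))
        # submit() + as_completed(): results come back as each future finishes
        futures = [executor.submit(slow_square, x) for x in numbers]
        by_completion = [f.result() for f in concurrent.futures.as_completed(futures)]
    return in_order, by_completion


if __name__ == "__main__":
    in_order, by_completion = compare_orders([1, 2, 3, 4])
    print(in_order)       # [1, 4, 9, 16]
    print(by_completion)  # completion order, which depends on timing
```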
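Managing the Event Loop with asyncio
Python's asyncio module provides facilities for managing events, coroutines, tasks, and threads, along with primitives for writing concurrent code. Its main components and concepts are:
- Event loop: the central scheduler that runs tasks and callbacks; asyncio runs at most one event loop per thread.
- Coroutines: a generalization of subroutines. A coroutine can suspend during execution to wait for external processing (for example, I/O) to complete, and later resume from where it paused.
- Futures: the Future object which, as in the concurrent.futures module, represents a computation that has not finished yet.
- Tasks: asyncio.Task is a subclass of Future that wraps a coroutine and schedules it to run concurrently on the event loop.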
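asyncio provides the following methods for managing the event loop:
- loop = asyncio.get_event_loop(): gets the event loop for the current context. Calling it when no event loop is running is deprecated in recent Python versions; inside a coroutine, prefer asyncio.get_running_loop().
- loop.call_later(delay, callback, *args): calls callback(*args) after delay seconds.
- loop.call_soon(callback, *args): schedules callback to be called as soon as possible, that is, once call_soon() has returned and control goes back to the event loop.
- loop.time(): returns the current time according to the event loop's internal (monotonic) clock, as a float.
- asyncio.set_event_loop(loop): sets loop as the event loop for the current context.
- asyncio.new_event_loop(): creates and returns a new event loop according to the current policy.
- loop.run_forever(): runs the loop until stop() is called.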
Sample code:
```python
import asyncio


def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()


def function_2(end_time, loop):
    print("function_2 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()


def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()


# Defined but never scheduled in this example
def function_4(end_time, loop):
    print("function_4 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()


# Create the loop explicitly instead of relying on get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
loop.run_forever()
loop.close()
```
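On Python 3.7 and later, the loop is usually created and closed by asyncio.run(), and callbacks are scheduled on the running loop obtained with asyncio.get_running_loop(). A sketch, assuming the hypothetical coroutine name schedule_callbacks, showing that a call_soon() callback runs before a call_later() one even when scheduled second, and that a Future created with loop.create_future() can be awaited until the last callback fires:

```python
import asyncio


async def schedule_callbacks(delay=0.1):
    loop = asyncio.get_running_loop()
    order = []
    done = loop.create_future()

    def finish():
        order.append("later")
        done.set_result(order)  # resumes the coroutine awaiting `done`

    loop.call_later(delay, finish)        # runs after about `delay` seconds
    loop.call_soon(order.append, "soon")  # runs on the next loop iteration
    return await done


if __name__ == "__main__":
    print(asyncio.run(schedule_callbacks()))  # ['soon', 'later']
```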
Managing Coroutines with asyncio
Sample code (the @asyncio.coroutine / yield from style was removed in Python 3.11, so this version uses async def / await):
```python
import asyncio
from random import randint


async def start_state():
    print("Start State called\n")
    input_val = randint(0, 1)
    # asyncio.sleep() suspends only this coroutine; time.sleep() would block the loop
    await asyncio.sleep(1)
    if input_val == 0:
        result = await state2(input_val)
    else:
        result = await state1(input_val)
    print("Resume of the Transition:\nStart State calling " + result)


async def state1(transition_value):
    output_val = "State 1 with transition value = %s\n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await state3(input_val)
    else:
        result = await state2(input_val)
    result = "State 1 calling " + result
    return output_val + result


async def state2(transition_value):
    output_val = "State 2 with transition value = %s\n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await state1(input_val)
    else:
        result = await state3(input_val)
    result = "State 2 calling " + result
    return output_val + result


async def state3(transition_value):
    output_val = "State 3 with transition value = %s\n" % transition_value
    input_val = randint(0, 1)
    await asyncio.sleep(1)
    print("...Evaluating...")
    if input_val == 0:
        result = await state1(input_val)
    else:
        # The transition into end_state is what ends the chain of calls
        result = await end_state(input_val)
    result = "State 3 calling " + result
    return output_val + result


async def end_state(transition_value):
    output_val = "End State with transition value = %s\n" % transition_value
    print("...Stop Computation...")
    return output_val


if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    asyncio.run(start_state())
```
Controlling Tasks with asyncio
Sample code:
```python
import asyncio


async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task: Compute factorial(%s)" % i)
        await asyncio.sleep(1)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" % (number, f))


async def fibonacci(number):
    a, b = 0, 1
    for i in range(number):
        print("Asyncio.Task: Compute fibonacci(%s)" % i)
        await asyncio.sleep(1)
        a, b = b, a + b
    print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))


async def binomial_coeff(n, k):
    result = 1
    for i in range(1, k + 1):
        # Exact integer division: the running product is always divisible by i
        result = result * (n - i + 1) // i
        print("Asyncio.Task: Compute binomial_coeff(%s)" % i)
        await asyncio.sleep(1)
    print("Asyncio.Task - binomial_coeff(%s, %s) = %s" % (n, k, result))


async def main():
    # create_task() schedules each coroutine on the running loop right away
    tasks = [asyncio.create_task(factorial(10)),
             asyncio.create_task(fibonacci(10)),
             asyncio.create_task(binomial_coeff(20, 10))]
    await asyncio.wait(tasks)


if __name__ == "__main__":
    asyncio.run(main())
```
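asyncio.gather() is a compact alternative to asyncio.wait() when you need the tasks' return values: it runs the awaitables concurrently and returns their results in argument order. A sketch with the hypothetical coroutine delayed_value, where the total run time is roughly the longest sleep rather than the sum of all sleeps:

```python
import asyncio
import time


async def delayed_value(value, delay):
    await asyncio.sleep(delay)  # yields to the event loop instead of blocking
    return value


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        delayed_value("factorial", 0.3),
        delayed_value("fibonacci", 0.1),
        delayed_value("binomial", 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)  # ['factorial', 'fibonacci', 'binomial']
    print("took about %.1f s" % elapsed)
```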
Using asyncio with Futures
Sample code:
```python
import asyncio


async def first_coroutine(future, n):
    count = 0
    for i in range(1, n + 1):
        count = count + i
    await asyncio.sleep(4)
    future.set_result("First coroutine (sum of N integers) result = " + str(count))


async def second_coroutine(future, n):
    count = 1
    for i in range(2, n + 1):
        count *= i
    await asyncio.sleep(3)
    future.set_result("Second coroutine (factorial) result = " + str(count))


def got_result(future):
    print(future.result())


async def main(n1, n2):
    # Futures must be created on the running loop
    loop = asyncio.get_running_loop()
    future1 = loop.create_future()
    future2 = loop.create_future()
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    await asyncio.gather(first_coroutine(future1, n1),
                         second_coroutine(future2, n2))


if __name__ == "__main__":
    n1 = 1
    n2 = 1
    asyncio.run(main(n1, n2))
```
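asyncio futures and concurrent.futures futures can be combined: loop.run_in_executor() runs a blocking function in an executor and returns an asyncio Future that a coroutine can await or attach callbacks to. A sketch, where blocking_sum is a hypothetical stand-in for blocking work:

```python
import asyncio
import concurrent.futures
import time


def blocking_sum(n):
    time.sleep(0.1)  # stands in for blocking I/O
    return sum(range(1, n + 1))


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        fut = loop.run_in_executor(pool, blocking_sum, 10)  # an asyncio Future
        fut.add_done_callback(lambda f: print("callback got", f.result()))
        results = await asyncio.gather(fut, loop.run_in_executor(pool, blocking_sum, 100))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # [55, 5050]
```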
Distributed Programming
(Omitted.)
GPU Programming
(Omitted.)