创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组,kwargs表示调用对象的关键字参数字典,name为进程的别名,group实质上不使用。
# Example: start one child process and inspect its pid, name and liveness.
import multiprocessing
import time


def worker(interval):
    """Print the current time five times, pausing after each print.

    Args:
        interval: Seconds to sleep after every printed line.
    """
    for _ in range(5):
        print("the time is {0}".format(time.ctime()))
        time.sleep(interval)


if __name__ == "__main__":
    # The class is ``Process`` (capital P); ``multiprocessing.process`` is a
    # module and cannot be called.
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

# ------------------------------------------------
# Sample output (pid and timestamps differ per run):
# >>> p.pid: 1004
# >>> p.name: Process-1
# >>> p.is_alive: True
# >>> the time is Mon Jul 29 21:31:11 2019
# >>> the time is Mon Jul 29 21:31:14 2019
# >>> the time is Mon Jul 29 21:31:17 2019
# >>> the time is Mon Jul 29 21:31:20 2019
# >>> the time is Mon Jul 29 21:31:23 2019
# Example: run three functions as three separate child processes and list
# the children that are still alive.
import multiprocessing
import time


def worker_1(interval):
    """Announce start, sleep ``interval`` seconds, announce end."""
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")


def worker_2(interval):
    """Announce start, sleep ``interval`` seconds, announce end."""
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")


def worker_3(interval):
    """Announce start, sleep ``interval`` seconds, announce end."""
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker_1, args=(2,))
    p2 = multiprocessing.Process(target=worker_2, args=(3,))
    p3 = multiprocessing.Process(target=worker_3, args=(4,))

    p1.start()
    p2.start()
    p3.start()

    print("the number of cpu is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        # NOTE(review): this line was garbled in the source text; it is
        # reconstructed from the surviving fragments and the sample output.
        print("child p.name:" + p.name + "\tp.id" + str(p.pid))
    print("end")

# ------------------------------------------------
# Sample output (pids vary; the process names were missing from the
# recorded output and are shown here as <name>):
# >>> the number of cpu is:8
# >>> child p.name:<name>	p.id18208
# >>> child p.name:<name>	p.id1404
# >>> child p.name:<name>	p.id11684
# >>> end
# >>> worker_1
# >>> worker_2
# >>> worker_3
# >>> end worker_1
# >>> end worker_2
# >>> end worker_3
# Example: define a process as a subclass of multiprocessing.Process.
import multiprocessing
import time


class ClockProcess(multiprocessing.Process):
    """Process that prints the current time five times.

    ``start()`` runs ``run()`` in a new child process.
    """

    def __init__(self, interval):
        """Store the pause length.

        Args:
            interval: Seconds to sleep after every printed line.
        """
        # The base initialiser must run before the process can be started.
        super().__init__()
        self.interval = interval

    def run(self):
        """Print the time five times, sleeping ``self.interval`` between."""
        for _ in range(5):
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)


# Backward-compatible alias for the all-lowercase spelling used previously.
clockprocess = ClockProcess


if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

# ------------------------------------------------
# Sample output (timestamps differ per run):
# >>> the time is Mon Jul 29 21:43:07 2019
# >>> the time is Mon Jul 29 21:43:10 2019
# >>> the time is Mon Jul 29 21:43:13 2019
# >>> the time is Mon Jul 29 21:43:16 2019
# >>> the time is Mon Jul 29 21:43:19 2019
1.4-1 不加daemon属性
# Example: a child process WITHOUT the daemon flag keeps running after the
# main script has printed its last line.
import multiprocessing
import time


def worker(interval):
    """Print a start stamp, sleep ``interval`` seconds, print an end stamp."""
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("end!")

# ------------------------------------------------
# Sample output (timestamps differ per run):
# >>> end!
# >>> work start:Tue Jul 29 21:29:10 2019
# >>> work end:Tue Jul 29 21:29:13 2019
1.4-2 加上daemon属性
# Example: a child process WITH the daemon flag set. In the recorded run
# only "end!" appears: the main script finishes first and the worker's
# output is never printed.
import multiprocessing
import time


def worker(interval):
    """Print a start stamp, sleep ``interval`` seconds, print an end stamp."""
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    # Must be set before start().
    p.daemon = True
    p.start()
    print("end!")

# ------------------------------------------------
# Sample output:
# >>> end!
1.4-3 设置daemon执行完结束的方法
# Example: a daemon child process that is allowed to finish because the
# main script waits for it with join() before printing "end!".
import multiprocessing
import time


def worker(interval):
    """Print a start stamp, sleep ``interval`` seconds, print an end stamp."""
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    # Block until the worker has finished.
    p.join()
    print("end!")

# ------------------------------------------------
# Sample output (timestamps differ per run):
# >>> work start:Tue Jul 29 22:16:32 2019
# >>> work end:Tue Jul 29 22:16:35 2019
# >>> end!
# Example: two processes append to the same file, serialised by a Lock.
import multiprocessing
import sys  # not used by this listing; kept from the original


def worker_with(lock, f):
    """Append nine lines to ``f`` while holding ``lock`` via ``with``.

    Args:
        lock: Lock object usable as a context manager.
        f: Path of the file to append to.
    """
    with lock:
        # The inner ``with`` closes the file even if a write fails.
        with open(f, 'a+') as fs:
            for _ in range(9):
                fs.write("lockd acquired via with\n")


def worker_no_with(lock, f):
    """Append nine lines to ``f`` using explicit acquire()/release().

    Args:
        lock: Lock object exposing ``acquire()`` and ``release()``.
        f: Path of the file to append to.
    """
    lock.acquire()
    try:
        with open(f, 'a+') as fs:
            for _ in range(9):
                fs.write("lock acquired directly\n")
    finally:
        # Always release, otherwise the other process would block forever.
        lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

# ------------------------------------------------
# Recorded result. These lines are what the workers write to file.txt;
# which group comes first presumably depends on which process takes the
# lock first.
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lockd acquired via with
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# >>> lock acquired directly
# Example: a Semaphore(2) lets at most two of the five workers run the
# guarded section at the same time.
import multiprocessing
import time


def worker(s, i):
    """Hold semaphore ``s`` for ``i`` seconds, logging acquire and release.

    Args:
        s: Semaphore object usable as a context manager.
        i: Seconds to sleep while holding the semaphore.
    """
    # ``with`` releases the semaphore even if the body raises.
    with s:
        print(multiprocessing.current_process().name + "acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + "release\n")


if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i * 2))
        p.start()

# ------------------------------------------------
# Sample output (the interleaving may differ per run):
# >>> Process-1acquire
# >>> Process-1release
# >>>
# >>> Process-2acquire
# >>> Process-3acquire
# >>> Process-2release
# >>>
# >>> Process-5acquire
# >>> Process-3release
# >>>
# >>> Process-4acquire
# >>> Process-5release
# >>>
# >>> Process-4release
# Example: synchronise processes with an Event; one waiter blocks without
# limit, the other gives up after a timeout.
import multiprocessing
import time


def wait_for_event(e):
    """Block until event ``e`` is set, then report its state."""
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))


def wait_for_event_timeout(e, t):
    """Wait at most ``t`` seconds for event ``e``, then report its state.

    Args:
        e: Event object exposing ``wait()`` and ``is_set()``.
        t: Maximum number of seconds to wait.
    """
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))


if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name="block",
                                 target=wait_for_event,
                                 args=(e,))
    w2 = multiprocessing.Process(name="non-block",
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w1.start()
    w2.start()

    # Sleep longer than w2's 2 s timeout so that w2 reports False.
    time.sleep(3)

    e.set()
    print("main: event is set")

# ------------------------------------------------
# Sample output:
# >>> wait_for_event: starting
# >>> wait_for_event_timeout:starting
# >>> wait_for_event_timeout:e.is_set->False
# >>> main: event is set
# >>> wairt_for_event: e.is_set()->True
# Example: pass a value between two processes through a Queue using
# non-blocking put/get.
import multiprocessing
import queue


def writer_proc(q):
    """Try to put the value 1 on ``q`` without blocking.

    A full queue is ignored on purpose (best effort).
    """
    try:
        q.put(1, block=False)
    except queue.Full:
        # Only the "queue is full" case is swallowed; other errors surface.
        pass


def reader_proc(q):
    """Print one item from ``q`` without blocking.

    An empty queue is ignored on purpose (best effort).
    """
    try:
        print(q.get(block=False))
    except queue.Empty:
        # Nothing available yet; print nothing.
        pass


if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()

    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()

    reader.join()
    writer.join()

# ------------------------------------------------
# Sample output. NOTE(review): because the get is non-blocking, the reader
# prints nothing if it runs before the writer's item has arrived.
# >>> 1
Pipe方法返回(conn1, conn2),代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。如果duplex为False,conn1只负责接收消息,conn2只负责发送消息。
# Example: one process sends integers through a Pipe, another receives them.
import multiprocessing
import time


def proc1(pipe, delay=1):
    """Send 0..9999 over ``pipe`` repeatedly, forever.

    Args:
        pipe: Connection object exposing ``send()``.
        delay: Seconds to sleep after each send (default 1).
    """
    while True:
        for i in range(10000):
            print("send: %s" % (i))
            pipe.send(i)
            time.sleep(delay)


def _receive_forever(label, pipe, delay):
    """Print every object received from ``pipe`` prefixed with ``label``."""
    while True:
        print(label, pipe.recv())
        time.sleep(delay)


def proc2(pipe, delay=1):
    """Receive from ``pipe`` forever, printing each item as 'proc2 rev:'.

    Args:
        pipe: Connection object exposing ``recv()``.
        delay: Seconds to sleep after each receive (default 1).
    """
    _receive_forever("proc2 rev:", pipe, delay)


def proc3(pipe, delay=1):
    """Receive from ``pipe`` forever, printing each item as 'proc3 rev:'.

    Args:
        pipe: Connection object exposing ``recv()``.
        delay: Seconds to sleep after each receive (default 1).
    """
    _receive_forever("proc3 rev:", pipe, delay)


if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    # p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    # p3.start()

    # The workers loop forever, so these joins never return on their own.
    p1.join()
    p2.join()
    # p3.join()

# ------------------------------------------------
# Sample output:
# >>> send: 0
# >>> proc2 rev: 0
# >>> send: 1
# >>> proc2 rev: 1
# >>> send: 2
# >>> proc2 rev: 2
# >>> send: 3
# >>> proc2 rev: 3
# >>> send: 4
# >>> proc2 rev: 4
# >>> send: 5
# >>> proc2 rev: 5
# >>> send: 6
# >>> proc2 rev: 6
# >>> send: 7
# >>> proc2 rev: 7
# >>> send: 8
# >>> proc2 rev: 8
# . . . . . .
# Example 1: process pool with non-blocking submission (apply_async).
import multiprocessing
import time


def func(msg, delay=3):
    """Print ``msg``, sleep ``delay`` seconds, then print "end".

    Args:
        msg: Text to print after the "msg:" label.
        delay: Seconds to sleep between the two prints (default 3).
    """
    print("msg:", msg)
    time.sleep(delay)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # The pool runs at most `processes` tasks at once; a queued task
        # starts as soon as a worker becomes free.
        pool.apply_async(func, (msg, ))

    print("mark~ mark~ mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    # close() must be called before join(), otherwise join() fails. After
    # close() no new tasks are accepted; join() waits for all workers to end.
    pool.join()
    print("sub-process(es) done.")

# ------------------------------------------------
# Sample output:
# >>> mark~ mark~ mark~~~~~~~~~~~~~~~~~~~~~~
# >>> msg: hello 0
# >>> msg: hello 1
# >>> msg: hello 2
# >>> end
# >>> msg: hello 3
# >>> end
# >>> end
# >>> end
# >>> sub-process(es) done.
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
- close() 关闭pool,使其不再接受新的任务。
- terminate() 结束工作进程,不再处理未完成的任务。
- join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
执行说明:创建一个进程池pool,并设定进程的数量为3,range(4)会相继产生四个对象[0, 1, 2, 3],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完后才空出一个进程处理对象3,所以输出“msg: hello 3”会出现在"end"之后。因为是非阻塞的,主函数会继续执行自己的代码,不等待进程的执行,所以运行完for循环后直接输出“mark~ mark~ mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。
# Example 2: process pool with blocking submission (apply).
import multiprocessing
import time


def func(msg, delay=3):
    """Print ``msg``, sleep ``delay`` seconds, then print "end".

    Args:
        msg: Text to print after the "msg:" label.
        delay: Seconds to sleep between the two prints (default 3).
    """
    print("msg:", msg)
    time.sleep(delay)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # apply() blocks until the task has finished, so the four tasks run
        # one after another (see the sample output).
        pool.apply(func, (msg, ))

    print("mark~ mark~ mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    # close() must be called before join(), otherwise join() fails. After
    # close() no new tasks are accepted; join() waits for all workers to end.
    pool.join()
    print("sub-process(es) done.")

# ------------------------------------------------
# Sample output:
# >>> msg: hello 0
# >>> end
# >>> msg: hello 1
# >>> end
# >>> msg: hello 2
# >>> end
# >>> msg: hello 3
# >>> end
# >>> mark~ mark~ mark~~~~~~~~~~~~~~~~~~~~~~
# >>> sub-process(es) done.
# Example: collect the return values of tasks submitted to a process pool.
import multiprocessing
import time


def func(msg, delay=3):
    """Print ``msg``, sleep, print "end" and return ``"done" + msg``.

    Args:
        msg: Text to print and to append to the returned string.
        delay: Seconds to sleep between the two prints (default 3).

    Returns:
        The string "done" concatenated with ``msg``.
    """
    print("msg:", msg)
    time.sleep(delay)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" % (i)
        # Keep the AsyncResult handles so the values can be fetched later.
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("sub-process(es) done.")

# ------------------------------------------------
# Sample output:
# >>> msg: hello 0
# >>> msg: hello 1
# >>> msg: hello 2
# >>> end
# >>> end
# >>> end
# >>> ::: donehello 0
# >>> ::: donehello 1
# >>> ::: donehello 2
# >>> sub-process(es) done.
# Example: run several different functions concurrently in a process pool.
import multiprocessing
import os
import random
import time


def _run_task(start_fmt, end_fmt, max_seconds):
    """Print a start line with the pid, sleep a random time, print elapsed.

    Args:
        start_fmt: %-format string taking the current process id.
        end_fmt: %-format string taking the elapsed seconds as a float.
        max_seconds: Upper bound of the random sleep, in seconds.
    """
    print(start_fmt % (os.getpid()))  # os.getpid() is this process's id
    # perf_counter() is the clock intended for measuring elapsed time.
    start = time.perf_counter()
    # random.random() returns a float in [0, 1).
    time.sleep(random.random() * max_seconds)
    end = time.perf_counter()
    print(end_fmt % (end - start))


def lee(max_seconds=10):
    """Run the "lee" task: sleep up to ``max_seconds`` and report timing."""
    _run_task("\nrun task lee-%s", 'task lee, runs %0.2f seconds.',
              max_seconds)


def marlon(max_seconds=40):
    """Run the "marlon" task: sleep up to ``max_seconds`` and report timing."""
    _run_task("\nrun task marlon-%s", 'task marlon runs %0.2f seconds.',
              max_seconds)


def allen(max_seconds=30):
    """Run the "allen" task: sleep up to ``max_seconds`` and report timing."""
    _run_task("\nrun task allen-%s", 'task allen runs %0.2f seconds.',
              max_seconds)


def frank(max_seconds=20):
    """Run the "frank" task: sleep up to ``max_seconds`` and report timing."""
    _run_task("\nrun task frank-%s", 'task frank runs %0.2f seconds.',
              max_seconds)


if __name__ == '__main__':
    function_list = [lee, marlon, allen, frank]
    print("parent process %s" % (os.getpid()))

    pool = multiprocessing.Pool(4)
    for func in function_list:
        # Submit without blocking; a queued task starts as soon as a pool
        # worker becomes free.
        pool.apply_async(func)

    print('waiting for all subprocesses done...')
    pool.close()
    # close() must be called before join(), otherwise join() fails. After
    # close() no new tasks are accepted; join() waits for all workers to end.
    pool.join()
    print('all subprocesses done.')

# ------------------------------------------------
# Sample output (pids and durations differ per run):
# >>> parent process 9828
# >>> waiting for all subprocesses done...
# >>>
# >>> run task lee-12948
# >>>
# >>> run task marlon-8948
# >>>
# >>> run task allen-18124
# >>>
# >>> run task frank-17404
# >>> task frank runs 3.42 seconds.
# >>> task lee, runs 6.69 seconds.
# >>> task allen runs 8.38 seconds.
# >>> task marlon runs 13.37 seconds.
# >>> all subprocesses done.