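PCB (Process Control Block): the process control block is a data structure that represents the state of a process. It is usually a contiguous area in memory reserved by the operating system. It holds all the information the OS needs to describe a process and control its execution, and the OS controls and manages concurrently executing processes through their PCBs. PCBs and processes correspond one to one. A PCB mainly contains the following information about its process:

- process name
- identifying information
- process state information
- scheduling priority
- communication information
- the saved context (register save area)
- resource requirements
- information about the process image
- family relationships (parent and child processes)

Every process has exactly one PCB, and the PCB is the sole indicator that the process exists.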
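As a rough illustration, the fields listed above can be modelled as a plain record kept in a table indexed by pid. This is a hypothetical sketch, not the layout used by any real kernel (Linux, for instance, uses `struct task_struct`). The class, field and function names (`PCB`, `process_table`, `create_process`, `destroy_process`) are assumptions chosen to mirror the list.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional


class State(Enum):
    READY = "ready"
    RUNNING = "running"
    BLOCKED = "blocked"


@dataclass
class PCB:
    """Hypothetical process control block: exactly one per process."""
    pid: int                                                  # unique process identifier
    name: str                                                 # process name
    state: State = State.READY                                # process state
    priority: int = 0                                         # scheduling priority
    registers: Dict[str, int] = field(default_factory=dict)   # saved context
    resources: List[str] = field(default_factory=list)        # resources held or requested
    parent: Optional[int] = None                              # family relationship: parent pid
    children: List[int] = field(default_factory=list)         # family relationship: child pids


# hypothetical process table: the OS finds a process only through its PCB
process_table: Dict[int, PCB] = {}


def create_process(pid: int, name: str, parent: Optional[int] = None) -> PCB:
    pcb = PCB(pid=pid, name=name, parent=parent)
    process_table[pid] = pcb  # creating the PCB is what makes the process exist
    if parent is not None:
        process_table[parent].children.append(pid)
    return pcb


def destroy_process(pid: int) -> None:
    pcb = process_table.pop(pid)  # removing the PCB removes the process
    if pcb.parent is not None and pcb.parent in process_table:
        process_table[pcb.parent].children.remove(pid)
```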
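- A resource that only one process may use at a time is called a critical resource. A critical resource can be hardware or software. For example, a printer can be used by only one process at a time, so it is a critical resource.
- The part of each process's code that accesses a critical resource is called its critical section. Critical sections that belong to different critical resources do not exclude one another.
- Mutual exclusion between processes means controlling how processes enter the critical section. A process must request entry first and enter only once permission is granted. It then executes the critical-section code and leaves, and only after that may it run other code.
- Rules for entering a critical section:
  - Enter when free: if the critical section is free, a process that requests entry may enter. If several processes request it at once, only one is admitted.
  - Wait when busy: at any moment, at most one process may be inside the critical section.
  - Bounded waiting: a process inside the critical section must leave within a finite time, so that a process requesting entry never waits indefinitely.
  - Yield while waiting: if a process cannot enter its critical section, it should give up the CPU instead of "busy waiting".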
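These rules can be seen in a minimal sketch that assumes a shared integer plays the role of the critical resource. Several processes increment a `multiprocessing.Value`. The read-modify-write `counter.value += 1` is the critical section. The value's built-in lock (`get_lock()`) ensures that at most one process is inside it at a time. A process blocked on that lock sleeps rather than spinning, so it does not busy-wait. The function names `worker` and `run` are illustrative.

```python
from multiprocessing import Process, Value


def worker(counter, n):
    for _ in range(n):
        # entry section: request the lock; blocks (without spinning) while another process holds it
        with counter.get_lock():
            # critical section: read-modify-write on the shared value
            counter.value += 1
        # exit section: the lock is released when the with-block ends


def run(num_procs=4, n=1000):
    counter = Value("i", 0)  # shared int acting as the critical resource
    procs = [Process(target=worker, args=(counter, n)) for _ in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run())  # 4000 every time; without the lock the total can come out lower
```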
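multiprocessing is a package that supports spawning processes using an API similar to the threading module. It is modelled on threading, but each process runs its own interpreter. As a result, it is not limited by the GIL (Global Interpreter Lock) and can make full use of multiple cores.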
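A minimal sketch of the multi-core point, using an illustrative function and input sizes: a CPU-bound function is mapped over inputs with a `Pool`. Each worker is a separate process with its own interpreter and GIL, so the calls can run on several cores at once. With threads, the same pure-Python loop would be serialized by the GIL.

```python
import os
from multiprocessing import Pool


def sum_of_squares(n):
    # pure-Python CPU-bound work
    return sum(i * i for i in range(n))


def main(inputs=(10_000, 20_000, 30_000, 40_000)):
    # one worker per core; os.cpu_count() may return None, so fall back to 1
    with Pool(processes=os.cpu_count() or 1) as pool:
        return pool.map(sum_of_squares, inputs)  # results come back in input order


if __name__ == "__main__":
    print(main())
```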
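```python
from multiprocessing import Process
import os, time


def say():
    time.sleep(2)
    print("Child process: ", os.getpid())
    print("This is a subprocess.")
    print("Child process finished.")


if __name__ == '__main__':
    print("Main process: ", os.getpid())
    p = Process(target=say, args=())
    p.start()
    # printed before the child's output; the interpreter still waits for non-daemon children before exiting
    print("Main process finished.")
```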
```python
from multiprocessing import Process
import os, time


class Say(Process):
    def __init__(self):
        super().__init__()

    def run(self):  # override run(); start() calls it in the new process
        time.sleep(2)
        print("Child process: ", os.getpid())
        print("This is a subprocess.")
        print("Child process finished.")


if __name__ == '__main__':
    print("Main process: ", os.getpid())
    p = Say()
    p.start()
    print("Main process finished.")
```
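multiprocessing supports three ways to start a process: spawn, fork and forkserver.

- spawn starts a fresh Python interpreter process. The child inherits only the resources needed to run the process object's run() method, which makes it slow.
- fork uses os.fork(), so the child inherits all of the parent's resources. The child can therefore end up with state it should not have, and forking a multithreaded parent is unsafe.
- forkserver is an alternative to both. It starts a single-threaded server process, and whenever a child is needed, the parent asks the server to fork one. This is relatively efficient, and the child does not inherit unnecessary resources.

fork and forkserver are available only on Unix.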
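The available methods and the current default can be checked at run time. This sketch uses the standard-library functions `get_all_start_methods()`, `get_start_method()` and `get_context()`. It starts one child with each available method, using a context object so that the global default is left alone. Which methods are listed depends on the platform. The `child` function and the `child-<method>` naming are illustrative.

```python
import multiprocessing as mp


def child(q):
    # report back which process ran this
    q.put(mp.current_process().name)


def main():
    methods = mp.get_all_start_methods()  # e.g. only ['spawn'] on Windows
    print("available:", methods)
    print("default:", mp.get_start_method())
    results = {}
    for method in methods:
        ctx = mp.get_context(method)  # same API as the module, bound to one start method
        q = ctx.Queue()
        p = ctx.Process(target=child, args=(q,), name="child-" + method)
        p.start()
        results[method] = q.get()  # read before join() so the child can flush the queue
        p.join()
    return results


if __name__ == "__main__":
    print(main())
```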
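```python
import multiprocessing as mp


def foo(q):
    q.put('hello')


if __name__ == '__main__':
    ctx = mp.get_context('forkserver')  # a context object with the same API as the multiprocessing module
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()  # wait for the child process to finish
```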
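multiprocessing supports two types of communication channel between processes: Queue and Pipe.
For example, start two processes that share one queue. One process keeps adding URLs to the queue. The other takes URLs from the queue and downloads the chapters of the novel 天龙八部 (Demi-Gods and Semi-Devils).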
```python
from multiprocessing import Process, Queue
import urllib.request
from bs4 import BeautifulSoup


def url_maker(q):
    num = 2024334
    for i in range(5):
        url = "{}.html".format(num)  # chapter page URL; the site prefix is not shown here
        q.put(url)
        print(url)
        num += 1
    q.put("over")  # sentinel: no more URLs


def url_parse(q):
    count = 0
    while True:
        url = q.get()  # take a URL from the queue
        if url == "over":
            break
        count += 1
        html = urllib.request.urlopen(url)  # request the page
        html_bytes = html.read()  # read the raw bytes
        soup = BeautifulSoup(html_bytes, "html.parser")
        string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # chapter text
        lines = string.split()
        with open("天龙八部.txt", mode="a", encoding="utf-8") as f:  # append to the output file
            f.write("Chapter {}".format(count) + "\r\n")
            print("Downloaded chapter {}...".format(count))
            for line in lines[:-6]:  # the last 6 lines are skipped
                f.write(" " + line + "\r\n")


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=url_maker, args=(q,))
    p2 = Process(target=url_parse, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
```python
from multiprocessing import Process, Queue
import urllib.request
from bs4 import BeautifulSoup


class UrlMaker(Process):
    # the queue is passed in explicitly; relying on a global q only works with the fork start method
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        num = 2024334
        for i in range(5):
            url = "{}.html".format(num)  # chapter page URL; the site prefix is not shown here
            self.q.put(url)
            print(url)
            num += 1
        self.q.put("over")


class UrlParser(Process):
    def __init__(self, q, file):
        super().__init__()
        self.q = q
        self.file = file

    def run(self):
        count = 0
        while True:
            url = self.q.get()  # take a URL from the queue
            if url == "over":
                break
            count += 1
            html = urllib.request.urlopen(url)  # request the page
            html_bytes = html.read()  # read the raw bytes
            soup = BeautifulSoup(html_bytes, "html.parser")
            string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # chapter text
            lines = string.split()
            with open(self.file, mode="a", encoding="utf-8") as f:  # append to the output file
                f.write("Chapter {}".format(count) + "\r\n")
                print("Downloaded chapter {}...".format(count))
                for line in lines[:-6]:
                    f.write(" " + line + "\r\n")


if __name__ == '__main__':
    q = Queue()
    p1 = UrlMaker(q)
    p2 = UrlParser(q, "天龙八部1.txt")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
```
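Both download examples need network access and BeautifulSoup. Stripped down to the pattern itself, a producer puts items on a `Queue` followed by a sentinel. The consumer loops on `get()` until it sees the sentinel. In this offline sketch, `None` stands in for the "over" marker and an uppercase conversion stands in for downloading. Results go back to the parent through a second queue, which the parent drains before calling `join()`.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # stand-in for the "over" marker


def producer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # tell the consumer there is nothing more


def consumer(q, out):
    count = 0
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        count += 1
        out.put((count, item.upper()))  # placeholder for the real work
    out.put(SENTINEL)


def main(items=("a", "b", "c")):
    q, out = Queue(), Queue()
    p1 = Process(target=producer, args=(q, list(items)))
    p2 = Process(target=consumer, args=(q, out))
    p1.start()
    p2.start()
    results = []
    while (r := out.get()) is not SENTINEL:  # drain results before join() to avoid a deadlock
        results.append(r)
    p1.join()
    p2.join()
    return results


if __name__ == "__main__":
    print(main())
```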
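The Pipe example from the official documentation follows. A Pipe can be used inside a while True loop, but recv() blocks until data arrives. A receiving loop therefore needs an end condition: either an agreed end-of-data message, or catching the EOFError raised once every copy of the sending end has been closed.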
```python
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # generally faster than Queue, which is built on top of a pipe
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
```
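Reading a Pipe in a loop works as long as the loop knows when to stop. In this minimal sketch, the child sends several messages followed by `None` as an end marker (an assumption of this example). The parent closes its unused copy of the child's end and calls `recv()` until the marker arrives.

```python
from multiprocessing import Process, Pipe


def sender(conn, n):
    for i in range(n):
        conn.send({"seq": i, "text": "hello %d" % i})  # any picklable object
    conn.send(None)  # end marker
    conn.close()


def main(n=3):
    parent_conn, child_conn = Pipe()  # duplex by default: both ends can send and recv
    p = Process(target=sender, args=(child_conn, n))
    p.start()
    child_conn.close()  # the parent does not use this end
    received = []
    while True:
        msg = parent_conn.recv()  # blocks until something arrives
        if msg is None:
            break
        received.append(msg["text"])
    p.join()
    return received


if __name__ == "__main__":
    print(main())
```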
When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.
Mutual exclusion between processes still has its own primitive: acquire the lock, do something, release the lock.
For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).
The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library.
If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
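```python
from multiprocessing import Process, JoinableQueue
import os, time
import random  # reseeded in each child, unlike numpy's global generator after fork


def func1(q):  # consumer
    while True:
        num = random.randint(1, 3)
        for i in range(num):
            q.get()
            q.task_done()  # each get() is one task; task_done() tells the queue this item has been processed
        print("Consumer %s consumed --%d-- buns." % (os.getpid(), num))
        time.sleep(1)


def func2(q):  # producer
    num = random.randint(5, 14)
    for i in range(num):
        q.put("bun")
    print("Producer %s produced --%d-- buns." % (os.getpid(), num))
    time.sleep(2)
    # blocks until every item put into the queue has been taken and marked done;
    # without it the producer would exit as soon as it finished producing
    q.join()


if __name__ == '__main__':
    q = JoinableQueue()
    producers = [Process(target=func2, args=(q,), name="producer") for i in range(3)]
    # consumers loop forever, so they are daemons: they are terminated when the main process exits
    consumers = [Process(target=func1, args=(q,), name="consumer", daemon=True) for i in range(5)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()  # returns once all buns have been consumed
```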
Note that one can also create a shared queue by using a manager object – see Managers.
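One practical difference matters here. A plain `multiprocessing.Queue` may only be shared through inheritance, so passing it as an argument to a `Pool` task raises a RuntimeError. A manager's queue is a proxy that can be pickled and passed around like any other argument. In this minimal sketch, `square_into` is a hypothetical task.

```python
from multiprocessing import Manager, Pool


def square_into(q, x):
    q.put(x * x)  # q is a proxy; the real queue lives in the manager's server process


def main(values=(1, 2, 3)):
    with Manager() as manager:
        q = manager.Queue()
        with Pool(2) as pool:
            # the proxy is passed as an ordinary task argument
            pool.starmap(square_into, [(q, v) for v in values])
        results = [q.get() for _ in values]  # one result per task
    return sorted(results)


if __name__ == "__main__":
    print(main())
```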
Instantiating a Manager starts a server process through which two or more other processes can communicate. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
```python
from multiprocessing import Process, Manager
import time


def func1(dic):  # consumer
    for i in range(5):
        bun = dic.get(i)  # None if the producer has not written key i yet
        print("Consumer is eating bun #%d: %s" % (i, bun))
        time.sleep(1)


def func2(dic):  # producer
    for i in range(5):
        dic[i] = "bun %d " % i
    print("--Producer made 5 buns.--")


if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()  # proxy to a dict that lives in the manager's server process
        p1 = Process(target=func2, args=(dic,), name="producer")
        p2 = Process(target=func1, args=(dic,), name="consumer")
        p1.start()
        p2.start()
        p1.join()
        p2.join()
```
```python
from multiprocessing import Process

num = 0  # module-level global; each process ends up with its own copy


def desc():
    global num
    for i in range(1000):
        num -= 1
    print("----------num------------", num)


def add():
    global num
    for i in range(1000):
        print("add: ", num)
        num += 1
    print("----------num------------", num)


if __name__ == '__main__':
    p1 = Process(target=desc)
    p1.start()
    p1.join()
    print(num)
    # Output:
    # ----------num------------ -1000
    # 0
```
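As the output shows, the child process gets its own copy of the global when it is created, and from then on the two processes are independent. The main process has one num and the child has another, and each computes its own value with no interaction, which confirms that processes are independent of each other. It also shows that inter-process communication has to go through a third party outside the processes' own memory (shared memory, or even files on disk), such as Queue, Pipe or Manager.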
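By contrast, data that really must be shared can be placed in shared memory. This minimal sketch uses `multiprocessing.Array` in place of the plain global above, and `fill` is an illustrative function. The child's writes go straight to the shared buffer, so the parent sees them after `join()`.

```python
from multiprocessing import Process, Array


def fill(arr):
    for i in range(len(arr)):
        arr[i] = -i  # writes go directly to shared memory


def main(n=5):
    arr = Array("i", n)  # shared array of C ints, zero-initialised
    p = Process(target=fill, args=(arr,))
    p.start()
    p.join()
    return arr[:]  # slicing returns a plain list


if __name__ == "__main__":
    print(main())  # [0, -1, -2, -3, -4]: the parent sees the child's writes
```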
```python
from multiprocessing import Process

num = 0  # module-level global; each process works on its own copy


def desc():
    global num
    for i in range(10):
        print("desc-----: ", num)
        num -= 1
    print("----------num------------", num)


def add():
    global num
    for i in range(10):
        print("add: ", num)
        num += 1
    print("----------num------------", num)


if __name__ == '__main__':
    p1 = Process(target=desc)
    p2 = Process(target=add)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)  # still 0: the children changed only their own copies
```
```python
from multiprocessing import Process, Lock


class Write1(Process):
    def __init__(self, lock):
        super().__init__()
        # use the lock created by the parent; a new Lock() here (or a module-level one under spawn) is not shared
        self.lock = lock

    def run(self):
        # acquire the lock -- do something -- release the lock
        # the file is opened and closed inside the lock, so its buffer is flushed before the other process writes
        with self.lock:
            with open("test.txt", mode="a", encoding="utf8") as f:
                for i in range(1000):
                    f.write("1111\r\n")


class Write2(Process):
    def __init__(self, lock):
        super().__init__()
        self.lock = lock

    def run(self):
        with self.lock:
            with open("test.txt", mode="a", encoding="utf8") as f:
                for i in range(1000):
                    f.write("22222222222222222\r\n")


if __name__ == '__main__':
    lock = Lock()  # one lock shared by both processes
    t1 = Write1(lock)
    t2 = Write2(lock)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
```
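```python
from multiprocessing import Process, Condition
import time


class P1(Process):
    def __init__(self, cond):
        super().__init__()
        self.cond = cond
        self.name = "Mr. Sun"

    def run(self):
        with self.cond:
            print("{}: Hey there, beautiful.".format(self.name))
            self.cond.notify()  # wake P2
            self.cond.wait()    # release the condition and wait for P2's reply
            print("{}: Red suits you even better.".format(self.name))
            self.cond.notify()  # wake P2 for its last line; without this P2 waits forever


class P2(Process):
    def __init__(self, cond):
        super().__init__()
        self.cond = cond
        self.name = "Miss Duan"

    def run(self):
        with self.cond:
            self.cond.wait()  # wait for P1 to speak first
            print("{}: Oh, oh, oh, now my face has gone all red.".format(self.name))
            time.sleep(5)
            self.cond.notify()
            self.cond.wait()
            print("{}: Oh, stop it.".format(self.name))
            self.cond.notify()


if __name__ == '__main__':
    cond = Condition()
    p1 = P1(cond)
    p2 = P2(cond)
    print("p1 and p2 start working on the same thing")
    p2.start()
    time.sleep(1)  # give p2 time to reach wait(); if p1 notifies first, both end up waiting
    p1.start()
    p1.join()
    p2.join()
```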
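```python
from multiprocessing import Process, Queue, Pool
import urllib.request
from bs4 import BeautifulSoup


class UrlMaker(Process):  # target= and args= could be used instead of subclassing
    def __init__(self, q, number):
        super().__init__()
        self.q = q
        self.number = number

    def run(self):
        num = 2024334
        for i in range(self.number):
            url = "{}.html".format(num)  # chapter page URL; the site prefix is not shown here
            self.q.put(url)
            print(url)
            num += 1
        self.q.put("over")  # sentinel: no more URLs


def init_worker(queue):
    # runs once in each pool worker; a Queue may only be shared by inheritance,
    # so it is handed over through initargs and stored as a global here
    global q
    q = queue


def urlParser(file):
    url = q.get()  # take one URL from the queue
    if url == "over":
        return {"code": False}
    html = urllib.request.urlopen(url)  # request the page
    html_bytes = html.read()  # read the raw bytes
    soup = BeautifulSoup(html_bytes, "html.parser")
    title = soup.find("div", attrs={"class": "h1title"}).h1.get_text()
    string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # chapter text
    lines = string.split()
    # several workers append to the same file, so chapters may end up out of order
    with open(file, mode="a", encoding="utf-8") as f:
        f.write(title + "\r\n")
        for line in lines[:-6]:
            f.write(" " + line + "\r\n")
    return {"code": True, "url": url, "title": title}


def callback(msg):
    # called in the main process with urlParser's return value
    if msg["code"]:
        print("Process handled url: {}, title: {}.".format(msg["url"], msg["title"]))
    else:
        print("All urls have been parsed.")


if __name__ == '__main__':
    number = 10
    q = Queue()  # queue of URLs to request
    pool = Pool(3, initializer=init_worker, initargs=(q,))  # workers that take URLs and download the text
    p1 = UrlMaker(q, number)  # process that puts URLs into the queue
    for i in range(number + 1):  # one task per URL plus one for "over", so no task blocks forever
        # apply_async is asynchronous and non-blocking; pool.apply blocks until the result is ready.
        # It returns an AsyncResult (ApplyResult) object, e.g. result = pool.apply_async(func, args, callback=...):
        #   result.get([timeout]): return the result, waiting for it if necessary
        #   result.ready(): True if the call has completed
        #   result.successful(): True if the call completed without raising an exception
        #   result.wait([timeout]): wait until the result is available
        # callback is called with func's return value.
        pool.apply_async(func=urlParser, args=("天龙八部1.txt",), callback=callback)
    p1.start()
    p1.join()
    pool.close()  # no more tasks will be submitted
    pool.join()  # wait for all worker processes to exit
    # common Pool methods: apply_async, apply, close, join, and terminate,
    # which stops the workers immediately without finishing outstanding work
```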
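The `AsyncResult` methods listed in the comments can be tried without network access. In this sketch, `slow_square` and `fail` are hypothetical tasks. `successful()` raises `ValueError` if it is called before the result is ready, so it is checked only after `wait()`. `get()` re-raises a worker's exception in the parent.

```python
import time
from multiprocessing import Pool


def slow_square(x):
    time.sleep(0.2)
    return x * x


def fail(x):
    raise ValueError("bad input: %r" % (x,))


def main():
    collected = []
    error = None
    with Pool(2) as pool:
        ok = pool.apply_async(slow_square, (7,), callback=collected.append)  # callback runs in the parent
        bad = pool.apply_async(fail, (0,))
        print("ready right away?", ok.ready())  # usually False: the task is still sleeping
        ok.wait(timeout=5)  # block until done, or until the timeout expires
        bad.wait(timeout=5)
        value = ok.get()
        status = (ok.successful(), bad.successful())
        try:
            bad.get()  # re-raises the worker's exception here
        except ValueError as exc:
            error = str(exc)
    return value, status, error, collected


if __name__ == "__main__":
    print(main())
```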