进程通信
1.独立的进程内存空间与共享的服务器进程空间
进程是独立的,互不干扰的独立内存空间
打印出a为1
也就是说,主进程中的a不受子进程中a的干扰,他们是相互独立的
那进程之间怎么通信那?
进程间通信的解决方案
创建一个服务器进程并返回一个管理器,管理器开辟一块空间(用于进程间数据共享)并返回一个代理,把这个代理通过args传递给其他进程,其他进程就可以通过这个代理访问共享的空间了。
from multiprocessing import Manager,Process
mgr = Manager() #创建服务器进程,并返回与其通信的管理器
list = mgr.list() #通过管理器在服务器进程中开辟一个列表空间,并返回一个代理
print(list)
def func(list):
list.append('a')
#把列表传递给子进程,子进程就可以通过这个代理,来通过共享空间来进行通信
p = Process(target=func,args=(list,))
p.start()
p.join()
print(list)
[]
['a']
2.线程间共享的全局变量与同步锁的基本概念
线程间全局变量的共享
线程属于同一个进程,因此它们之间共享内存区域。 因此全局变量是公共的。
from threading import Thread
a = 1
def func():
global a
a = 2
t = Thread(target=func)
t.start()
t.join()
print(a)
2
共享内存间存在竞争问题
from threading import Thread
data = 0
n = 10000000 #加减操作的次数,如果预期结果不出现,那就多加几个0
def add(n):
global data
for i in range(n): #对全局变量data进行n次+1操作
data += 1
def sub(n):
global data
for i in range(n): #对全局变量data进行n次-1操作
data -= 1
t_add = Thread(target=add, args=(n,))
t_sub = Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(data)
-2225363
由于线程间共享内存存在竞争,全局变量data不会严格按照两个进程的全部操作完整进行(先加n,后减n,最后为零),会出现一定缺失。
使用锁来控制共享资源的访问
lock = th.Lock() 生成锁
lock.acquire() 使用锁
lock.release() 释放锁
或使用释放合一 with lock:
import threading as th
data = 0
n = 10000000 #加减操作的次数
lock = th.Lock() #生成一把锁
def add(n):
global data
lock.acquire()
for i in range(n): #对全局变量data进行n次+1操作
data += 1
lock.release()
def sub(n):
global data
with lock:
for i in range(n): #对全局变量data进行n次-1操作
data -= 1
t_add = th.Thread(target=add, args=(n,))
t_sub = th.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()
print(data)
0
3.线程与进程安全队列
使用原因
安全队列自带锁,可以避免因竞争出现的结果有误问题。
线程安全队列
导入:import queue
que = queue.Queue()
- 入队: put(item) #队列满,put阻塞,下同
- 出队: get() #队列空,get阻塞,下同
- 测试空: empty() # 近似,小概率出现判断失误,如某一进程正在放数据
- 测试满: full() # 近似
- 队列长度: qsize() # 近似
- 任务结束: task_done()
- 等待完成: join()
进程安全队列
导入:from multiprocess import Manager
mgr = Manager()
que = mgr.Queue()
- 入队: put(item)
- 出队: get()
- 测试空: empty() # 近似
- 测试满: full() # 近似
- 队列长度: qsize() # 近似
其他问题解释
问:队列算公共资源嘛?
答:如果只是一个线程/进程在使用,那么它并不算公共资源。 但是一旦多个线程/进程在同时使用,那么它就是一个公共资源。
问:我们是否需要对其加锁?
答:如果被当作公共资源使用,那么按理说是必须要加锁的。 但是,线程安全或进程安全的队列中已经帮我们实现了锁。 因此我们不需 要再自己使用锁来同步。
4.生产者与消费者模型
生产者与消费者模型的概念
所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑
生产者,只需要往队列里面生产资源(生产者不需要关心消费者)
消费者,只需要从队列里面消费资源(消费者也不需要关心生产者)
消费者与生产者模式的应用----Web服务器与Web框架之间的关系
生产者-----客户端
消费者-----web框架进程
安全队列-----web服务器
每当有客户端发送请求时,web服务器接受所有的用户请求,web框架负责从服务器中拿数据并分配线程。
多线程版本
import threading as th
import queue
import random
class Consume(th.Thread):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
item=self.queue.get()
print("消费者,消费了%s"%item)
class Produce(th.Thread):
def __init__(self,queue):
super().__init__()
self.queue= queue
def run(self):
while True:
item = random.randint(0,99) #生成资源的随机编号
self.queue.put(item)
print("生产者,生产:%s"%item)
queue = queue.Queue(3) #设置线程安全队列长度为3
p = Produce(queue)
c = Consume(queue)
p.start()
c.start()
生产者,生产:30
生产者,生产:46
生产者,生产:73
生产者,生产:83
消费者,消费了30
消费者,消费了46
生产者,生产:71
消费者,消费了73
生产者,生产:47
消费者,消费了83
生产者,生产:74
消费者,消费了71
生产者,生产:4
消费者,消费了47
多进程版本
from multiprocessing import Process,Manager
import random
con_queue=Manager().Queue(3)
class Consume(Process):
def __init__(self,con_queue):
super().__init__()
self.queue=con_queue
def run(self):
while True:
item=self.queue.get()
print("消费者,消费:%s"%item)
class Produce(Process):
def __init__(self,con_queue):
super().__init__()
self.queue = con_queue
def run(self):
while True:
item = random.randint(0,99)
self.queue.put(item)
print("生产者,生产:%s"%item)
p = Produce(con_queue)
c = Consume(con_queue)
p.start()
c.start()
p.join()
c.join()