欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

进程通信

程序员文章站 2022-07-01 17:19:16
...

1.独立的进程内存空间与共享的服务器进程空间

进程是独立的,互不干扰的独立内存空间

进程通信

打印出a为1

也就是说,主进程中的a不受子进程中a的干扰,他们是相互独立的

那进程之间怎么通信那?

进程间通信的解决方案

进程通信

创建一个服务器进程并返回一个管理器,管理器开辟一块空间(用于进程间数据共享)并返回一个代理,把这个代理通过args传递给其他进程,其他进程就可以通过这个代理访问共享的空间了。

from multiprocessing import Manager,Process

mgr = Manager() #创建服务器进程,并返回与其通信的管理器
list = mgr.list() #通过管理器在服务器进程中开辟一个列表空间,并返回一个代理
print(list)

def func(list):
    list.append('a')
#把列表传递给子进程,子进程就可以通过这个代理,来通过共享空间来进行通信
p = Process(target=func,args=(list,))
p.start()
p.join()
print(list)
[]
['a']

2.线程间共享的全局变量与同步锁的基本概念

线程间全局变量的共享

线程属于同一个进程,因此它们之间共享内存区域。 因此全局变量是公共的。

from threading import Thread

a = 1
def func():
    global a
    a = 2

t = Thread(target=func)
t.start()
t.join()

print(a)
2

共享内存间存在竞争问题

from threading import Thread

data = 0
n = 10000000     #加减操作的次数,如果预期结果不出现,那就多加几个0

def add(n):
    global data
    for i in range(n): #对全局变量data进行n次+1操作
        data += 1

def sub(n):
    global data
    for i in range(n): #对全局变量data进行n次-1操作
        data -= 1

t_add = Thread(target=add, args=(n,))
t_sub = Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()

print(data)
-2225363

由于线程间共享内存存在竞争,全局变量data不会严格按照两个进程的全部操作完整进行(先加n,后减n,最后为零),会出现一定缺失。

使用锁来控制共享资源的访问

lock = th.Lock() 生成锁

lock.acquire() 使用锁

lock.release() 释放锁

或使用释放合一 with lock:

import threading as th

data = 0
n = 10000000     #加减操作的次数
lock = th.Lock() #生成一把锁

def add(n):
    global data
    lock.acquire()
    for i in range(n): #对全局变量data进行n次+1操作
        data += 1
    lock.release()
def sub(n):
    global data
    with lock:
        for i in range(n): #对全局变量data进行n次-1操作
            data -= 1
t_add = th.Thread(target=add, args=(n,))
t_sub = th.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()

print(data)
0

3.线程与进程安全队列

使用原因

安全队列自带锁,可以避免因竞争出现的结果有误问题。

线程安全队列 

导入:import queue

          que = queue.Queue()

  • 入队: put(item) #队列满,put阻塞,下同
  • 出队: get() #队列空,get阻塞,下同
  • 测试空: empty()  # 近似,小概率出现判断失误,如某一进程正在放数据
  • 测试满: full()  # 近似
  • 队列长度: qsize()  # 近似
  • 任务结束: task_done()
  • 等待完成: join()

进程安全队列 

导入:from multiprocess import Manager

          mgr = Manager()

         que = mgr.Queue()

  • 入队: put(item)
  • 出队: get()
  • 测试空: empty()  # 近似
  • 测试满: full()  # 近似
  • 队列长度: qsize()  # 近似

其他问题解释

问:队列算公共资源嘛?

答:如果只是一个线程/进程在使用,那么它并不算公共资源。 但是一旦多个线程/进程在同时使用,那么它就是一个公共资源。

问:我们是否需要对其加锁?

答:如果被当作公共资源使用,那么按理说是必须要加锁的。 但是,线程安全或进程安全的队列中已经帮我们实现了锁。 因此我们不需           要再自己使用锁来同步。

4.生产者与消费者模型

生产者与消费者模型的概念

进程通信

所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑

生产者,只需要往队列里面生产资源(生产者不需要关心消费者)

消费者,只需要从队列里面消费资源(消费者也不需要关心生产者)

消费者与生产者模式的应用----Web服务器与Web框架之间的关系

进程通信

生产者-----客户端

消费者-----web框架进程

安全队列-----web服务器

每当有客户端发送请求时,web服务器接受所有的用户请求,web框架负责从服务器中拿数据并分配线程。

多线程版本

import threading as th
import queue
import random

class Consume(th.Thread):
    def __init__(self,queue):
        super().__init__()
        self.queue=queue
    def run(self):
        while True:
            item=self.queue.get()
            print("消费者,消费了%s"%item)


class Produce(th.Thread):
    def __init__(self,queue):
        super().__init__()
        self.queue= queue
    def run(self):
        while True:
            item = random.randint(0,99) #生成资源的随机编号
            self.queue.put(item)
            print("生产者,生产:%s"%item)
queue = queue.Queue(3) #设置线程安全队列长度为3

p = Produce(queue)
c = Consume(queue)
p.start()
c.start()
生产者,生产:30
生产者,生产:46
生产者,生产:73
生产者,生产:83
消费者,消费了30
消费者,消费了46
生产者,生产:71
消费者,消费了73
生产者,生产:47
消费者,消费了83
生产者,生产:74
消费者,消费了71
生产者,生产:4
消费者,消费了47

多进程版本

from multiprocessing import  Process,Manager
import random

con_queue=Manager().Queue(3)

class Consume(Process):
    def __init__(self,con_queue):
        super().__init__()
        self.queue=con_queue
    def run(self):
        while True:
            item=self.queue.get()
            print("消费者,消费:%s"%item)
class Produce(Process):
    def __init__(self,con_queue):
        super().__init__()
        self.queue = con_queue
    def run(self):
        while True:
            item = random.randint(0,99)
            self.queue.put(item)
            print("生产者,生产:%s"%item)
p = Produce(con_queue)
c = Consume(con_queue)
p.start()
c.start()
p.join()
c.join()

 

上一篇: Socket通信

下一篇: TCP协议