欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

进程池、线程池、回调函数、协程

程序员文章站 2024-01-11 22:49:28
摘要: 进程池与线程池 同步调用和异步调用 回调函数 协程 一、进程池与线程池: 1、池的概念: 不管是线程还是进程,都不能无限制的开下去,总会消耗和占用资源。 也就是说,硬件的承载能力是有限度的,在保证高效率工作的同时应该还需要保证硬件的资源占用情况,所以需要给硬件设置一个上限来减轻硬件的压力,所 ......

摘要:

  • 进程池与线程池
  • 同步调用和异步调用
  • 回调函数
  • 协程

一、进程池与线程池:

1、池的概念:

  不管是线程还是进程,都不能无限制的开下去,总会消耗和占用资源。

  也就是说,硬件的承载能力是有限度的,在保证高效率工作的同时应该还需要保证硬件的资源占用情况,所以需要给硬件设置一个上限来减轻硬件的压力,所以就有了池的概念。

2、进程池与线程池的使用方法:(进程与线程的创建基本相似,所以进程池与线程池的使用过程也基本一样)

from concurrent.futures import processpoolexecutor  # 导入进程池模块
from concurrent.futures import threadpoolexecutor # 导入线程池模块
import os
import time
import random

# 下面以进程池为例,线程池只是使用导入模块不一样,仅此而已。
def task(name):
    print('name:[%s]|进程:[%s]正在运行' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))   # 模拟进程运行耗费时间。

# 这一步的必要性:在创建进程时,会将代码以模块的方式从头到尾导入加载执行一遍
# (所以创建线程如果不写在main里面的话,这个py文件里面的所有代码都会从头到尾加载执行一遍
# 就会导致在创建进程的时候产生死循环。)
if __name__ == '__main__':
    pool = processpoolexecutor(4)  # 设置线程池的大小,默认等于cpu的核心数。
    for i in range(10):
        pool.submit(task, '进程%s' % i)  # 异步提交(提交后不等待)
    
    pool.shutdown(wait=true)  # 关闭进程池入口不再提交,同时等待进程池全部运行完毕。(类似join方法)
    print('主') # 标识一下主进程的完毕之前的语句
# 运行过程及结果:
name:[进程0]|进程:[4080]正在运行
name:[进程1]|进程:[18336]正在运行
name:[进程2]|进程:[19864]正在运行
name:[进程3]|进程:[25604]正在运行
name:[进程4]|进程:[4080]正在运行
name:[进程5]|进程:[18336]正在运行
name:[进程6]|进程:[4080]正在运行
name:[进程7]|进程:[19864]正在运行
name:[进程8]|进程:[25604]正在运行
name:[进程9]|进程:[18336]正在运行
主

 二、同步调用、异步调用

  同步调用:提交任务,原地等待该任务执行完毕,拿到结果后再执行下一个任务,导致程序串行执行!

from concurrent.futures import processpoolexecutor  # 导入进程池模块
from concurrent.futures import threadpoolexecutor # 导入线程池模块
import os
import time
import random


def task(name):
    print('name:[%s]|进程[%s]正在运行...' % (name, os.getpid()))
    time.sleep(random.randint(1, 3))
    return '拿到[%s]|进程%s的结果...' % (name, os.getpid())

if __name__ == '__main__':
    pool = processpoolexecutor(4)
    result = []  # 创建一个空列表来搜集执行结果
    for i in range(10):
        res = pool.submit(task, '进程%s' % i).result()  # 使用.result()方法得到每次的结果,同步调用
        result.append(res)
    pool.shutdown(wait=true)
    for j in result:
        print(j)
    print('主进程')

 

# 执行结果:
name:[进程0]|进程[3376]正在运行...
name:[进程1]|进程[27124]正在运行...
name:[进程2]|进程[10176]正在运行...
name:[进程3]|进程[28636]正在运行...
name:[进程4]|进程[3376]正在运行...
name:[进程5]|进程[27124]正在运行...
name:[进程6]|进程[10176]正在运行...
name:[进程7]|进程[28636]正在运行...
name:[进程8]|进程[3376]正在运行...
name:[进程9]|进程[27124]正在运行...
拿到[进程0]|进程3376的结果...
拿到[进程1]|进程27124的结果...
拿到[进程2]|进程10176的结果...
拿到[进程3]|进程28636的结果...
拿到[进程4]|进程3376的结果...
拿到[进程5]|进程27124的结果...
拿到[进程6]|进程10176的结果...
拿到[进程7]|进程28636的结果...
拿到[进程8]|进程3376的结果...
拿到[进程9]|进程27124的结果...
主进程

 

  异步调用:提交任务,不去等结果,继续执行。

from concurrent.futures import processpoolexecutor
import os
import random
import time

def task(name):
    time.sleep(random.randint(1, 3))
    print('name: %s 进程[%s]运行...' % (name, os.getpid()))


if __name__ == '__main__':
    pool = processpoolexecutor(4)
    for i in range(10):
        pool.submit(task, '进程%s' % i)   # 异步调用,提交后不等待结果,继续执行代码

    pool.shutdown(wait=true)
    print('主进程')
name: 进程3 进程[10016]运行...
name: 进程0 进程[12736]运行...
name: 进程1 进程[4488]运行...
name: 进程2 进程[3920]运行...
name: 进程5 进程[12736]运行...
name: 进程6 进程[4488]运行...
name: 进程4 进程[10016]运行...
name: 进程9 进程[4488]运行...
name: 进程8 进程[12736]运行...
name: 进程7 进程[3920]运行...
主进程

 三、回调函数:

  上面我们在演示异步调用时候,说过提交任务不等待执行结果,继续往下执行代码,那么,执行的结果我们怎么得到呢?

  可以为进程池和线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发并接收任务的返回值当做参数,这个函数就是回调函数。
from concurrent.futures import threadpoolexecutor
import time
import random
import requests


def task(url):
    print('获取网站[%s]信息' % url)
    response = requests.get(url)  # 下载页面
    time.sleep(random.randint(1, 3))
    return {'url': url, 'content': response.text}  # 返回结果:页面地址和页面内容

futures = []
def back(res):
    res = res.result()  # 取到提交任务的结果(回调函数固定写法)
    res = '网站[%s]内容长度:%s' % (res.get('url'), len(res.get('content')))
    futures.append(res)
    return futures

if __name__ == '__main__':
    urls = [
        'http://www.baidu.com',
        'http://www.dgtle.com/',
        'https://www.bilibili.com/'
    ]
    pool = threadpoolexecutor(4)
    futures = []
    for i in urls:
        pool.submit(task, i).add_done_callback(back)  # 执行完线程后,使用回调函数

    pool.shutdown(wait=true)
    for j in futures:
        print(j)
获取网站[http://www.baidu.com]信息
获取网站[http://www.dgtle.com/]信息
获取网站[https://www.bilibili.com/]信息
网站[http://www.dgtle.com/]内容长度:39360
网站[https://www.bilibili.com/]内容长度:69377
网站[http://www.baidu.com]内容长度:2381

 四:协程(通过单线程实现并发)

我们知道,多个线程执行任务时候,如果其中一个任务遇到io,操作系统会有一种来回'切'的机制,来最大效率利用cpu的使用效率,从而实现多线程并发效果

而协程:就是用单线程实现并发,通过软件代码手段,在代码执行过程中遇到io,自动切换到进程中的另外一个执行的代码,然后再次遇到io,继续切换到另一个

执行的代码。

过程就是:单进程中任务执行中:遇到io,代码层面在单线程中切换代码执行。从而骗过操作系统,让操作系统以为这个单线程好像没经历过io,从而达到该

单线程对cpu使用的效率最大化。

实现过程所需模块:gevent 

from gevent import monkey; monkey.patch_all()  # 监测代码中所有io行为
# gevent模块不能识别它本身以外的所有的io行为,但是它内部封装了一个模块,能够帮助我们识别所有的io行为
from gevent import spawn # 从gevent模块导入spawn,来使用‘切’的方法 import time import random def heng(name): print('%s 哼了一下...' % name) time.sleep(random.randint(1, 3)) print('%s 哼完了' % name) def ha(name): print('%s 哈了一下' % name) time.sleep(random.randint(1, 3)) print('%s 哈完了' % name) start = time.time() # 标记开始时间 s1 = spawn(heng, '王大锤') # 标记并运行heng函数(遇到io,切) s2 = spawn(ha, '至尊宝') # 标记并运行ha函数(遇到io,切) s1.join() s2.join() # s1、s2都执行完毕后才继续执行 print('运行时间:', time.time()-start)

 

王大锤 哼了一下...
至尊宝 哈了一下
王大锤 哼完了
至尊宝 哈完了
运行时间: 2.0049164295196533

 

  进程:资源单位
     线程:执行单位
  协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省io),这里注意区分操作系统的切换+保存状态是针对多个线程而言,而我们现在是想在单个线程下自己手动实现操作系统的切换+保存状态的功能

注意协程这个概念完全是程序员自己想出来的东西,它对于操作系统来说根本不存在。操作系统只知道进程和线程。并且需要注意的是并不是单个线程下实现切换+保存状态就能提升效率,因为你可能是没有遇到io也切,那反而会降低效率

再回过头来想上面的socket服务端实现并发的例子,单个线程服务端在建立连接的时候无法去干通信的活,在干通信的时候也无法去干连接的活。这两者肯定都会有io,如果能够实现通信io了我就去干建连接,建连接io了我就去干通信,那其实我们就可以实现单线程下实现并发

将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>> 实现高并发!!!

协程实现服务端客户端通信

# 服务端:
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket
n = 0

def communicate(conn):
    while true:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except connectionreseterror:
            break
    conn.close()


def server(): # 切点
    global n
    server = socket.socket()
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    while true:
        conn, addr = server.accept()
        # n += 1
        spawn(communicate, conn)    # 切点
        # print(n)

if __name__ == '__main__':
    s1 = spawn(server)
    s1.join()
# 客户端
from threading import thread,current_thread
import socket


def client():

    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    n = 1
    while true:
        data = '%s %s'%(current_thread().name, n)
        n += 1
        client.send(data.encode('utf-8'))
        info = client.recv(1024)
        print(info)

if __name__ == '__main__':
    for i in range(500):  # 多线程模拟多客户的访问服务器,进行通信循环。
        t = thread(target=client)
        t.start()

# 原本服务端需要开启500个线程才能跟500个客户端通信,现在只需要一个线程就可以扛住500客户端
# 进程下面开多个线程,线程下面再开多个协程,最大化提升软件运行效率