python多线程与多进程
程序员文章站
2022-07-12 21:31:44
...
一、基本概念
1、进程process
什么是进程。最直观的就是一个个pid,官方的说法就:进程是程序在计算机上的一次执行活动。
从内核的观点看,进程的目的就是担当分配系统资源(CPU时间、内存等)的基本单位。
进程有独立的地址空间,一个进程崩溃后不会对其它进程产生影响。
2、线程thead
线程是进程的一个执行流,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。
一个进程由几个线程组成,线程与同属一个进程的其他的线程共享进程所拥有的全部资源。
线程没有独立的地址空间,一个线程死掉就等于整个进程死掉。
3、多线程和多进程
在 Python 中,对于计算密集型任务,多进程占优势,对于 I/O 密集型任务,多线程占优势。
当然对运行一个程序来说,随着 CPU 的增多执行效率肯定会有所提高,这是因为一个程序基本上不会是纯计算或者纯 I/O,所以我们只能相对的去看一个程序到底是计算密集型还是 I/O 密集型。
二、Python多进程技术
python中的多进程主要使用到 multiprocessing 这个库
1、不使用进程池
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time,os
def worker():
print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
time.sleep(2)
print("子进程{}终止".format(os.getpid()))
if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU")
print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
start = time.time()
l=[]
# 创建子进程实例
for i in range(10):
p=Process(target=worker,name="worker"+str(i),args=())
l.append(p)
# 开启进程
for i in range(10):
l[i].start()
# 阻塞进程
for i in range(10):
l[i].join()
stop = time.time()
print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
print("总耗时 %s 秒" % (stop - start))
2、使用进程池
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time,os
def worker(arg):
print("子进程{}执行中, 父进程{}".format(os.getpid(),os.getppid()))
time.sleep(2)
print("子进程{}终止".format(os.getpid()))
if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU")
print("主进程{}执行中, 开始时间={}".format(os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S')))
start = time.time()
l = Pool(processes=5)
# 创建子进程实例
for i in range(10):
# l.apply(worker,args=(i,)) # 同步执行(Python官方建议废弃)
l.apply_async(worker,args=(i,)) # 异步执行
# 关闭进程池,停止接受其它进程
l.close()
# 阻塞进程
l.join()
stop = time.time()
print("主进程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
print("总耗时 %s 秒" % (stop - start))
三、Python多线程技术
python中的多进程主要使用到 threading 这个库
1、不使用线程池
# -*- coding:utf-8 -*-
from threading import Thread
import time,os
def worker(arg):
print("子线程执行中>>> 编号={}".format(arg))
time.sleep(2)
print("子线程终止>>> 编号={}".format(arg))
if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU") # 本机为4核
l = []
# 创建子线程实例
for i in range(10):
t = Thread(target=worker, name='one', args=(i,))
t.start()
l.append(t)
for p in l:
p.join()
2、使用线程池(multiprocessing库的线程池)
"from multiprocessing import Pool "这样导入的 Pool 表示的是进程池,"from multiprocessing.dummy import Pool"这样导入的 Pool表示的是线程池。
# -*- coding:utf-8 -*-
from multiprocessing.dummy import Pool as ThreadPool
import time,os
def worker(arg):
print("子线程{}执行中".format(arg))
time.sleep(2)
print("子线程{}终止".format(arg))
if __name__ == "__main__":
print("本机为",os.cpu_count(),"核 CPU")
print("主线程执行中, 开始时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
start = time.time()
pool = ThreadPool(5)
results = pool.map(worker, range(10))
pool.close()
pool.join()
stop = time.time()
print("主线程终止,结束时间={}".format(time.strftime('%Y-%m-%d %H:%M:%S')))
print("总耗时 %s 秒" % (stop - start))
3、使用线程池(自定义线程池)
"""
思路
1,将任务放在队列
1)创建队列:(初始化)
2)设置大小,线程池的最大容量
3)真实创建的线程 列表
4)空闲的线程数量
2,着手开始处理任务
1)创建线程
2)空闲线程数量大于0,则不再创建线程
3)创建线程池的数量 不能高于线程池的限制
4)根据任务个数判断 创建线程的数量
2)线程去队列中取任务
1)取任务包(任务包是一个元祖)
2)任务为空时,不再取(终止)
"""
import time
import threading
import queue
stopEvent = object() # 停止任务的标志
class ThreadPool(object):
def __init__(self, max_thread):
# 创建任务队列,可以放无限个任务
self.queue = queue.Queue()
# 指定最大线程数
self.max_thread = max_thread
# 停止标志
self.terminal = False
# 创建真实线程数
self.generate_list = []
# 空闲线程数
self.free_thread = []
def run(self, action, args, callback=None):
"""
线程池执行一个任务
INPUT ->
action:任务函数
args:任务参数
callback:执行完任务的回调函数,成功或者失败的返回值。
"""
# 线程池运行的条件:1)
if len(self.free_thread) == 0 and len(self.generate_list) < self.max_thread:
self.generate_thread()
task = (action, args, callback)
self.queue.put(task)
def callback(self):
"""
回调函数:循环取获取任务,并执行任务函数
"""
# 获取当前线程
current_thread = threading.current_thread()
self.generate_list.append(current_thread)
# 取任务并执行
event = self.queue.get()
# 事件类型是任务
while event != stopEvent: # 重点是这个判断 使任务终止
# 解开任务包 ,(任务是一个元祖)
# 执行任务
# 标记:执行任务前的状态,执行任务后的状态
action, args, callback = event
try:
ret = action(*args)
success = True
except Exception as x:
success = False
ret = x
if callback is not None:
try:
callback(success, ret)
except Exception as e:
print(e)
else:
pass
if not self.terminal:
self.free_thread.append(current_thread)
event = self.queue.get()
self.free_thread.remove(current_thread)
else:
# 停止进行取任务
event = stopEvent
else:
# 不是元祖,不是任务,则清空当前线程,不在去取任务
self.generate_list.remove(current_thread)
def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.callback)
t.start()
# 终止取任务
def terminals(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True
def close(self):
"""
执行完所有的任务后,所有线程停止
"""
num = len(self.generate_list)
self.queue.empty()
while num:
self.queue.put(stopEvent)
num -= 1
def test(pi):
time.sleep(0.5)
print(pi)
pool = ThreadPool(10)
for i in range(100):
pool.run(action=test, args=(i,))
pool.terminals()
pool.close()