python(十)线程与进程(中):进程、协程
一、上节回顾:线程 vs 进程
1、线程:一组指令 内存共享
同时修改同一份数据时必须加锁,metex 互斥锁 递归锁 join 等待线程结束启动一个线程:
def run(): print("") t= threading.Thread(target=run, args(n,)) t.start() t.join # 等待所有结果,先把所有线程存起来守护线程:setDaemon(True) (start之前)
服务于非守护线程 信号量 事件events
队列 queue :两个主要作用
解耦,是程序之间实现松耦合 提高处理效率FIFO = first in first out LIFO = last in first out
队列和列表区别,队列取完就完了
生产者消费者模型2、多线程使用场景
全局解释器锁的存在让多线程只有一个线程在执行,所以python里的多线程是假的多线程,不管多少核,同一时间只能在一个核上运行。
所有我们利用多线程的优势只是利用了它的什么优势呢?利用了CPU上下文切换的优势,看上去说并发的效果。
什么时候用多线程呢?
io操作不占用cpu,计算占用cpu
大量计算,耗cpu的用单线程
python多线程,不适合cpu密集操作型的任务,适合io密集型的任务
IO密集开多线程,计算密集开多进程
二、多进程
多进程:进程之间是独立的,
python的线程是用的操作系统的原生线程、python的进程也是用的操作系统的原生进程。
原生进程是由操作系统去维护的,python只是通过C代码库去起了一个进程,真正进程的管理还是通过操作系统去完成的。
操作系统的进程管理是没有全局解释器锁的,进程只是是独立的,根本不需要锁的概念。
1、多进程的基本语法
进程:资源的集合,至少包含一个线程
python使用多核运算,使用python多进程
多进程和多线程的使用基本是一样的
import multiprocessing muitiprocessing.Process
import multiprocessing import threading import time def thread_run(i,n): print("在进程%s的线程%s"%(i,n)) def run(i): print("进程:%s "%i) time.sleep(1) for n in range(2): t = threading.Thread(target=thread_run,args=(i,n)) t.start() if __name__ == '__main__': # 这个必须要有 for i in range(4): p = multiprocessing.Process(target=run,args=(i,)) p.start()
如果我想取我的进程号,怎么去取呢?
from multiprocessing import Process import os def info(title): # 打印进程信息 print(title) print('module name:', __name__) # 模块名 print('parent process:', os.getppid()) # 父进程ID print('process id:', os.getpid()) # 进程ID print("\n") def f(name): info('\033[31;1mcalled from child process function f\033[0m') # 打印子进程信息 print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') # 打印当前进程信息 p = Process(target=f, args=('FGF',)) # 子进程 p.start() # p.join()
2、进程间数据交互
前面提到进程间内存是独立的,但是想要访问,怎么办呢?
有下面几种方式:(万变不离其宗,需要个中间件(翻译))
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue def f(qq): qq.put([42, None, 'hello']) # 子进程中放数据 if __name__ == '__main__': q = Queue() # 定义一个Queue p = Process(target=f, args=(q,)) p.start() # 启动子进程 print(q.get()) # 主进程获取数据并打印 p.join()
如果把线程queue传给子进程,传不了,那么父进程的Queue是怎么传递的?
看上去像数据共享,实际上是克隆了一个Queue,把自己的Queue克隆了一份交给了子进程。
但是为了数据共享,子进程会把Queue pickle序列化到一个中间的地方,中间位置再把数据反序列化给其他进程。
类似socket、如电话线,一人在这头,一人在那头
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello from child']) conn.send([42, None, 'hello from child2']) print("from parent:",conn.recv()) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() # 名字自定义 p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints [42, None, 'hello from child'] print(parent_conn.recv()) # prints [42, None, 'hello from child2'] parent_conn.send("[42, None, 'hello']") # prints "[42, None, 'hello']" p.join()数据共享 Managers
上面两种方式只是实现了数据的传递,还没有实现数据的共享,如实现数据共享,就要用到Managers。
from multiprocessing import Process,Manager import os def f(dict1,list1): dict1[os.getpid()] = os.getpid() # 往字典里放当前PID list1.append(os.getpid()) # 往列表里放当前PID print(list1) if __name__ == "__main__": with Manager() as manager: d = manager.dict() # 生成一个字典,可在多个进程间共享和传递 l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递 p_list = [] # 存进程列表 for i in range(10): p = Process(target=f,args=(d,l)) p.start() p_list.append(p) for res in p_list: # 等待结果 res.join() print('\n%s' %d)
要不要加锁呢,不用加锁,Managers默认就帮你处理了,内部有锁控制。
进程里面也有一个锁进程不是内存独立的么,要锁还有毛用?来看一下:
from multiprocessing import Process, Lock def f(l, i): l.acquire() # acquire一把锁 try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() # 生成锁实例 for num in range(10): Process(target=f, args=(lock, num)).start()
因为屏幕共享,会出现打印乱的问题。所以加锁
3、进程池
创建一个子进程就是克隆一份父进程空间给子进程,开销非常大。假如父进程空间1G,创建几个子进程内存空间就占满了,所有有进程池的限制。
同一时间有多少进程在运行。
线程是没有线程池的,(你可以自己搞:通过信号量搞线程池)
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
apply : 同步、串行 apply_async : 异步、并行def Foo(i): time.sleep(2) print("\033[31min process %s\033[0m"%os.getpid()) return i def Bar(arg): print("--> ecex done:", arg, os.getpid()) # 回调 # 回调函数:通过PID,可见是主进程调用的,不是子进程调用的 if __name__ == "__main__": # windows下面必须有这句 pool = Pool(processes=4) # 允许进程池同时放入4个进程 print("主进程:%s\n%s"%(os.getpid(),'*'*22)) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) # 回调,参数为前面函数的返回结果 # pool.apply(func=Foo, args=(i,)) 串行 # pool.apply_async(func=Foo, args=(i,)) 并行 pool.close() # 一定先关闭进程池再join等待已运行的结束,自己试试区别 pool.join() # 进程池中进程执行完毕后在关闭。如果注释,那么程序直接关闭
三、协程
1、协程介绍
协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
线程的切换,会保存到CPU的寄存器里。
CPU感觉不到协程的存在,协程是用户自己控制的。
之前通过yield做的生产者消费者模型,就是协程,在单线程下实现并发效果。
协程的好处:
无需线程上下文切换的开销 无需数据操作锁定及同步的开销 方便切换控制流,简化编程模型 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序2、使用yield实现协程操作例子
import time def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) # time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n +=1 print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) con.send(n) con2.send(n) time.sleep(1) if __name__ == '__main__': con = consumer("c1") # 第一次调用只是生成器,next的时候才回生成 con2 = consumer("c2") p = producer()
为了保证并发效果,在什么时候切换呢?遇到IO操作就切换。
但什么时候切回去呢?IO操作完了就切回去,但是程序是怎么实现的呢?
我们来看一下
3、Greenlet 一个封装好的协程
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
4、Gevent 自动切换
Greenlet 手动切换;Gevent 自动切换,封装了Greenlet
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet
它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
import gevent def foo(): print("Running in foo") gevent.sleep(2) print("swich to foo again") # 来回切换,直到sleep结束 def bar(): print("Running in bar") gevent.sleep(1) print("swich back to bar") def func3(): print("Running in func3") gevent.sleep(0) # 只触发一次切换操作 print("swich func3 again") gevent.joinall([ gevent.spawn(foo), # 生成 gevent.spawn(bar), gevent.spawn(func3), ])协程gevent并发爬网页
from urllib import request import gevent, time # 注意!:Gevent检测不到urllib的io操作,还是串行的,让它知道就需要打补丁 from gevent import monkey monkey.patch_all() # 把当前程序的所有IO操作给我做单独的做上标记 def f(url): print("Get %s" %url) resp = request.urlopen(url) data = resp.read() # with open("url.html", 'wb') as f: # f.write(data) print("%d bytes received from %s" %(len(data), url)) print("异步时间统计中……") # 协程实现 async_start_time = time.time() gevent.joinall([ gevent.spawn(f, "https://www.python.org"), gevent.spawn(f, "https://www.yahoo.com"), gevent.spawn(f, "https://github.com"), ]) print("\033[32;1m异步cost:\033[0m",time.time()-async_start_time) #------------------------以下只为对比效果--------------------------- print("同步步时间统计中……") urls = [ "https://www.python.org", "https://www.yahoo.com", "https://github.com", ] start_time = time.time() for url in urls: f(url) print("\033[32;1m同步cost:\033[0m",time.time()-start_time)通过gevent实现单线程下的多socket并发
服务端:
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() # 每个连接起一个协程 gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) # 类似break except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
客户端:
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>> "),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) # 内置方法repr:格式化输出 s.close()
上一篇: ORA-12514:RMAN连接报错解决