Python开发--进程、线程、协程详解
什么是进程(process)?
程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于,程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
什么是线程(thread)?
线程是操作系统能够进行运算调度的最小单位。它被包含在进程中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
进程与线程的区别?
线程共享内存空间,进程的内存是独立的。
同一个进程的线程之间可以直接交流,但两个进程相互通信必须通过一个中间代理。
创建一个新的线程很简单,创建一个新的进程需要对其父进程进行一次克隆。
一个线程可以控制和操作同一进程里的其他线程,但是进程只能操作子进程。
Python GIL(Global Interpreter Lock)
无论开启多少个线程,有多少个CPU,python在执行的时候在同一时刻只允许一个线程允许。
Python threading模块
直接调用
- import threading,time
- def run_num(num):
- """
- 定义线程要运行的函数
- :param num:
- :return:
- """
- print("running on number:%s"%num)
- time.sleep(3)
- if __name__ == '__main__':
- # 生成一个线程实例t1
- t1 = threading.Thread(target=run_num,args=(1,))
- # 生成一个线程实例t2
- t2 = threading.Thread(target=run_num,args=(2,))
- # 启动线程t1
- t1.start()
- # 启动线程t2
- t2.start()
- # 获取线程名
- print(t1.getName())
- print(t2.getName())
- 输出:
- running on number:1
- running on number:2
- Thread-1
- Thread-2
继承式调用
- import threading,time
- class MyThread(threading.Thread):
- def __init__(self,num):
- threading.Thread.__init__(self)
- self.num = num
- # 定义每个线程要运行的函数,函数名必须是run
- def run(self):
- print("running on number:%s"%self.num)
- time.sleep(3)
- if __name__ == '__main__':
- t1 = MyThread(1)
- t2 = MyThread(2)
- t1.start()
- t2.start()
- 输出:
- running on number:1
- running on number:2
Join and Daemon
Join
Join的作用是阻塞主进程,无法执行join后面的程序。
多线程多join的情况下,依次执行各线程的join方法,前面一个线程执行结束才能执行后面一个线程。
无参数时,则等待该线程结束,才执行后续的程序。
设置参数后,则等待该线程设定的时间后就执行后面的主进程,而不管该线程是否结束。
- import threading,time
- class MyThread(threading.Thread):
- def __init__(self,num):
- threading.Thread.__init__(self)
- self.num = num
- # 定义每个线程要运行的函数,函数名必须是run
- def run(self):
- print("running on number:%s"%self.num)
- time.sleep(3)
- print("thread:%s"%self.num)
- if __name__ == '__main__':
- t1 = MyThread(1)
- t2 = MyThread(2)
- t1.start()
- t1.join()
- t2.start()
- t2.join()
- 输出:
- running on number:1
- thread:1
- running on number:2
- thread:2
设置参数效果如下:
- if __name__ == '__main__':
- t1 = MyThread(1)
- t2 = MyThread(2)
- t1.start()
- t1.join(2)
- t2.start()
- t2.join()
- 输出:
- running on number:1
- running on number:2
- thread:1
- thread:2
Daemon
默认情况下,主线程在退出时会等待所有子线程的结束。如果希望主线程不等待子线程,而是在退出时自动结束所有的子线程,就需要设置子线程为后台线程(daemon)。方法是通过调用线程类的setDaemon()方法。
- import time,threading
- def run(n):
- print("%s".center(20,"*")%n)
- time.sleep(2)
- print("done".center(20,"*"))
- def main():
- for i in range(5):
- t = threading.Thread(target=run,args=(i,))
- t.start()
- t.join(1)
- print("starting thread",t.getName())
- m = threading.Thread(target=main,args=())
- # 将main线程设置位Daemon线程,它作为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完成
- m.setDaemon(True)
- m.start()
- m.join(3)
- print("main thread done".center(20,"*"))
- 输出:
- *********0*********
- starting thread Thread-2
- *********1*********
- ********done********
- starting thread Thread-3
- *********2*********
- **main thread done**
线程锁(互斥锁Mutex)
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据就需要线程锁。
- import time,threading
- def addNum():
- # 在每个线程中都获取这个全局变量
- global num
- print("--get num:",num)
- time.sleep(1)
- # 对此公共变量进行-1操作
- num -= 1
- # 设置一个共享变量
- num = 100
- thread_list = []
- for i in range(100):
- t = threading.Thread(target=addNum)
- t.start()
- thread_list.append(t)
- # 等待所有线程执行完毕
- for t in thread_list:
- t.join()
- print("final num:",num)
加锁版本
Lock时阻塞其他线程对共享资源的访问,且同一线程只能acquire一次,如多于一次就出现了死锁,程序无法继续执行。
- import time,threading
- def addNum():
- # 在每个线程中都获取这个全局变量
- global num
- print("--get num:",num)
- time.sleep(1)
- # 修改数据前加锁
- lock.acquire()
- # 对此公共变量进行-1操作
- num -= 1
- # 修改后释放
- lock.release()
- # 设置一个共享变量
- num = 100
- thread_list = []
- # 生成全局锁
- lock = threading.Lock()
- for i in range(100):
- t = threading.Thread(target=addNum)
- t.start()
- thread_list.append(t)
- # 等待所有线程执行完毕
- for t in thread_list:
- t.join()
- print("final num:",num)
GIL VS Lock
GIL保证同一时间只能有一个线程来执行。lock是用户级的lock,与GIL没有关系。
RLock(递归锁)
Rlock允许在同一线程中被多次acquire,线程对共享资源的释放需要把所有锁都release。即n次acquire,需要n次release。
- def run1():
- print("grab the first part data")
- lock.acquire()
- global num
- num += 1
- lock.release()
- return num
- def run2():
- print("grab the second part data")
- lock.acquire()
- global num2
- num2 += 1
- lock.release()
- return num2
- def run3():
- lock.acquire()
- res = run1()
- print("between run1 and run2".center(50,"*"))
- res2 = run2()
- lock.release()
- print(res,res2)
- if __name__ == '__main__':
- num,num2 = 0,0
- lock = threading.RLock()
- for i in range(10):
- t = threading.Thread(target=run3)
- t.start()
- while threading.active_count() != 1:
- print(threading.active_count())
- else:
- print("all threads done".center(50,"*"))
- print(num,num2)
这两种锁的主要区别是,RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意,如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
Semaphore(信号量)
互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如售票处有3个窗口,那最多只允许3个人同时买票,后面的人只能等前面任意窗口的人离开才能买票。
- import threading,time
- def run(n):
- semaphore.acquire()
- time.sleep(1)
- print("run the thread:%s"%n)
- semaphore.release()
- if __name__ == '__main__':
- # 最多允许5个线程同时运行
- semaphore = threading.BoundedSemaphore(5)
- for i in range(20):
- t = threading.Thread(target=run,args=(i,))
- t.start()
- while threading.active_count() != 1:
- # print(threading.active_count())
- pass
- else:
- print("all threads done".center(50,"*"))
Timer(定时器)
Timer隔一定时间调用一个函数,如果想实现每隔一段时间就调用一个函数,就要在Timer调用的函数中,再次设置Timer。Timer是Thread的一个派生类。
- import threading
- def hello():
- print("hello,world!")
- # delay 5秒之后执行hello函数
- t = threading.Timer(5,hello)
- t.start()
Event
Python提供了Event对象用于线程间通信,它是有线程设置的信号标志,如果信号标志位为假,则线程等待指导信号被其他线程设置为真。Event对象实现了简单的线程通信机制,它提供了设置信号、清除信号、等待等用于实现线程间的通信。
设置信号
使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的转态,当使用event对象的set()方法后,isSet()方法返回真。
清除信号
使用Event的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear()方法后,isSet()方法返回假。
等待
Event的wait()方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志为假时,则wait()方法一直等待其为真时才返回。
通过Event来实现两个或多个线程间的交互,下面以红绿灯为例,即启动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红停绿行的规则。
- import threading,time,random
- def light():
- if not event.isSet():
- event.set()
- count = 0
- while True:
- if count < 5:
- print("\033[42;1m--green light on--\033[0m".center(50,"*"))
- elif count < 8:
- print("\033[43;1m--yellow light on--\033[0m".center(50,"*"))
- elif count < 13:
- ifevent.isSet():
- event.clear()
- print("\033[41;1m--red light on--\033[0m".center(50,"*"))
- else:
- count = 0
- event.set()
- time.sleep(1)
- count += 1
- def car(n):
- while 1:
- time.sleep(random.randrange(10))
- ifevent.isSet():
- print("car %s is running..."%n)
- else:
- print("car %s is waiting for the red light..."%n)
- if __name__ == "__main__":
- event = threading.Event()
- Light = threading.Thread(target=light,)
- Light.start()
- for i in range(3):
- t = threading.Thread(target=car,args=(i,))
- t.start()
queue队列
Python中队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块。
创建一个队列对象
- import queue
- q = queue.Queue(maxsize = 10)
queue.Queue类是一个队列的同步实现。队列长度可以无限或者有限。可以通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1表示队列长度无限。
将一个值放入队列中
- q.put("a")
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put()方法将引发Full异常。
将一个值从队列中取出
- q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直到有项目可用。如果队列为空且block为False,队列将引发Empty异常。
Python Queue模块有三种队列及构造函数
- # 先进先出
- class queue.Queue(maxsize=0)
- # 先进后出
- class queue.LifoQueue(maxsize=0)
- # 优先级队列级别越低越先出
- class queue.PriorityQueue(maxsize=0)
常用方法
- q = queue.Queue()
- # 返回队列的大小
- q.qsize()
- # 如果队列为空,返回True,反之False
- q.empty()
- # 如果队列满了,返回True,反之False
- q.full()
- # 获取队列,timeout等待时间
- q.get([block[,timeout]])
- # 相当于q.get(False)
- q.get_nowait()
- # 等到队列为空再执行别的操作
- q.join()
生产者消费者模型
在开发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不再等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
最基本的生产者消费者模型的例子。
- import queue,threading,time
- q = queue.Queue(maxsize=10)
- def Producer():
- count = 1
- while True:
- q.put("骨头%s"%count)
- print("生产了骨头",count)
- count += 1
- def Consumer(name):
- while q.qsize() > 0:
- print("[%s] 取到[%s]并且吃了它..."%(name,q.get()))
- time.sleep(1)
- p = threading.Thread(target=Producer,)
- c1 = threading.Thread(target=Consumer,args=("旺财",))
- c2 = threading.Thread(target=Consumer,args=("来福",))
- p.start()
- c1.start()
- c2.start()
以上就是Python开发--进程、线程、协程详解的详细内容,更多请关注其它相关文章!
上一篇: MongoDB入门学习(二):MongoDB的基本概念和数据类型
下一篇: Hive元数据解析