从零开始的Python学习Episode 22——多线程
多线程
线程
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
进程
程序的执行实例称为进程。
每个进程提供执行程序所需的资源。进程具有虚拟地址空间、可执行代码、系统对象的打开句柄、安全上下文、唯一进程标识符、环境变量、优先级类、最小和最大工作集大小以及至少一个执行线程。每个进程都是用一个线程(通常称为主线程)启动的,但是可以从它的任何线程创建额外的线程。
线程和进程的区别
线程共享创建它的进程的地址空间;进程有自己的地址空间。
线程可以直接访问其进程的数据段;进程有自己的父进程数据段副本。
线程可以直接与其进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
新线程很容易创建;新进程需要父进程的重复。
线程可以对同一进程的线程进行相当大的控制;进程只能对子进程进行控制。
对主线程的更改(取消、优先级更改等)可能会影响进程的其他线程的行为;对父进程的更改不会影响子进程。
线程的创建
import threading def foo(num): print('running on thread ',num)if __name__=='__main__': t1 = threading.thread(target=foo, args=(1,)) # 生成一个线程实例 t1.start() # 启动线程
通过继承的方式创建线程
import threading class mythread(threading.thread): def __init__(self, num): threading.thread.__init__(self) self.num = num def run(self): # 定义每个线程要运行的函数,将threading.thread中的run方法进行了重载 print("running on number:%s" % self.num) if __name__ == '__main__': t1 = mythread(1) t2 = mythread(2) t1.start() t2.start()
并发
创建两个线程来同时并发。
import threading import time class mythread(threading.thread): def __init__(self, num): threading.thread.__init__(self) self.num = num def run(self): # 定义每个线程要运行的函数 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': begin = time.time() t1 = mythread(1) t2 = mythread(2) t1.start() t2.start() end = time.time() print(end-begin)
这样输出的是
running on number:1 running on number:20.002995729446411133
期间只用了零点几秒,几乎是同时输出的,这样可以看出它们是同时进行的。要是是串行的就应该是3.xxxxx秒了。
join()方法
在子线程完成运行之前,这个子线程的父线程将一直被阻塞。即一个线程使用join()方法后,必须等该线程结束后才执行join()之后的代码。
import threading import time class mythread(threading.thread): def __init__(self, num): threading.thread.__init__(self) self.num = num def run(self): # 定义每个线程要运行的函数 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': begin = time.time() t1 = mythread(1) t2 = mythread(2) t1.start() t2.start() t1.join() t2.join() end = time.time() print(end-begin)
这样输出的是
running on number:1 running on number:2 3.005915880203247
这样就相当于串行了。
守护线程setdaemon
将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当主线程完成时不需要某个子线程完全运行完就要退出程序,那么就可以将这个子线程设置为守护线程,setdaemon(true)。
import threading import time class mythread(threading.thread): def __init__(self, num): threading.thread.__init__(self) self.num = num def run(self): # 定义每个线程要运行的函数 print("running on number:%s" % self.num) time.sleep(3) if __name__ == '__main__': begin = time.time() t1 = mythread(1) t2 = mythread(2) t1.setdaemon(true) t2.setdaemon(true) t1.start() t2.start() end = time.time() print(end-begin)
这样当主线程完成了之后就会退出,不会等待子线程。
running on number:1 running on number:20.0020024776458740234
同步锁(lock)
import time import threading def addnum(): global num #在每个线程中都获取这个全局变量 # num-=1 temp=num print('--get num:',num ) time.sleep(0.001) num =temp-1 #对此公共变量进行-1操作 num = 100 #设定一个共享变量 thread_list = [] for i in range(100): t = threading.thread(target=addnum) t.start() thread_list.append(t) for t in thread_list: #等待所有线程执行完毕 t.join() print('final num:', num )
但这样最后获得的final num往往不是0,而是其他数,这是因为多个线程在time.sleep()的时候同时拿到了num,所以num是同一个数,而解决方法就是加锁。
死锁与递归锁(rlock)
死锁
import time import threading lock1 = threading.lock() lock2 = threading.lock() class mythread(threading.thread): def run(self): self.f1() self.f2() def f1(self): lock1.acquire() print('%s 拿到一号锁' %self.name) lock2.acquire() print('%s 拿到二号锁' % self.name) time.sleep(2) lock1.release() lock2.release() def f2(self): lock2.acquire() print('%s 拿到二号锁' %self.name) lock1.acquire() print('%s 拿到一号锁' % self.name) lock2.release() lock1.release() if __name__ == '__main__': for i in range(5): t=mythread() t.start() #thread-1 拿到一号锁 thread-1 拿到二号锁 thread-1 拿到二号锁thread-2 拿到一号锁
这里开了5个线程,可是却阻塞住了,原因是在thread1拿到二号锁,thread2拿到一号锁时,f2中在等待一号锁,f1在等待二号锁,结果都等不到,所以产生了死锁。
rlock
import time import threading lock = threading.rlock() class mythread(threading.thread): def run(self): self.f1() self.f2() def f1(self): lock.acquire() print('%s 拿到一号锁' %self.name) lock.acquire() print('%s 拿到二号锁' % self.name) time.sleep(2) lock.release() lock.release() def f2(self): lock.acquire() print('%s 拿到二号锁' %self.name) lock.acquire() print('%s 拿到一号锁' % self.name) lock.release() lock.release() if __name__ == '__main__': for i in range(5): t=mythread() t.start() ''' thread-1 拿到一号锁 thread-1 拿到二号锁 thread-2 拿到一号锁 thread-2 拿到二号锁 thread-3 拿到一号锁 thread-3 拿到二号锁 thread-4 拿到一号锁 thread-4 拿到二号锁 thread-5 拿到一号锁 thread-5 拿到二号锁 thread-1 拿到二号锁 thread-1 拿到一号锁 thread-2 拿到二号锁 thread-2 拿到一号锁 thread-3 拿到二号锁 thread-3 拿到一号锁 thread-4 拿到二号锁 thread-4 拿到一号锁 thread-5 拿到二号锁 thread-5 拿到一号锁 '''
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.rlock。rlock内部维护着一个lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
信号量
信号量用来控制线程并发数的,boundedsemaphore或semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
boundedsemaphore与semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
import time import threading s = threading.boundedsemaphore(3) class mythread(threading.thread): def run(self): s.acquire() print(self.name) time.sleep(2) s.release() if __name__ == '__main__': t=[] for i in range(10): t.append(mythread()) for i in t: i.start() ''' thread-1 thread-2 thread-3 thread-4thread-6thread-5 thread-7 thread-8thread-9 thread-10 '''
可以看到几乎是三个三个同时输出的。
条件变量同步
有一类线程需要满足条件之后才能够继续执行,python提供了threading.condition 对象用于条件变量线程的支持,它除了能提供rlock()或lock()的方法外,还提供了 wait()、notify()、notifyall()方法。
lock_con=threading.condition([lock/rlock]): 锁是可选选项,不传入锁,对象自动创建一个rlock()。
wait():条件不满足时调用,线程会释放锁并进入等待阻塞; notify():条件创造后调用,通知等待池激活一个线程; notifyall():条件创造后调用,通知等待池激活所有线程。
import time import threading import random lock = threading.condition() class producer(threading.thread): def __init__(self,name): threading.thread.__init__(self) self.name = name def run(self): global product while true: if lock.acquire(): p = random.randint(1,10) print('机器%s生产了%d件产品'%(self.name,p)) product+=p lock.notify() lock.release() time.sleep(2) class consumer(threading.thread): def __init__(self,name): threading.thread.__init__(self) self.name = name def run(self): global product while true: if lock.acquire(): if product>=1: print('客户%s购买了1件产品'%self.name) product -=1 lock.notify() lock.release() time.sleep(2) if __name__=="__main__": product = 0 t = [] for i in range(5): t.append(producer(i)) t.append((consumer(1))) t.append(consumer(2)) for x in t: x.start() ''' 机器0生产了10件产品 机器1生产了4件产品 机器2生产了1件产品 机器3生产了2件产品 机器4生产了3件产品 客户1购买了1件产品 客户2购买了1件产品 机器0生产了8件产品 机器1生产了7件产品 机器2生产了5件产品 机器3生产了1件产品 机器4生产了4件产品 客户2购买了1件产品 客户1购买了1件产品 机器0生产了8件产品 机器1生产了4件产品 机器2生产了5件产品 机器3生产了1件产品 机器4生产了1件产品 客户1购买了1件产品 客户2购买了1件产品 机器0生产了5件产品 机器1生产了2件产品 '''
wait等待notify的通知,当接到通知后,会重新从if acquire()开始执行
同步条件(event)
event.isset():返回event的状态值; event.wait():如果 event.isset()==false将阻塞线程; event.set(): 设置event的状态值为true,所有阻塞池的线程激活进入就绪状态,等待操作系统调度; event.clear():恢复event的状态值为false。
import time import threading import random event1=threading.event() event2=threading.event() event3=threading.event() class producer(threading.thread): def run(self): event1.wait() print('salesman:这个东西100块。') print('salesman:你要吗?') event1.clear() event2.set() event3.wait() print('salesman:好的') event3.clear() class consumer(threading.thread): def run(self): print('customer:这个东西多少钱?') event1.set() event2.wait() print('customer:嗯,帮我包起来') event2.clear() event3.set() if __name__=="__main__": t = [] t.append(producer()) t.append(consumer()) for x in t: x.start() ''' customer:这个东西多少钱? salesman:这个东西100块。 salesman:你要吗? customer:嗯,帮我包起来 salesman:好的 '''
队列queue
创建一个“队列”对象
import queue
q = queue.queue(maxsize = 10)
queue.queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
将一个值放入队列中
q.put(10)
调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发full异常。
将一个值从队列中取出
q.get()
调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为true。如果队列为空且block为true,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为false,队列将引发empty异常。
python queue模块有三种队列及构造函数:
1、python queue模块的fifo队列先进先出。 class queue.queue(maxsize)
2、lifo类似于堆,即先进后出。 class queue.lifoqueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.priorityqueue(maxsize)
此包中的常用方法(q = queue.queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回true,反之false
q.full() 如果队列满了,返回true,反之false
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(false)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, false)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作
import threading,queue from time import sleep from random import randint class production(threading.thread): def run(self): while true: r=randint(0,100) q.put(r) print("生产出来%s号包子"%r) sleep(1) class proces(threading.thread): def run(self): while true: re=q.get() print("吃掉%s号包子"%re) if __name__=="__main__": q=queue.queue(10) threads=[production(),production(),production(),proces()] for t in threads: t.start() ''' 生产出来55号包子 生产出来100号包子 生产出来67号包子 吃掉55号包子 吃掉100号包子 吃掉67号包子 生产出来23号包子生产出来97号包子生产出来41号包子吃掉23号包子 吃掉97号包子 吃掉41号包子 吃掉4号包子 生产出来4号包子 生产出来45号包子吃掉45号包子生产出来84号包子 '''
上一篇: 鸡蛋和红薯能一起吃吗?红薯不能和哪些食物一起吃呢?
下一篇: PHP图象函数