
11. Multithreading, Multiprocessing, and Thread Pool Programming


1.1. Thread synchronization: Lock and RLock

(1) Lock

  • Using a lock hurts performance
  • Locks can cause deadlock (see the deadlock sketch after the example below)
import threading
from threading import Lock

total = 0
lock = Lock()

def add():
    global total
    for i in range(100000):
        lock.acquire()
        # lock.acquire()   # acquiring the same Lock again here would deadlock
        total += 1
        lock.release()

def desc():
    global total
    for i in range(100000):
        lock.acquire()     # acquire the lock
        total -= 1
        lock.release()     # release the lock

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)   # 0
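To illustrate the second bullet above, here is a minimal sketch (not from the original article) of how a deadlock arises when two threads acquire two ordinary Lock objects in opposite order; the names lock_a/lock_b and the sleep are made up to make the hang reproducible. Running it will normally block forever.

import threading
import time
from threading import Lock

lock_a = Lock()
lock_b = Lock()

def worker1():
    lock_a.acquire()          # holds lock_a ...
    time.sleep(0.1)
    lock_b.acquire()          # ... then waits for lock_b, which worker2 holds
    lock_b.release()
    lock_a.release()

def worker2():
    lock_b.acquire()          # holds lock_b ...
    time.sleep(0.1)
    lock_a.acquire()          # ... then waits for lock_a, which worker1 holds -> deadlock
    lock_a.release()
    lock_b.release()

t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start()
t2.start()
# both threads now wait on each other forever; the program never exits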

(2) RLock

RLock (reentrant lock): within the same thread, acquire() can be called multiple times in a row, but the number of release() calls must match the number of acquire() calls.

import threading
from threading import RLock

total = 0
lock = RLock()

def add():
    global total
    for i in range(100000):
        # with an RLock the same thread can acquire repeatedly without deadlocking
        lock.acquire()
        lock.acquire()
        total += 1
        # the number of release calls must equal the number of acquire calls
        lock.release()
        lock.release()

def desc():
    global total
    for i in range(100000):
        lock.acquire()     # acquire the lock
        total -= 1
        lock.release()     # release the lock

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)   # 0
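A side note not in the original: Lock and RLock both support the with statement, which pairs acquire and release automatically and releases the lock even if the body raises. A minimal sketch of the add() function rewritten this way:

from threading import RLock

total = 0
lock = RLock()

def add():
    global total
    for i in range(100000):
        with lock:        # acquire on entry, release on exit
            with lock:    # reentrant: the same thread may enter again
                total += 1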

1.2. Thread synchronization - Condition

Use Condition to simulate a conversation.

import threading
from threading import Condition

# use a Condition to make the two "speakers" recite the poem in turn
class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name='小爱')
        self.cond = cond

    def run(self):
        with self.cond:
            # wait for a notification
            self.cond.wait()
            print("{} : 在".format(self.name))
            # notify the other thread
            self.cond.notify()

            self.cond.wait()
            print("{} : 好啊".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住长江尾".format(self.name))
            self.cond.notify()

class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小爱同学".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我们来对古诗吧".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我在长江头".format(self.name))
            self.cond.notify()
            self.cond.wait()

if __name__ == '__main__':
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    # xiaoai must start first so it is already waiting when tianmao notifies
    xiaoai.start()
    tianmao.start()

Result: the two threads print alternately, producing the dialogue in order (天猫精灵 speaks first, 小爱 replies line by line).
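The dialogue above depends on 小爱 starting first so that it is already waiting when 天猫精灵 calls notify(). A more common pattern, sketched below (not from the original; the names items/producer/consumer are made up), uses Condition.wait_for() so the waiting side checks a predicate and does not depend on start order:

import threading

items = []
cond = threading.Condition()

def producer():
    with cond:
        items.append("data")
        cond.notify()                            # wake a waiting consumer, if any

def consumer():
    with cond:
        cond.wait_for(lambda: len(items) > 0)    # returns immediately if data already exists
        print(items.pop())

t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()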

1.3. Thread synchronization - Semaphore

Use a Semaphore to limit the number of threads running concurrently.

# a Semaphore is a lock that controls how many threads may run at the same time

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success!")
        self.sem.release()   # release the semaphore when the work is done

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()    # blocks once 3 spiders are already running
            html_thread = HtmlSpider("baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == '__main__':
    # limit the number of concurrent spider threads to 3
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()
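In the example above the producer acquires the semaphore and the spider thread releases it. An alternative sketch (not from the original; fetch() is a made-up stand-in for the spider) lets each worker wrap its own work in with sem:, so acquire and release stay in one place:

import threading
import time

sem = threading.Semaphore(3)       # at most 3 workers inside the with block at once

def fetch(url):
    with sem:                      # acquire on entry, release on exit
        time.sleep(2)
        print("got {} success".format(url))

threads = [threading.Thread(target=fetch, args=("baidu.com/{}".format(i),)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()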

1.4. The ThreadPoolExecutor thread pool

Thread pool

from concurrent.futures import ThreadPoolExecutor
import time

# Why use a thread pool?
# The main thread can query the state of a worker or task and get its return value
# The main thread knows as soon as a task finishes
# The futures interface is the same for multithreading and multiprocessing

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

# submit() hands the function and its argument to the pool and returns immediately
task1 = executor.submit(get_html, 3)

# done() reports whether the task has finished
print(task1.done())      # False
time.sleep(4)
print(task1.done())      # True
# result() returns the return value of the task function
print(task1.result())    # 3
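Not covered above, but worth knowing: ThreadPoolExecutor also works as a context manager, which shuts the pool down and waits for submitted tasks when the with block exits. A minimal sketch:

from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    return times

# the pool is shut down automatically when the with block exits
with ThreadPoolExecutor(max_workers=2) as executor:
    task = executor.submit(get_html, 2)
    print(task.result())   # result() blocks until the task finishes -> 2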

Use as_completed to get the return values of tasks as they finish.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

# collect the return values of tasks as they complete
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]

for future in as_completed(all_task):
    data = future.result()
    print(data)   # the return value of each finished task
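As the comments above note, the futures interface is the same for threads and processes. As a sketch of that point (not part of the original code), swapping in ProcessPoolExecutor leaves submit/as_completed unchanged; the __main__ guard is needed on platforms that spawn new interpreters:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def get_html(times):
    time.sleep(times)
    return times

if __name__ == '__main__':
    # only the executor class changes; the calling code stays the same
    with ProcessPoolExecutor(max_workers=2) as executor:
        all_task = [executor.submit(get_html, url) for url in [3, 2, 4]]
        for future in as_completed(all_task):
            print(future.result())   # same pattern as the thread pool version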

1.5. Inter-process communication - Queue

Queue

import time

from multiprocessing import Process, Queue

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
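A small extension of the example above (not in the original; the None sentinel is an assumption of this sketch): when the consumer runs in a loop, putting a sentinel value on the Queue is a common way to tell it to stop:

from multiprocessing import Process, Queue

def producer(queue):
    for i in range(3):
        queue.put(i)
    queue.put(None)            # sentinel: no more data is coming

def consumer(queue):
    while True:
        data = queue.get()
        if data is None:       # sentinel received, stop consuming
            break
        print(data)

if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()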

1.6. Inter-process communication - Manager

Manager

import time

from multiprocessing import Manager, Pool

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == '__main__':
    # processes in a Pool must communicate through a Manager().Queue()
    queue = Manager().Queue(10)
    pool = Pool(2)   # create a pool with 2 worker processes

    pool.apply_async(producer, args=(queue, ))
    pool.apply_async(consumer, args=(queue, ))

    pool.close()
    pool.join()
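Manager is not limited to Queue; it can also create shared dict and list proxies that Pool workers can modify. A minimal sketch (not from the original; add_data is a made-up helper):

from multiprocessing import Manager, Pool

def add_data(shared_dict, key, value):
    shared_dict[key] = value

if __name__ == '__main__':
    shared_dict = Manager().dict()   # a dict proxy shared between processes
    pool = Pool(2)
    pool.apply_async(add_data, args=(shared_dict, "a", 1))
    pool.apply_async(add_data, args=(shared_dict, "b", 2))
    pool.close()
    pool.join()
    print(dict(shared_dict))   # {'a': 1, 'b': 2}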

1.7. Inter-process communication - Pipe

Use a Pipe for inter-process communication (only between two processes).

# inter-process communication with a Pipe
from multiprocessing import Process, Pipe

def producer(pipe):
    pipe.send("derek")

def consumer(pipe):
    print(pipe.recv())

if __name__ == '__main__':
    receive_pipe, send_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(receive_pipe, ))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()
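One more detail, as a sketch rather than part of the original example: Pipe() is duplex by default, so both connection objects can send and recv, which allows a simple request/reply exchange between exactly two processes:

from multiprocessing import Process, Pipe

def worker(conn):
    print(conn.recv())         # "ping" from the parent
    conn.send("pong")          # reply on the same connection

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()   # duplex by default
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send("ping")
    print(parent_conn.recv())  # pong
    p.join()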