欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 Python内存管理与多线程

程序员文章站 2024-01-18 15:11:31
1. 内存管理机制1.1 赋值语句的内存分析赋值语句都是“引用”,可以这样理解,但是,这种“引用”是可以改变指向的非赋值语句时对于可变类型的数据, 创建一块新内存(Set、Dictionary、List)对于不可变数据类型简单数据,存在使用“引用”,不存在创建新内存复杂大数据,创建新内存1.2 垃圾回收机制1.2.1 垃圾自动回收以引用计数为主,分代收集为辅# 引用计数# 1.每个对象都有存有指向该对象的引用总数# 2.查看某个对象的引用计数 -->...

1. 内存管理机制

1.1 赋值语句的内存分析

  1. 赋值语句都是“引用”,可以这样理解,但是,这种“引用”是可以改变指向的
  2. 非赋值语句时
    1. 对于可变类型的数据, 创建一块新内存(Set、Dictionary、List)
    2. 对于不可变数据类型
      • 简单数据,存在使用“引用”,不存在创建新内存

      • 复杂大数据,创建新内存

1.2 垃圾回收机制

1.2.1 垃圾自动回收

  1. 以引用计数为主,分代收集为辅

    # 引用计数
    # 1.每个对象都有存有指向该对象的引用总数
    # 2.查看某个对象的引用计数 --> sys.getrefcount(obj)
    # 3.可以使用del关键字删除某个引用
    
    # 分代回收
    # python将所有的对象分为0,1,2三代
    # 所有的新建对象都是0代对象
    # 当某一代对象经历垃圾回收,依然存活,那么它就被归入下一代对象
    
  2. 如果一个对象的引用数为0,python虚拟机就会回收这个对象的内存

  3. 引用计数的缺陷是循环引用问题

  4. 垃圾自动回收总结

    # 垃圾自动回收
    # 满足特定条件,自动启动垃圾回收
    # 当python运行时,会记录其中分配对象(object allocation)和取消分配对象(object deallocation)的次数
    # 当两者的差值高于某个阀值时,垃圾回收才会启动
    # 查看阀值gc.get_threshold()
    

1.2.2 垃圾手动回收

# 垃圾手动回收
# gc.collect()手动回收
# objgraph模块中的count(obj)记录当前类产生的实例对象的个数

1.3 内存管理机制

1.3.1 内存池机制

当创建大量消耗小内存的对象时,频繁调用new/malloc会导致大量的内存碎片,致使效率降低。内存池的概念就是在内存中申请一定数量的,大小相等的内存块留作备用,当有新的内存需求时,就先从内存池中分配内存给这个需求,不够了之后再申请新的内存。这样做最显著的优势就是能够减少内存碎片,提升效率

1.3.2 Python3内存管理机制——Pymalloc

  1. 针对小对象(<=512bytes),pymalloc会在内存池中申请内存空间
  2. 当>512bytes,则会PyMem_RawMalloc和PyMem_RawRealloc()来申请新的内存空间

2. Python多线程

2.1 进程、线程、协程介绍

2.1.1 关系介绍

  1. 都运行在操作系统
  2. 一个应用中至少一个进程
  3. 一个进程中至少一个线程
  4. 协程在一个进程或者一个线程中执行

2.1.2 进程介绍

  1. 进程是一个执行中的程序
  2. 每个进程都拥有自己的地址空间、内存、数据栈以及其他用于跟踪执行的辅助数据
  3. 操作系统管理其上所有进程的执行,并为这些进程合理地分配时间
  4. 进程也可以通过派生(fork或spawn)新的进程来执行其他任务

2.1.3 线程介绍

  1. 在同一个进程下执行,并共享相同的上下文

  2. 一个进程中的各个线程与主线程共享同一片数据空间

  3. 线程包括开始、执行顺序和结束三部分

  4. 它可以被抢占(中断)和临时挂起(睡眠)–让步

  5. 线程一般是以并发方式执行

    # 并发
    # 并发是一种属性,是程序、算法或问题的属性
    # 并行只是并发问题的可能方法之一
    # 如果两个事件互不影响,则两个事件是并发的
    

2.1.4 协程介绍

  1. 协程就是协同多任务
  2. 协程在一个进程或者一个线程中执行
  3. 不需要锁的机制(自己调动,在一个进程或者一个线程中执行)
  4. 对多核CPU的利用——多进程+协程

2.2 对多核利用及GIL概念

2.2.1 对多核的利用

  1. 单核CPU系统中,不存在真正的并发
  2. GIL——全局解释器锁
  3. GIL只是强制在任何时候只有一个线程可以执行python代码
  4. I/O密集型应用与CPU密集型应用

2.2.2 GIL执行顺序

  1. 设置GIL
  2. 切换进一个线程去运行
  3. 执行下面操作之一
    • 指定数量的字节码zhiling
    • 线程主动让出控制权(可以调用time.sleep(0)来完成)
  4. 把线程设置回睡眠状态(切换出线程)
  5. 解锁GIL
  6. 重复上述步骤

2.3 线程

2.3.1 线程的threading模块(对象、属性、方法)

threading模块可以代替thread模块

对象 描述
Thread 表示一个执行线程的对象
Lock 锁对象
RLock 可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另一个线程满足特定的“条件”,比如改变状态或某个数据值
Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程将被激活
Semaphore 为线程间共享的有限资源提供了一个“计时器”,如果没有可用资源时会被阻塞
BoundedSemap 与Semaphore相似,不过它不允许超过初始值
Timer 与Thread相似,不过它要在运行前等待一段时间
Barrier 创建一个“障碍”,必须达到指定数量的线程后才可以继续
属性 描述
name 线程名
ident 线程的标识符
daemon 布尔标志,表示这个线程是否是守护线程
方法 描述
__init__() 实例化一个线程对象,需要有一个可调用的target,以及其参数args或kwargs
start() 开始执行该线程
run() 定义线程功能的方法(通常在子类中被重写)
join(timeout=None) 直至启动的线程终止之前一直挂起;除非给出了timeout(秒),否则会一直阻塞
getName() 返回线程名
setName(name) 设定线程名
isAlivel()/is_alive() 布尔标识,表示这个线程是否还存活
isDaemon() 如果是守护线程,则返回True;否则,返回False
setDaemon() 把线程的守护标志设置为布尔值daemonic(必须在线程start()之前调用)

2.3.2 不面向对象实现线程

  1. threading.Thead创建线程

  2. start()启动线程

  3. join()挂起线程

    import threading
    def run():
        for i in range(1,5):
            print(threading.currentThread().getName()+"---"+str(i))
    def main():
        print(threading.currentThread().getName())
        thread=threading.Thread(target=run,name="thread-loop")
        thread.start()
        thread.join()
    if __name__ == '__main__':
        main()
        
    Result:
    	MainThread
        thread-loop---1
        thread-loop---2
        thread-loop---3
        thread-loop---4
    

2.3.3 面向对象实现线程

import threading,time
class MyThread(threading.Thread):
    def run(self):
        for i in range(0,5):
            print(threading.currentThread().getName()+"---"+str(i))
            time.sleep(1)
if __name__ == '__main__':
    print(threading.currentThread().getName())
    thread=MyThread(name="loopthread")
    thread.start()
    thread.join()
    
Result:
    MainThread
    loopthread---0
    loopthread---1
    loopthread---2
    loopthread---3
    loopthread---4

2.3.4 多线程并发问题

  • 一个线程没操作完,另外线程就开始操作

    import threading,time
    balance=0
    def resource(n):
        global balance
        balance+=n
        time.sleep(1)
        balance-=n
        print(threading.currentThread().getName()+"  n="+str(n)+"  balance="+str(balance))
    class MyThread(threading.Thread):
        def __init__(self,n,*args,**kwargs):
            super().__init__(*args,**kwargs)
            self.n=n
    
        def run(self):
            for i in range(0,5):
                resource(self.n)
    if __name__ == '__main__':
        thread1=MyThread(n=5,name="thread1")
        thread2 = MyThread(n=8,name="thread2")
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
        
    Result:
        thread2  n=8  balance=5thread1  n=5  balance=0
    
        thread1  n=5  balance=8
        thread2  n=8  balance=5
        thread1  n=5  balance=8
        thread2  n=8  balance=5
        thread2  n=8  balance=5thread1  n=5  balance=0
        
    

2.3.5 多线程中的锁

  1. lock threading.Lock() 获取锁 lock threading.RLock()(对于Lock()只能锁一次来说,RLock()可以多次锁定)

  2. void lock.acquire() 获取锁

  3. void lock.release()释放锁

  4. 锁定时最好利用 try...catch...finally

  5. 可以使用 with lock:自动解锁

    import threading,time
    balance=0
    lock=threading.Lock()
    def resource(n):
        global balance
        with lock:
            balance+=n
            time.sleep(1)
            balance-=n
            print(threading.currentThread().getName()+"  n="+str(n)+"  balance="+str(balance))
    class MyThread(threading.Thread):
        def __init__(self,n,*args,**kwargs):
            super().__init__(*args,**kwargs)
            self.n=n
    
        def run(self):
            for i in range(0,5):
                resource(self.n)
    if __name__ == '__main__':
        thread1=MyThread(n=5,name="thread1")
        thread2 = MyThread(n=8,name="thread2")
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
        
    Result:
    	thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread1  n=5  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
        thread2  n=8  balance=0
    

2.3.6 线程的调度和优化(利用线程池优化)

  1. 建立线程池,不用每次再建立线程,可以对之前的线程直接拿来使用
# 不使用多线程
import time,threading

def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def low():
    global n
    begin_time=time.time()
    for i in range(0,10):
        run(i)
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    low()
    
Result:
    MainThread   0
    MainThread   1
    MainThread   2
    MainThread   3
    MainThread   4
    MainThread   5
    MainThread   6
    MainThread   7
    MainThread   8
    MainThread   9
    time:10.00597333908081
# 使用多线程,不使用线程池
import time,threading
count=0
def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def normal():
    global count
    begin_time=time.time()
    allthread=[]
    for i in range(0,3):
        if i!=2:
            for m in range(0,4):  # 在计算机中一次只能开4个线程,这个是不利用线程池,不重复使用线程
                    thread=threading.Thread(target=run,args=(count,))
                    count+=1
                    thread.start()
                    allthread.append(thread)
            for thread in allthread:
                thread.join()
        else:
            for m in range(0,2):  # 最后一轮只用开2个线程
                    thread=threading.Thread(target=run,args=(count,))
                    count+=1
                    thread.start()
                    allthread.append(thread)
            for thread in allthread:
                thread.join()
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    normal()
    
Result:
    Thread-1   0
    Thread-2   1
    Thread-4   3Thread-3   2

    Thread-5   4
    Thread-6   5
    Thread-7   6
    Thread-8   7
    Thread-9   8
    Thread-10   9
    time:3.0081448554992676
# 第一种线程池 multiprocessing.dummy.Pool
import time,threading
from multiprocessing.dummy import Pool

def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def pool():
    begin_time=time.time()
    n_list=range(10)
    pool=Pool(4)
    pool.map(run,n_list)
    pool.close()
    pool.join()
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    pool()
    
Result:
    Thread-1   0
    Thread-4   3
    Thread-2   1Thread-3   2

    Thread-2   7Thread-1   4Thread-3   6

    Thread-4   5

    Thread-3   8Thread-2   9

    time:3.029418706893921
# 第二种线程池(不用start()、join())concurrent.futures.thread.ThreadPoolExecutor
import time,threading
from concurrent.futures.thread import ThreadPoolExecutor

def run(n):
    time.sleep(1)
    print(threading.current_thread().name+"   "+str(n))
def pool():
    begin_time=time.time()
    n_list=range(10)
    with ThreadPoolExecutor(max_workers=4) as pool:
        pool.map(run,n_list)
    print("time:"+str(time.time()-begin_time))
if __name__ == '__main__':
    pool()
    
Result:
    ThreadPoolExecutor-0_0   0
    ThreadPoolExecutor-0_2   2
    ThreadPoolExecutor-0_3   3
    ThreadPoolExecutor-0_1   1
    ThreadPoolExecutor-0_0   4
    ThreadPoolExecutor-0_2   5ThreadPoolExecutor-0_3   6

    ThreadPoolExecutor-0_1   7
    ThreadPoolExecutor-0_0   8
    ThreadPoolExecutor-0_3   9
    time:3.0045437812805176

数据太少,无法准确判断线程池处理大量数据的明显优势;在大量数据的处理时,线程池效率高,第二种线程池比第一种线程池效率高

2.4 进程

2.4.1 进程的multiprocessing模块

  1. 创建进程 Process multiprocessing.Process()
  2. 启动进程 void Process.start()
  3. 挂起进程 void Process.join()
  4. 获取进程的ID int os.getpid()

2.4.1 不面向对象实现进程

import multiprocessing
import os
import time


def somthing(process_name):
    print("process_name:{}".format(process_name))
    time.sleep(2)
    print("process_id:{}".format(os.getpid()))
if __name__ == '__main__':
    process=multiprocessing.Process(target=somthing,args=("my_progress",))
    process.start()
    process.join()
    
Result:
    process_name:my_progress
	process_id:12188

2.4.2 面向对象实现进程

import os
import time
from multiprocessing.context import Process

class MyProgress(Process):
    def __init__(self,my_progress_name,*args,**kwargs):
        self.my_progress_name=my_progress_name
        self.name="i will be covered"
        print("super__init__:{}".format(self.name))
        # slef.name会被父类的构造方法所覆盖
        super().__init__(*args,**kwargs)
        print("after_super__init__:{}".format(self.name))

    def run(self):
        print("----------run------------")
        print("my_process_name:{}".format(self.my_progress_name))
        print("process_name:{}".format(self.name))
        time.sleep(2)
        print("process_id:{}".format(os.getpid()))
        print("-----------run-----------")
if __name__ == '__main__':
    progress=MyProgress("myProgress")
    progress.start()
    progress.join()
    
Result:
    super__init__:i will be covered
    after_super__init__:MyProgress-1
    ----------run------------
    my_process_name:myProgress
    process_name:MyProgress-1
    process_id:10096
    -----------run-----------

2.4.3 进程之间的通信

  • 通过Queue、Pipes等实现进程之间的通信
import time
from multiprocessing import Process,Queue,current_process
class WriteProgress(Process):
    def __init__(self,queue,*args,**kwargs):
        self.queue=queue
        super().__init__(*args,**kwargs)
    def run(self):
        msg=["This is first msg",
             "This is second msg",
             "This is third msg",
             "This is fourth msg",
             "This is fifth msg"]
        for msg_item in msg:
            self.queue.put(msg_item)
            print("{}--send--msg:{}".format(current_process(),msg_item))
            time.sleep(2)

class ReadProgress(Process):
    def __init__(self,queue,*args,**kwargs):
        self.queue=queue
        super().__init__(*args,**kwargs)
    def run(self):
        while True:
            msg=self.queue.get()
            print("{}--get--msg:{}".format(current_process(),msg))
if __name__ == '__main__':
    queue=Queue()
    write=WriteProgress(queue)
    read=ReadProgress(queue)
    write.start()
    read.start()
    write.join()
    write.join()
    read.terminate()
    
Result:
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is first msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is first msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is second msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is second msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is third msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is third msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is fourth msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is fourth msg
    <WriteProgress(WriteProgress-1, started)>--send--msg:This is fifth msg
    <ReadProgress(ReadProgress-2, started)>--get--msg:This is fifth msg

2.4.4 多进程的锁

  • Lock() RLock() Condition()
  • RLock()可以多次锁定
  • Lock()只能锁一次,否则造成死锁
# 不使用锁,会让进程”随意“执行,一会这个进程执行,一会另一个程执行
from multiprocessing import Process,current_process
import time

class WriteProgress(Process):
    def __init__(self,my_name,*args,**kwargs):
        self.my_name=my_name
        super().__init__(*args,**kwargs)
    def run(self):
        for i in range(0,3):
            print("progress:{}--progress_number{}".format(self.my_name,i))
            time.sleep(2)
if __name__ == '__main__':
    write1=WriteProgress("Progress1")
    write2=WriteProgress("Progress2")
    write3=WriteProgress("Progress3")
    write1.start()
    write2.start()
    write3.start()
    write1.join()
    write2.join()
    write3.join()
    
Result:
    progress:Progress1--progress_number0
    progress:Progress2--progress_number0
    progress:Progress3--progress_number0
    progress:Progress1--progress_number1
    progress:Progress2--progress_number1
    progress:Progress3--progress_number1
    progress:Progress1--progress_number2
    progress:Progress2--progress_number2
    progress:Progress3--progress_number2
# 使用Lock(),会让一个进程执行好后其它进程再执行
from multiprocessing import Process, Lock
import time

class WriteProgress(Process):
    def __init__(self,lock,my_name,*args,**kwargs):
        self.my_name=my_name
        self.lock=lock
        super().__init__(*args,**kwargs)
    def run(self):
        with self.lock:
            for i in range(0,3):
                print("progress:{}--progress_number:{}".format(self.my_name,i))
                time.sleep(2)
if __name__ == '__main__':
    lock=Lock()
    write1=WriteProgress(lock,"Progress1")
    write2=WriteProgress(lock,"Progress2")
    write3=WriteProgress(lock,"Progress3")
    write1.start()
    write2.start()
    write3.start()
    write1.join()
    write2.join()
    write3.join()
    
Result:
    progress:Progress1--progress_number:0
    progress:Progress1--progress_number:1
    progress:Progress1--progress_number:2
    progress:Progress2--progress_number:0
    progress:Progress2--progress_number:1
    progress:Progress2--progress_number:2
    progress:Progress3--progress_number:0
    progress:Progress3--progress_number:1
    progress:Progress3--progress_number:2

2.4.5 进程池

# 利用进程池,同步任务
# 同步任务,每一个任务执行完后才能拿到返回值,而且拿到的就是返回的值,而不是对象
from multiprocessing import Pool,current_process
import time


def run(i):
    time.sleep(1)
    return "Progress:{}--assignment_number:{}".format(current_process(), i)

if __name__ == '__main__':
    pool=Pool(2)
    for i in range(0,10):
        result=pool.apply(run,args=(i,))
        print(result)
    pool.close()
    pool.join()
    
Result:
        Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9
# 利用进程池,异步任务
# 异步任务,可以先那到返回值对象,对象拿到的快
# 对象拿完之后,再对任务进行执行,将数据放入对象之中
from multiprocessing import Pool,current_process
import time
def run(i):
    # print("Progress:{}--assignment_number:{}".format(current_process(), i))
    time.sleep(1)
    return "Progress:{}--assignment_number:{}".format(current_process(), i)

if __name__ == '__main__':
    pool=Pool(2)
    list=[]
    for i in range(0,10):
        result=pool.apply_async(run,args=(i,))
        print(result) # 拿到返回值对象对象很快
        list.append(result)
    pool.close()
    pool.join()
    for i in range(0,10):
        print(list[i].get())
        
Result:
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FAC8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FBA8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FC50>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FCF8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FDA0>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FE48>
    <multiprocessing.pool.ApplyResult object at 0x0000019C8399FFD0>
    <multiprocessing.pool.ApplyResult object at 0x0000019C839B90B8>
    <multiprocessing.pool.ApplyResult object at 0x0000019C839B9160>
    <multiprocessing.pool.ApplyResult object at 0x0000019C839B9208>
    ## 几乎是肉眼可见的一下子打印,但是在之前有较长一段时间
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:0
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:1
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:2
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:3
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:4
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:5
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:6
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:7
    Progress:<SpawnProcess(SpawnPoolWorker-1, started daemon)>--assignment_number:8
    Progress:<SpawnProcess(SpawnPoolWorker-2, started daemon)>--assignment_number:9

2.5 协程

2.5.1 Python3.5之前协程的实现

  • yield 生成器来实现
def yield_test(): # 协程
    while True:
        n=(yield)
        print(n)
if __name__ == '__main__':
    result=yield_test()
    next(result)
    result.send("send=ok")
    result.send("hello")
    
Result:
    send=ok
	hello

2.5.2 Python3.5之后协程的实现

  1. 使用 asyncawait 关键字来实现
  2. async 关键字
  • 定义特殊的函数
  • 被调用时,不执行里面的代码,而是返回一个协程对象
  • 在时间循环中调度其执行前,协程对象不执行任何操作
  1. await 关键字
  • 等待协程执行完成
  • 当遇到阻塞调用的函数的时候,使用 await 方法将协程的控制权让出,以便loop调用其它的协程
  1. asyncio 模块
    • get_event_loop() 获取事件循环队列
    • run_until_complete() 注册任务到队列
    • 在事件循环中调度其执行前,协程对象不执行任何操作
    • asyncio 模块用于事件循环
    • Bool asyncio.iscoroutinefunction(function) 判断是否为协程函数
import asyncio
async def do():
    print("we are doing it......")
    await asyncio.sleep(2)
obj=do()
# 得到事件循环队列
loop=asyncio.get_event_loop()
# 注册任务
task=loop.create_task(obj)
print(task)
# 等待协程任务执行完成
loop.run_until_complete(task)
print(task)

Result:
    <Task pending coro=<do() running at D:/study/python/MyTest/test.py:150>>
    we are doing it......
    ## wait 2 second
    <Task finished coro=<do() done, defined at D:/study/python/MyTest/test.py:150> result=None>

2.5.3 协程通信之嵌套调用

# 一个协程函数使用另一个协程函数的返回值
import asyncio
async def add(x,y):
    print("We are adding.......")
    await asyncio.sleep(1)
    print("We added completely")
    return x+y

async def get_add(x,y):
    result=await add(x,y)
    print("result={}".format(result))

loop=asyncio.get_event_loop()
loop.run_until_complete(get_add(10,50))
loop.close()

Result:
    We are adding.......
    We added completely
    result=60

2.5.4 协程通信之队列

import asyncio
async def add(queue):
    for i in range(0,5):
        await asyncio.sleep(1)
        await queue.put(i) # put()得到一个协程对象
        print("add number-->{},size-->{}".format(i,queue.qsize()))
async def reduce(queue):
    for i in range(0,10):
        result=await queue.get()  # get()得到一个协程对象
        print("reduce number-->{},size:{}".format(result,queue.qsize()))
if __name__ == '__main__':
    queue=asyncio.Queue(maxsize=5)
    add1=add(queue)
    add2=add(queue)
    reduce=reduce(queue)
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(add1,add2,reduce))
    loop.close()
    
Result:
  	add number-->0,size-->1
    add number-->0,size-->2
    reduce number-->0,size:1
    reduce number-->0,size:0
    add number-->1,size-->1
    add number-->1,size-->2
    reduce number-->1,size:1
    reduce number-->1,size:0
    add number-->2,size-->1
    add number-->2,size-->2
    reduce number-->2,size:1
    reduce number-->2,size:0
    add number-->3,size-->1
    add number-->3,size-->2
    reduce number-->3,size:1
    reduce number-->3,size:0
    add number-->4,size-->1
    add number-->4,size-->2
    reduce number-->4,size:1
    reduce number-->4,size:0  

本文地址:https://blog.csdn.net/weixin_46005735/article/details/107193529