欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

进程池、线程池及回调函数使用

程序员文章站 2022-06-12 20:05:56
...

一、线程池与进程池

池表示容器

线程就是装线程的容器

为什么要装到容器中

  1. 可以避免频繁的创建和销毁(进程/线程)来的资源开销

  2. 可以限制同时存在的线程数量 以保证服务器不会应为资源不足而导致崩溃

  3. 帮我们管理了线程的生命周期

  4. 管理了任务的分配

    import os
    import time
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import activeCount,enumerate,currentThread
    
    # # 创建一个线程池   指定最多可以容纳两个线程
    # pool = ThreadPoolExecutor(20)
    #
    # def task():
    #     print(currentThread().name)
    #
    # # 提交任务到池子中
    # pool.submit(task)
    # pool.submit(task)
    #
    # print(enumerate())
    
    # 进程池的使用
    
    def task():
        time.sleep(1)
        print(os.getpid())
    
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(2)
        pool.submit(task)
        pool.submit(task)
        pool.submit(task)

如果进程不结束 池子里面的进程或线程 也是一直存活的

二、同步异步

阻塞 非阻塞 程序的状态

并行 并发 串行 处理任务的方式

1、同步

指的是 提交任务后必须在原地等待 直到任务结束 ===阻塞????

2、异步

提交任务后不需要在原地等待 可以继续往下执行代码

异步效率高于同步 ,异步任务将导致一个问题 就是 任务的发起方不知道任务何时 处理完毕

异步同步指的是提交任务的方式

3、解决方法:

​ 1.轮询 重复的隔一段时间就问一次

        效率低 无法及时获取结果  不推荐

​ 2.让任务的执行方主动通知 (异步回调)

​ 可以及时拿到任务的结果 推荐方式

异步回调使用案例:

# 异步回调
from threading import Thread
# 具体的任务
def task(callback):
    print("run")
    for i in range(100000000):
        1+1
    callback("ok")
   

#回调函数 参数为任务的结果
def finished(res):
    print("任务完成!",res)


print("start")
t = Thread(target=task,args=(finished,))
t.start()  #执行task时 没有导致主线程卡主 而是继续运行
print("over")

线程池中回调的使用

# 使用案例:
def task(num):
    time.sleep(1)
    print(num)
    return "hello python"

def callback(obj):
    print(obj.result())


pool = ThreadPoolExecutor()
res = pool.submit(task,123)
res.add_done_callback(callback)
print("over") 

三、线程池回到函数案例(爬取校花网)

import requests
import re
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(50)

# 爬虫三部曲

# 一 发送请求
def get_page(url):
    print('%s GET start ...' % url)
    index_res = requests.get(url)
    return index_res.text

# 二 解析数据
# 解析主页
def parse_index(index_page):
    # 拿到主页的返回结果
    res = index_page.result()
    detail_urls = re.findall('<div class="items">.*?href="(.*?)"', res, re.S)
    # print(detail_urls)

    for detail_url in detail_urls:
        if not detail_url.startswith('http'):
            detail_url = 'http://www.xiaohuar.com' + detail_url

        pool.submit(get_page, detail_url).add_done_callback(parse_detail)
        # yield detail_url

# 解析详情页
def parse_detail(detail_page):
    res = detail_page.result()

    video_urls = re.findall('<source src="(.*?.mp4)">', res, re.S)
    print("==",video_urls)

    if video_urls:
        video_urls = video_urls[0]
        pool.submit(save_video, video_urls)

    # print(video_urls)


# 三 保存数据
import uuid
def save_video(video_url):
    try:
        print("---",video_url)
        res = requests.get(video_url)
        with open(r'/Users/haiyuanyang/Desktop/python9/movie/%s.mp4' % uuid.uuid4(), 'wb') as f:
            f.write(res.content)
            f.flush()
            print('%s done ...' % video_url)

    except Exception:
        pass


if __name__ == '__main__':
    base_url = 'http://www.xiaohuar.com/list-3-{}.html'
    for line in range(10):
        index_url = base_url.format(line)
        pool.submit(get_page, index_url).add_done_callback(parse_index)