欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python多线程建立代理ip池

程序员文章站 2022-12-21 14:35:37
之前有写过用单线程建立代理ip池,但是大家很快就会发现,用单线程来一个个测试代理ip实在是太慢了,跑一次要很久才能结束,完全无法忍受。所以这篇文章就是换用多线程来建立ip池,会比用单线程快很多。之所以用多线程而不是多进程,是因为测试时间主要是花费在等待网络传递数据上,处理本地计算的时间很短,用多线程 ......

之前有写过用单线程建立代理ip池,但是大家很快就会发现,用单线程来一个个测试代理ip实在是太慢了,跑一次要很久才能结束,完全无法忍受。所以这篇文章就是换用多线程来建立ip池,会比用单线程快很多。之所以用多线程而不是多进程,是因为测试时间主要是花费在等待网络传递数据上,处理本地计算的时间很短,用多线程能更好地发挥单核性能,而且多线程开销比多进程开销小得多。当然,单核性能会有极限,如果想再提高性能就需要多进程和多线程混用了。当然这里说的是用cpython作为解释器时候的情况,因为绝大多数人用的都是cpython,所以以下说的都是这种情况。

 

受限于个人学识,对多进程和多线程的理解也不是很深刻,如果以后有机会会写写关于并发编程的文章。cpython因为gil锁的原因,多线程无法发挥多核性能,但是可以用多进程来发挥多核性能。注意gil锁不是python语言特性,只是cpython解释器的原因。任何python线程在执行前,都必须获得gil锁,然后每执行100条字节码,解释器就自动释放gil锁,让别的线程执行。所以python线程只能交替执行,即使有多个线程跑在多核cpu上,也只能利用一个核。

 

其实程序主体在之前的文章已经写好了,我们需要的只是稍微做点改进,以适合多线程编程。我的思路是,设置一个线程专门用来爬取待测试ip,其他线程获取待测试ip进行测试。这也是分布式编程的思想。

 

我们首先设置一个队列,用来储存待测试ip。

thread_lock = threading.lock()
test_ip_list = queue()

然后对之前的函数进行一些修改。

def download_page(url, timeout=10):
    headers=hidden_reptile.random_header()
    data = requests.get(url, headers=headers, timeout=timeout)
    return data


def test_ip(test_url):
    while true:
        if test_ip_list.empty():
            return
        ip = test_ip_list.get()
        proxies = {
            'http': ip[0]+':'+ip[1],
            'https': ip[0] + ':' + ip[1]
        }
        try_ip = ip[0]
        try:
            r=requests.get(test_url,headers=hidden_reptile.random_header(),proxies=proxies,timeout=10)
            if r.status_code == 200:
                r.encoding = 'gbk'
                result=re.search('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',r.text)
                result=result.group()
                if result[:9]==try_ip[:9]:
                    print('%s:%s 测试通过' % (ip[0],ip[1]))
                    thread_lock.acquire()
                    with open('proxy_ip.txt', 'a') as f:
                        f.write(ip[0] + ':' + ip[1] + '\n')
                    thread_lock.release()
                else:
                    print('%s:%s 携带代理失败,使用了本地ip' %(ip[0],ip[1]))
            else:
                print('%s:%s 请求码不是200' %(ip[0],ip[1]))
        except exception as e:
            print(e)
            print('%s:%s 错误' %(ip[0],ip[1]))


def get_proxies(page_num, ip_url_list):
    for ip_url in ip_url_list:
        for page in range(1, page_num+1):
            print("抓取第%d页代理ip" %page)
            url= ip_url.format(page)
            r=download_page(url)
            r.encoding='utf-8'
            pattern = re.compile('<td class="country">.*?alt="cn" />.*?</td>.*?<td>(.*?)</td>.*?<td>(.*?)</td>', re.s)
            ip_list= re.findall(pattern, r.text)
            for ip in ip_list:
                test_ip_list.put(ip)
            time.sleep(10)
        print('{}抓取结束'.format(ip_url))

注意写入文件的时候需要加进程锁,因为写入的是同一个文件,不加线程锁的话可能一个线程写入到一半,就被其他线程抢了,然后写入其他东西。所有的待测试ip都来自python队列test_ip_list,对其进行操作的时候不用添加线程锁,因为它自带了线程锁。

 

最后,写运行部分。

if __name__ == '__main__':
    number_of_threads = 8
    total_pages = 20
    threads = []
    url = ["http://www.xicidaili.com/nt/{}"]
    test_url = 'http://ip.tool.chinaz.com/'

    t = threading.thread(target=get_proxies, args=(total_pages, url))
    t.setdaemon(true)
    t.start()
    threads.append(t)
    time.sleep(1)
    for i in range(1, number_of_threads):
        t = threading.thread(target=test_ip, args=(test_url,))
        t.setdaemon(true)
        threads.append(t)
        t.start()
    for thread in threads:
        thread.join()

 

如果有其他可以爬取ip的网址可以加到url列表中,total_page是总共爬取的页数。开了第一个线程之后暂停1s,是在等待它添加待测试ip进入队列中。