欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

简易代理IP池的搭建

程序员文章站 2022-05-19 13:32:01
...

一、导论

这段时间在学习代理的相关知识。在爬虫的过程中,经常会遇到目标网站对同一IP的访问频率设置了限制,而设置代理是应对反爬虫的重要有效手段。目前互联网上也有不少免费的代理网站,比如西刺代理、快代理、66ip等等。但是笔者在使用过程中,发现几个问题:一是免费代理确实可用率太低;二是免费代理网站本身也部署了反爬虫的措施,并且对于部分网站比如西刺代理,使用该网站自身的高匿代理也无法进行访问,应该是设置了屏蔽。于是乎笔者谋生了自己搭建一个代理IP池的想法。

二、程序结构

本程序属于简易的IP代理池搭建,适合小规模的爬虫。整个程序分为三个模块:爬取代理模块、存储模块、测试模块。

1.存储模块

本程序存储代理IP的数据库选用的MongoDB,MongoDB是一个非关系型数据库,存取速度都很快。Python中操作MongoDB的第三方模块是pymongo。存储模块包括了存储IP和取出IP两个函数。

# 存储
def insert_to_MongoDB(ip, score):
    if myCol.find_one({"IP": ip}) == None: #重复ip不存储
        myCol.insert_one({"IP": ip, "Score": score})


# 取出
def get_from_MongoDB(n = 0):
    """
    :param n: 要返回的ip个数,默认为0取出全部
    :return:
    """
    r = myCol.find().sort("Score", -1).limit(n)
    return r

2.爬取代理模块

本模块是整个程序的核心,用于在多个代理网站爬取代理IP。这里只有西刺代理和快代理两个代理网站,可以根据需求扩展更多的代理网站。

在获取页面源代码函数(get_html)函数中可以看到,对于访问免费代理网站也被封IP的情况,采取了使用IP池已有的高匿代理进行尝试的措施。因为现有的免费代理网站,根据笔者的观察基本上都对同一IP的访问次数进行了限制,采取这种措施也是提高了代理IP的获取成功率。

各代理网站的爬取,是使用多线程并发运行,在后续的主程序中将可以看到。

# 获取页面源码
def get_html(url):
    headers = {"User-Agent": ua.random}
    try:
        response = requests.get(url=url, headers=headers, timeout=5)
        if response.status_code == 200:
            return response.text
    except Exception:
        pass # 获取源码失败
    # 如果不能访问,则使用ip池的代理ip进行尝试
    proxy_ips = get_from_MongoDB()
    for proxy_ip in proxy_ips:
        proxies = {"http": "http://" + proxy_ip["IP"], "https": "https://" + proxy_ip["IP"]}
        try:
            response_proxy = requests.get(url=url, headers=headers, proxies=proxies, timeout=5)
            if response_proxy.status_code == 200:
                return response_proxy.text
        except Exception:
            pass
    return "" # 若所有代理均不能成功访问,则返回空字符串

# 西刺代理
def xicidaili():
    page = 3 # 要爬取的页数
    ip_list = [] # 临时存储爬取下来的ip
    for p in range(page+1):
        url = "https://www.xicidaili.com/nn/" + str(p+1)
        html = get_html(url)
        if html != "":
            soup = BeautifulSoup(html, 'lxml')
            ips = soup.find_all('tr', class_='odd')
            for i in ips:
                tmp = i.find_all('td')
                ip = tmp[1].text + ':' + tmp[2].text
                ip_list.append(ip)
                print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
            time.sleep(3)
        else:
            print('西刺代理获取失败!')
            break
    for item in ip_list:
        queue_lock.acquire()
        insert_to_MongoDB(item, 10)
        queue_lock.release()

# 快代理
def kuaidaili():
    page = 10 # 要爬取的页数
    ip_list = [] # 临时存储爬取下来的ip
    for p in range(page+1):
        url = "https://www.kuaidaili.com/free/inha/{}/".format(p+1)
        html = get_html(url)
        if html != "":
            soup = BeautifulSoup(html, 'lxml')
            ips = soup.select('td[data-title="IP"]')
            ports = soup.select('td[data-title="PORT"]')
            for i in range(len(ips)):
                ip = ips[i].text + ':' + ports[i].text
                ip_list.append(ip)
                print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
            time.sleep(3)
        else:
            print('快代理获取失败!')
            break
    for item in ip_list:
        queue_lock.acquire()
        insert_to_MongoDB(item, 10)
        queue_lock.release()

3.测试模块

爬取下来的代理IP需要进行验证是否可用。由于需要检测的IP数量比较多,而每一次检测有几秒的等待时间,所以这里采用了aiohttp实现高并发的请求。aiohttp是基于asyncio实现的 HTTP 框架,专门用于异步处理 HTTP 的请求。而asyncio是 Python 3.4 版本引入的标准库,功能是实现单线程并发,使用协同执行 I/O 操作。

# 评分调整
def adjust_score(ip, myType):
    """
    验证成功的直接评分变为100,未验证成功的减1,评分为0的直接删除
    :param ip:
    :param type: 1 加分,-1 减分
    :return:
    """
    if myType == 1:
        query_ip = {"IP": ip}
        new_value = {"$set": {"Score": 100}}
        myCol.update_one(query_ip, new_value)
    elif myType == -1:
        query_ip = {"IP": ip}
        current_score = myCol.find_one(query_ip)["Score"]
        if current_score == 1:
            myCol.delete_one(query_ip)
        else:
            new_value = {"$set": {"Score": current_score-1}}
            myCol.update_one(query_ip, new_value)


async def ip_test(url, headers, proxy):
    test_proxy = "http://" + proxy
    conn = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        try:
            async with session.get(url=url, headers=headers, proxy=test_proxy) as resp:
                if resp.status == 200:
                    adjust_score(proxy, 1)
                else:
                    adjust_score(proxy, -1)
        except:
            adjust_score(proxy, -1)


# ip池测试
def pool_test():
    COUNTS = 100 # 每次测试100个ip
    ua = UserAgent()
    proxy_ips = list(get_from_MongoDB())
    test_url = "http://www.baidu.com" # 可替换为要爬取的网址
    for i in range(0, len(proxy_ips), COUNTS):
        tasks = [ip_test(test_url, {"User-Agent": ua.random}, proxy["IP"]) for proxy in proxy_ips[i:i+COUNTS]]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
        print("共{}个,已测试{}个".format(len(proxy_ips) + 1, COUNTS + i))
        time.sleep(5)

本程序对于代理IP是否可用的评分,是出自崔庆才老师的《Python 3网络爬虫开发实战》中《代理池的维护》这一节的思路。具体实现方法是对于测试可用的IP,直接评分为100;测试不可用的IP,评分减1,如果评分为0的就从数据库中删除;新爬取添加进库的,由于不可用的几率比较高,统一评分为10。从MongoDB中取出代理IP时,也是按评分从高到低的顺序进行提取。

4.主程序

每次爬取和检测完所有代理IP后,间隔5分钟再次执行爬取和检测,不断补充新的代理IP和淘汰不可用的代理IP。

from fake_useragent import UserAgent
import requests
from bs4 import BeautifulSoup
import pymongo
import threading
import aiohttp
import asyncio
import time


if __name__ == '__main__':
    # 连接MongoDB数据库
    myClient = pymongo.MongoClient("mongodb://localhost:27017/")
    myDB = myClient["IPpool"]
    myCol = myDB["pool"]

    # 伪装用户代理
    ua = UserAgent()

    # 间隔5分钟爬取和测试一次
    while 1:
        # 爬取模块线程
        queue_lock = threading.Lock()
        threads = []
        proxy_dict = {"kuaidaili": kuaidaili, "xicidaili": xicidaili}
        crawler_start(proxy_dict)

        # 测试模块线程
        print("代理ip爬取完毕,开始进行测试!")
        pool_test()
        print("测试完毕!")
        time.sleep(300)

其中爬虫多线程的启动封装在crawler_start()函数里

# 爬取代理ip线程启动
def crawler_start(proxy_dict):
    global threads
    for proxy in proxy_dict.keys():
        thread = threading.Thread(target=proxy_dict[proxy], name=proxy)
        thread.start()
        threads.append(thread)
    for t in threads:  # 等待所有线程完成
        t.join()

这个地方要给大家推荐一个很好用的库,fake_useragent,它包含了很多User-Agent,使用UserAgent().random可以给你提供一个随机选择的User-Agent,非常实用方便。本程序所有headers中的User-Agent都是用的这个。

三、完整代码

from fake_useragent import UserAgent
import requests
from bs4 import BeautifulSoup
import pymongo
import threading
import aiohttp
import asyncio
import time

# 存储
def insert_to_MongoDB(ip, score):
    if myCol.find_one({"IP": ip}) == None: #重复ip不存储
        myCol.insert_one({"IP": ip, "Score": score})

# 取出
def get_from_MongoDB(n = 0):
    """
    :param n: 要返回的ip个数,默认为0取出全部
    :return:
    """
    r = myCol.find().sort("Score", -1).limit(n)
    return r


# 获取页面源码
def get_html(url):
    headers = {"User-Agent": ua.random}
    try:
        response = requests.get(url=url, headers=headers, timeout=5)
        if response.status_code == 200:
            return response.text
    except Exception:
        pass # 获取源码失败
    # 如果不能访问,则使用ip池的代理ip进行尝试
    proxy_ips = get_from_MongoDB()
    for proxy_ip in proxy_ips:
        proxies = {"http": "http://" + proxy_ip["IP"], "https": "https://" + proxy_ip["IP"]}
        try:
            response_proxy = requests.get(url=url, headers=headers, proxies=proxies, timeout=5)
            if response_proxy.status_code == 200:
                return response_proxy.text
        except Exception:
            pass
    return "" # 若所有代理均不能成功访问,则返回空字符串

# 西刺代理
def xicidaili():
    page = 3 # 要爬取的页数
    ip_list = [] # 临时存储爬取下来的ip
    for p in range(page+1):
        url = "https://www.xicidaili.com/nn/" + str(p+1)
        html = get_html(url)
        if html != "":
            soup = BeautifulSoup(html, 'lxml')
            ips = soup.find_all('tr', class_='odd')
            for i in ips:
                tmp = i.find_all('td')
                ip = tmp[1].text + ':' + tmp[2].text
                ip_list.append(ip)
                print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
            time.sleep(3)
        else:
            print('西刺代理获取失败!')
            break
    for item in ip_list:
        queue_lock.acquire()
        insert_to_MongoDB(item, 10)
        queue_lock.release()

# 快代理
def kuaidaili():
    page = 10 # 要爬取的页数
    ip_list = [] # 临时存储爬取下来的ip
    for p in range(page+1):
        url = "https://www.kuaidaili.com/free/inha/{}/".format(p+1)
        html = get_html(url)
        if html != "":
            soup = BeautifulSoup(html, 'lxml')
            ips = soup.select('td[data-title="IP"]')
            ports = soup.select('td[data-title="PORT"]')
            for i in range(len(ips)):
                ip = ips[i].text + ':' + ports[i].text
                ip_list.append(ip)
                print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
            time.sleep(3)
        else:
            print('快代理获取失败!')
            break
    for item in ip_list:
        queue_lock.acquire()
        insert_to_MongoDB(item, 10)
        queue_lock.release()


# 评分调整
def adjust_score(ip, myType):
    """
    验证成功的直接评分变为100,未验证成功的减1,评分为0的直接删除
    :param ip:
    :param type: 1 加分,-1 减分
    :return:
    """
    if myType == 1:
        query_ip = {"IP": ip}
        new_value = {"$set": {"Score": 100}}
        myCol.update_one(query_ip, new_value)
    elif myType == -1:
        query_ip = {"IP": ip}
        current_score = myCol.find_one(query_ip)["Score"]
        if current_score == 1:
            myCol.delete_one(query_ip)
        else:
            new_value = {"$set": {"Score": current_score-1}}
            myCol.update_one(query_ip, new_value)
           
async def ip_test(url, headers, proxy):
    test_proxy = "http://" + proxy
    conn = aiohttp.TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        try:
            async with session.get(url=url, headers=headers, proxy=test_proxy) as resp:
                if resp.status == 200:
                    adjust_score(proxy, 1)
                else:
                    adjust_score(proxy, -1)
        except:
            adjust_score(proxy, -1)

# ip池测试
def pool_test():
    COUNTS = 100 # 每次测试100个ip
    ua = UserAgent()
    proxy_ips = list(get_from_MongoDB())
    test_url = "http://www.baidu.com" # 可替换为要爬取的网址
    for i in range(0, len(proxy_ips), COUNTS):
        tasks = [ip_test(test_url, {"User-Agent": ua.random}, proxy["IP"]) for proxy in proxy_ips[i:i+COUNTS]]
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
        print("共{}个,已测试{}个".format(len(proxy_ips) + 1, COUNTS + i))
        time.sleep(5)


# 爬取代理ip线程启动
def crawler_start(proxy_dict):
    global threads
    for proxy in proxy_dict.keys():
        thread = threading.Thread(target=proxy_dict[proxy], name=proxy)
        thread.start()
        threads.append(thread)
    for t in threads:  # 等待所有线程完成
        t.join()


if __name__ == '__main__':
    # 连接MongoDB数据库
    myClient = pymongo.MongoClient("mongodb://localhost:27017/")
    myDB = myClient["IPpool"]
    myCol = myDB["pool"]

    # 伪装用户代理
    ua = UserAgent()

    # 间隔5分钟爬取和测试一次
    while 1:
        # 爬取模块线程
        queue_lock = threading.Lock()
        threads = []
        proxy_dict = {"kuaidaili": kuaidaili, "xicidaili": xicidaili}
        crawler_start(proxy_dict)

        # 测试模块线程
        print("代理ip爬取完毕,开始进行测试!")
        pool_test()
        print("测试完毕!")
        time.sleep(300)

程序跑起来后,其他需要用到代理IP的爬虫程序,就可以从MongoDB数据库里选取代理IP了,实测还是挺好用的。

参考文献:
[1]崔庆才.《Python 3网络爬虫开发实战》.人民邮电出版社,2018.04