简易代理IP池的搭建
一、导论
这段时间在学习代理的相关知识。在爬虫的过程中,经常会遇到目标网站对同一IP的访问频率设置了限制,而设置代理是应对反爬虫的重要有效手段。目前互联网上也有不少免费的代理网站,比如西刺代理、快代理、66ip等等。但是笔者在使用过程中,发现几个问题:一是免费代理确实可用率太低;二是免费代理网站本身也部署了反爬虫的措施,并且对于部分网站比如西刺代理,使用该网站自身的高匿代理也无法进行访问,应该是设置了屏蔽。于是乎笔者谋生了自己搭建一个代理IP池的想法。
二、程序结构
本程序属于简易的IP代理池搭建,适合小规模的爬虫。整个程序分为三个模块:爬取代理模块、存储模块、测试模块。
1.存储模块
本程序存储代理IP的数据库选用的MongoDB,MongoDB是一个非关系型数据库,存取速度都很快。Python中操作MongoDB的第三方模块是pymongo。存储模块包括了存储IP和取出IP两个函数。
# 存储
def insert_to_MongoDB(ip, score):
if myCol.find_one({"IP": ip}) == None: #重复ip不存储
myCol.insert_one({"IP": ip, "Score": score})
# 取出
def get_from_MongoDB(n = 0):
"""
:param n: 要返回的ip个数,默认为0取出全部
:return:
"""
r = myCol.find().sort("Score", -1).limit(n)
return r
2.爬取代理模块
本模块是整个程序的核心,用于在多个代理网站爬取代理IP。这里只有西刺代理和快代理两个代理网站,可以根据需求扩展更多的代理网站。
在获取页面源代码函数(get_html)函数中可以看到,对于访问免费代理网站也被封IP的情况,采取了使用IP池已有的高匿代理进行尝试的措施。因为现有的免费代理网站,根据笔者的观察基本上都对同一IP的访问次数进行了限制,采取这种措施也是提高了代理IP的获取成功率。
各代理网站的爬取,是使用多线程并发运行,在后续的主程序中将可以看到。
# 获取页面源码
def get_html(url):
headers = {"User-Agent": ua.random}
try:
response = requests.get(url=url, headers=headers, timeout=5)
if response.status_code == 200:
return response.text
except Exception:
pass # 获取源码失败
# 如果不能访问,则使用ip池的代理ip进行尝试
proxy_ips = get_from_MongoDB()
for proxy_ip in proxy_ips:
proxies = {"http": "http://" + proxy_ip["IP"], "https": "https://" + proxy_ip["IP"]}
try:
response_proxy = requests.get(url=url, headers=headers, proxies=proxies, timeout=5)
if response_proxy.status_code == 200:
return response_proxy.text
except Exception:
pass
return "" # 若所有代理均不能成功访问,则返回空字符串
# 西刺代理
def xicidaili():
page = 3 # 要爬取的页数
ip_list = [] # 临时存储爬取下来的ip
for p in range(page+1):
url = "https://www.xicidaili.com/nn/" + str(p+1)
html = get_html(url)
if html != "":
soup = BeautifulSoup(html, 'lxml')
ips = soup.find_all('tr', class_='odd')
for i in ips:
tmp = i.find_all('td')
ip = tmp[1].text + ':' + tmp[2].text
ip_list.append(ip)
print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
time.sleep(3)
else:
print('西刺代理获取失败!')
break
for item in ip_list:
queue_lock.acquire()
insert_to_MongoDB(item, 10)
queue_lock.release()
# 快代理
def kuaidaili():
page = 10 # 要爬取的页数
ip_list = [] # 临时存储爬取下来的ip
for p in range(page+1):
url = "https://www.kuaidaili.com/free/inha/{}/".format(p+1)
html = get_html(url)
if html != "":
soup = BeautifulSoup(html, 'lxml')
ips = soup.select('td[data-title="IP"]')
ports = soup.select('td[data-title="PORT"]')
for i in range(len(ips)):
ip = ips[i].text + ':' + ports[i].text
ip_list.append(ip)
print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
time.sleep(3)
else:
print('快代理获取失败!')
break
for item in ip_list:
queue_lock.acquire()
insert_to_MongoDB(item, 10)
queue_lock.release()
3.测试模块
爬取下来的代理IP需要进行验证是否可用。由于需要检测的IP数量比较多,而每一次检测有几秒的等待时间,所以这里采用了aiohttp实现高并发的请求。aiohttp是基于asyncio实现的 HTTP 框架,专门用于异步处理 HTTP 的请求。而asyncio是 Python 3.4 版本引入的标准库,功能是实现单线程并发,使用协同执行 I/O 操作。
# 评分调整
def adjust_score(ip, myType):
"""
验证成功的直接评分变为100,未验证成功的减1,评分为0的直接删除
:param ip:
:param type: 1 加分,-1 减分
:return:
"""
if myType == 1:
query_ip = {"IP": ip}
new_value = {"$set": {"Score": 100}}
myCol.update_one(query_ip, new_value)
elif myType == -1:
query_ip = {"IP": ip}
current_score = myCol.find_one(query_ip)["Score"]
if current_score == 1:
myCol.delete_one(query_ip)
else:
new_value = {"$set": {"Score": current_score-1}}
myCol.update_one(query_ip, new_value)
async def ip_test(url, headers, proxy):
test_proxy = "http://" + proxy
conn = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
async with session.get(url=url, headers=headers, proxy=test_proxy) as resp:
if resp.status == 200:
adjust_score(proxy, 1)
else:
adjust_score(proxy, -1)
except:
adjust_score(proxy, -1)
# ip池测试
def pool_test():
COUNTS = 100 # 每次测试100个ip
ua = UserAgent()
proxy_ips = list(get_from_MongoDB())
test_url = "http://www.baidu.com" # 可替换为要爬取的网址
for i in range(0, len(proxy_ips), COUNTS):
tasks = [ip_test(test_url, {"User-Agent": ua.random}, proxy["IP"]) for proxy in proxy_ips[i:i+COUNTS]]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("共{}个,已测试{}个".format(len(proxy_ips) + 1, COUNTS + i))
time.sleep(5)
本程序对于代理IP是否可用的评分,是出自崔庆才老师的《Python 3网络爬虫开发实战》中《代理池的维护》这一节的思路。具体实现方法是对于测试可用的IP,直接评分为100;测试不可用的IP,评分减1,如果评分为0的就从数据库中删除;新爬取添加进库的,由于不可用的几率比较高,统一评分为10。从MongoDB中取出代理IP时,也是按评分从高到低的顺序进行提取。
4.主程序
每次爬取和检测完所有代理IP后,间隔5分钟再次执行爬取和检测,不断补充新的代理IP和淘汰不可用的代理IP。
from fake_useragent import UserAgent
import requests
from bs4 import BeautifulSoup
import pymongo
import threading
import aiohttp
import asyncio
import time
if __name__ == '__main__':
# 连接MongoDB数据库
myClient = pymongo.MongoClient("mongodb://localhost:27017/")
myDB = myClient["IPpool"]
myCol = myDB["pool"]
# 伪装用户代理
ua = UserAgent()
# 间隔5分钟爬取和测试一次
while 1:
# 爬取模块线程
queue_lock = threading.Lock()
threads = []
proxy_dict = {"kuaidaili": kuaidaili, "xicidaili": xicidaili}
crawler_start(proxy_dict)
# 测试模块线程
print("代理ip爬取完毕,开始进行测试!")
pool_test()
print("测试完毕!")
time.sleep(300)
其中爬虫多线程的启动封装在crawler_start()函数里
# 爬取代理ip线程启动
def crawler_start(proxy_dict):
global threads
for proxy in proxy_dict.keys():
thread = threading.Thread(target=proxy_dict[proxy], name=proxy)
thread.start()
threads.append(thread)
for t in threads: # 等待所有线程完成
t.join()
这个地方要给大家推荐一个很好用的库,fake_useragent,它包含了很多User-Agent,使用UserAgent().random可以给你提供一个随机选择的User-Agent,非常实用方便。本程序所有headers中的User-Agent都是用的这个。
三、完整代码
from fake_useragent import UserAgent
import requests
from bs4 import BeautifulSoup
import pymongo
import threading
import aiohttp
import asyncio
import time
# 存储
def insert_to_MongoDB(ip, score):
if myCol.find_one({"IP": ip}) == None: #重复ip不存储
myCol.insert_one({"IP": ip, "Score": score})
# 取出
def get_from_MongoDB(n = 0):
"""
:param n: 要返回的ip个数,默认为0取出全部
:return:
"""
r = myCol.find().sort("Score", -1).limit(n)
return r
# 获取页面源码
def get_html(url):
headers = {"User-Agent": ua.random}
try:
response = requests.get(url=url, headers=headers, timeout=5)
if response.status_code == 200:
return response.text
except Exception:
pass # 获取源码失败
# 如果不能访问,则使用ip池的代理ip进行尝试
proxy_ips = get_from_MongoDB()
for proxy_ip in proxy_ips:
proxies = {"http": "http://" + proxy_ip["IP"], "https": "https://" + proxy_ip["IP"]}
try:
response_proxy = requests.get(url=url, headers=headers, proxies=proxies, timeout=5)
if response_proxy.status_code == 200:
return response_proxy.text
except Exception:
pass
return "" # 若所有代理均不能成功访问,则返回空字符串
# 西刺代理
def xicidaili():
page = 3 # 要爬取的页数
ip_list = [] # 临时存储爬取下来的ip
for p in range(page+1):
url = "https://www.xicidaili.com/nn/" + str(p+1)
html = get_html(url)
if html != "":
soup = BeautifulSoup(html, 'lxml')
ips = soup.find_all('tr', class_='odd')
for i in ips:
tmp = i.find_all('td')
ip = tmp[1].text + ':' + tmp[2].text
ip_list.append(ip)
print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
time.sleep(3)
else:
print('西刺代理获取失败!')
break
for item in ip_list:
queue_lock.acquire()
insert_to_MongoDB(item, 10)
queue_lock.release()
# 快代理
def kuaidaili():
page = 10 # 要爬取的页数
ip_list = [] # 临时存储爬取下来的ip
for p in range(page+1):
url = "https://www.kuaidaili.com/free/inha/{}/".format(p+1)
html = get_html(url)
if html != "":
soup = BeautifulSoup(html, 'lxml')
ips = soup.select('td[data-title="IP"]')
ports = soup.select('td[data-title="PORT"]')
for i in range(len(ips)):
ip = ips[i].text + ':' + ports[i].text
ip_list.append(ip)
print('线程{}爬取ip:{}'.format(threading.current_thread().name, ip))
time.sleep(3)
else:
print('快代理获取失败!')
break
for item in ip_list:
queue_lock.acquire()
insert_to_MongoDB(item, 10)
queue_lock.release()
# 评分调整
def adjust_score(ip, myType):
"""
验证成功的直接评分变为100,未验证成功的减1,评分为0的直接删除
:param ip:
:param type: 1 加分,-1 减分
:return:
"""
if myType == 1:
query_ip = {"IP": ip}
new_value = {"$set": {"Score": 100}}
myCol.update_one(query_ip, new_value)
elif myType == -1:
query_ip = {"IP": ip}
current_score = myCol.find_one(query_ip)["Score"]
if current_score == 1:
myCol.delete_one(query_ip)
else:
new_value = {"$set": {"Score": current_score-1}}
myCol.update_one(query_ip, new_value)
async def ip_test(url, headers, proxy):
test_proxy = "http://" + proxy
conn = aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
async with session.get(url=url, headers=headers, proxy=test_proxy) as resp:
if resp.status == 200:
adjust_score(proxy, 1)
else:
adjust_score(proxy, -1)
except:
adjust_score(proxy, -1)
# ip池测试
def pool_test():
COUNTS = 100 # 每次测试100个ip
ua = UserAgent()
proxy_ips = list(get_from_MongoDB())
test_url = "http://www.baidu.com" # 可替换为要爬取的网址
for i in range(0, len(proxy_ips), COUNTS):
tasks = [ip_test(test_url, {"User-Agent": ua.random}, proxy["IP"]) for proxy in proxy_ips[i:i+COUNTS]]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print("共{}个,已测试{}个".format(len(proxy_ips) + 1, COUNTS + i))
time.sleep(5)
# 爬取代理ip线程启动
def crawler_start(proxy_dict):
global threads
for proxy in proxy_dict.keys():
thread = threading.Thread(target=proxy_dict[proxy], name=proxy)
thread.start()
threads.append(thread)
for t in threads: # 等待所有线程完成
t.join()
if __name__ == '__main__':
# 连接MongoDB数据库
myClient = pymongo.MongoClient("mongodb://localhost:27017/")
myDB = myClient["IPpool"]
myCol = myDB["pool"]
# 伪装用户代理
ua = UserAgent()
# 间隔5分钟爬取和测试一次
while 1:
# 爬取模块线程
queue_lock = threading.Lock()
threads = []
proxy_dict = {"kuaidaili": kuaidaili, "xicidaili": xicidaili}
crawler_start(proxy_dict)
# 测试模块线程
print("代理ip爬取完毕,开始进行测试!")
pool_test()
print("测试完毕!")
time.sleep(300)
程序跑起来后,其他需要用到代理IP的爬虫程序,就可以从MongoDB数据库里选取代理IP了,实测还是挺好用的。
参考文献:
[1]崔庆才.《Python 3网络爬虫开发实战》.人民邮电出版社,2018.04
上一篇: Java常用工具类练习题
下一篇: Python入门小案例