爬虫实战:IP代理池
任务描述:
爬取西刺代理、云代理、快代理等网站的免费代理IP,并实现入库、验证和接口功能
任务拆解:
- 发送网络请求并解析数据,获取免费代理IP,并入库
- 利用redis中的有序集合存储数据,可实现根据优先级获取代理IP
- 验证代理IP的有效性,并相应的修改代理IP的优先级
- 创建flask接口
1 组件概览
1)spider: 发送网络请求及解析数据
2)proxies_client: 与redis的交互
3)proxies_downloader: 代理IP的下载和入库
4)proxies_verify: 验证代理IP的有效性
5)proxies_api : flask接口
2 spider组件
1)base.py文件
base.py封装了基本的发送请求的函数以及一些参数配置。是其他爬虫类的基类。为了防止数据入库的时候字段写错,这里还借助了scrapy.Item的功能。
import requests
import scrapy
import logging
from requests.exceptions import HTTPError
import time
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(threadName)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
class ProxyItem(scrapy.Item):
scheme = scrapy.Field()
proxy = scrapy.Field()
class BaseSpider(object):
encoding = 'utf-8'
base_url = ''
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:25.0) Gecko/20100101 Firefox/25.0',
}
page = 1
count = 0
def __init__(self):
msg = "【访问网页】 :爬虫【{}】正在下载网页信息".format(self.__class__)
logging.info(msg)
@property
def start_urls(self):
for i in range(1,self.page+1):
yield self.base_url.format(i)
def get_response(self,url):
time.sleep(0.5)
response = requests.get(url,headers=self.headers)
return response.content.decode(self.encoding)
def parse(self,response):
yield None
@property
def proxies(self):
for url in self.start_urls:
logging.info('【访问网页】 : 正在从网页【{}】下载数据'.format(url))
response = self.get_response(url)
for item in self.parse(response):
yield item
2)kuaidaili.py文件
从https://www.kuaidaili.com/ 中爬取免费代理IP并解析数据
from proxies_pool.spider.base import BaseSpider, ProxyItem
from lxml import etree
page = 5
class KuaiSpider(BaseSpider):
def __init__(self):
super(self.__class__,self).__init__()
self.base_url = 'https://www.kuaidaili.com/free/inha/{}/'
self.page = page
def parse(self, response):
html = etree.HTML(response)
trs = html.xpath('.//*[@id="list"]/table/tbody/tr')
for tr in trs:
item = ProxyItem()
item['scheme'] = tr.xpath('./td[4]/text()')[0].lower()
item['proxy'] = item['scheme'] + "://" + tr.xpath('./td[1]/text()')[0] + ":" + \
tr.xpath('./td[2]/text()')[0]
self.count += 1
yield item
if __name__ == '__main__':
spider = KuaiSpider()
num = 0
for item in spider.proxies:
print(item)
print(num)
3)xicidaili.py文件
从https://www.xicidaili.com/ 中爬取免费代理IP并解析数据
from proxies_pool.spider.base import BaseSpider, ProxyItem
from lxml import etree
page = 2
class XiciSpider(BaseSpider):
def __init__(self):
super(self.__class__, self).__init__()
self.base_url = 'https://www.xicidaili.com/nn/{}'
self.page = page
def parse(self,response):
html = etree.HTML(response)
trs = html.xpath('.//table[@id="ip_list"]/tr')
for tr in trs[1:]:
item = ProxyItem()
item['scheme'] = tr.xpath('./td[6]/text()')[0].lower()
item['proxy'] = item['scheme'] + "://" + tr.xpath('./td[2]/text()')[0] + ":" + tr.xpath('./td[3]/text()')[0]
self.count += 1
yield item
if __name__ == '__main__':
spider = XiciSpider()
for item in spider.proxies:
print(item)
print(spider.count)
4)yundaili.py文件
从http://www.ip3366.net/ 中爬取免费代理IP并解析数据
from proxies_pool.spider.base import BaseSpider, ProxyItem
from lxml import etree
page = 5
class YunSpider(BaseSpider):
def __init__(self):
super(self.__class__, self).__init__()
self.base_url = 'http://www.ip3366.net/free/?stype=1&page={}'
self.page = page
self.encoding = 'gbk'
def parse(self, response):
html = etree.HTML(response)
trs = html.xpath('.//*[@id="list"]/table/tbody/tr')
for tr in trs:
item = ProxyItem()
item['scheme'] = tr.xpath('./td[4]/text()')[0].lower()
item['proxy'] = item['scheme'] + "://" + tr.xpath('./td[1]/text()')[0] + ":" + \
tr.xpath('./td[2]/text()')[0]
self.count += 1
yield item
if __name__ == '__main__':
spider = YunSpider()
for item in spider.proxies:
print(item)
print(spider.count)
3 proxies_client.py组件
与client的交互,实现包括代理IP的增加和删除,score初始化,score修改以及查询等功能。
代理IP的score的修改规则:新增代理IP的score初始化为10,每访问成功一次,score调整为100,每访问失败一次,score减1,当score减到小于0时,则从数据库中删除。
'''代理IP出库和入库的规则(Redis)'''
from redis import StrictRedis
from proxies_pool.setting import REDIS_DB, REDIS_HOST, REDIS_PORT
import logging
import random
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(threadName)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 分数规则设置
max_score = 100
initial_score = 10
min_score = 0
class EmptyPool(Exception):
pass
class ProxiesClient(object):
def __init__(self):
self.client = StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
def add_new(self, key, proxy):
'''
将爬取的代理IP入库。若添加成功返回True,如代理IP已经存在,则返回False
key: http或https,将不同协议的地址存入到数据库不同的键中
proxy: 待添加的代理IP
'''
key = key.lower()
if not self.client.zscore(key, proxy):
self.client.zadd(key,{proxy: initial_score})
msg = "【新增IP】:IP地址为【{}】,并将score初始化为【{}】".format(proxy,initial_score)
logging.info(msg)
return True
else:
return False
def max_score(self, key, proxy):
'''
当代理IP访问成功时,将score设置为最大值。
'''
key = key.lower()
self.client.zadd(key,{proxy:max_score})
msg = "【有效IP】:代理IP【{}】访问成功,将score设置为【{}】".format(proxy,max_score)
logging.info(msg)
def reduce_score(self, key, proxy):
'''
当代理IP访问不成功时,将score在原基础上减1
'''
key = key.lower()
new_score = self.client.zincrby(key,-1,proxy)
if new_score >=0:
msg = "【失效IP】:代理IP【{}】访问失败,score从【{}】减为【{}】".format(proxy,new_score+1,new_score)
else:
self.client.zrem(key,proxy)
msg = "【失效IP】:代理IP【{}】访问失败,score减为【{}】,从数据库中移除".format(proxy,min_score)
logging.info(msg)
def random_proxy(self, key):
'''
从数据库中随机获取一个代理IP地址。
规则:先取出分数为100的代理IP,如果不存在,则从分数排名前20的序列中随机返回一个
'''
target_list = self.client.zrangebyscore(key,max_score,max_score)
if not target_list:
target_list = self.client.zrevrange(key,0,20)
if not target_list:
raise EmptyPool('pool is empty')
return random.choice(target_list)
def get_score(self, proxy):
'''获取proxy的分数'''
key = proxy.split(':',1)[0].lower()
score = self.client.zscore(key,proxy)
return score
@property
def http_proxies(self):
'''
获取全部http协议的代理IP
'''
if self.http_count:
return self.client.zrange('http',0,-1)
else:
raise EmptyPool('当前数据库没有http代理IP')
@property
def https_proxies(self):
'''
获取全部https协议的代理IP
'''
if self.https_count:
return self.client.zrange('https', 0, -1)
else:
raise EmptyPool('当前数据库没有https代理IP')
@property
def all_proxies(self):
'''
获取全部代理IP
'''
if self.all_count:
proxies = self.client.zrange('http',0,-1) + self.client.zrange('https',0,-1)
return proxies
else:
raise EmptyPool('当前数据库没有代理IP')
@property
def http_count(self):
'''获取http代理IP的个数'''
number = self.client.zcard('http')
return number
@property
def https_count(self):
'''获取https代理IP的个数'''
number = self.client.zcard('https')
return number
@property
def all_count(self):
'''获取代理IP的个数'''
return self.http_count+self.https_count
if __name__ == '__main__':
client = ProxiesClient()
print(client.all_proxies)
4 proxies_downloader.py组件
实现代理IP的下载及入库初始化的功能。这里借用了动态导包导入模块。模块的名称都放在setting.py文件中的SPIDER_LIST中。
"""实现代理IP的下载及入库功能"""
from setting import SPIDER_LIST
import importlib
from proxies_pool.proxies_client import ProxiesClient
from threading import Thread,Lock
class ProxiesDownloader(object):
def __init__(self):
self.client = ProxiesClient()
self._original_proxy_num = self.client.all_count
self._count = 0
self._lock = Lock()
@property
def _spiders(self):
spider_list = []
for spider_name in SPIDER_LIST:
module_name = spider_name.rsplit(".", 1)[0]
class_name = spider_name.rsplit('.', 1)[1]
module = importlib.import_module(module_name)
spider = getattr(module, class_name)
spider_list.append(spider())
return spider_list
def download(self):
th_list = []
for spider in self._spiders:
th = Thread(target=self._download,args=(spider,))
th_list.append(th)
th.setDaemon(True)
th.start()
for th in th_list:
th.join()
def _download(self,spider):
try:
for item in spider.proxies:
self.client.add_new(key=item['scheme'], proxy=item['proxy'])
self._lock.acquire()
self._count += 1
self._lock.release()
except Exception as e:
print(e)
@property
def download_count(self):
return self._count
@property
def new_add_count(self):
return self.client.all_count-self._original_proxy_num
if __name__ == '__main__':
downloader = ProxiesDownloader()
downloader.download()
print('*'*100)
print('下载完毕')
print('共爬取了【{}】个代理IP'.format(downloader.download_count))
print('新入库了【{}】个代理IP'.format(downloader.new_add_count))
5 proxies_verify.py组件
验证代理IP的有效性。http的验证地址为“http://www.baidu.com”; https的验证地址为“https://www.baidu.com”
由于验证代理IP比较耗时间,因此用协程运行。
"""验证代理IP的有效性"""
from gevent import pool,monkey
monkey.patch_all()
import requests
import time
from fake_useragent import UserAgent
from proxies_pool.proxies_client import ProxiesClient
from queue import Queue
from requests.exceptions import RequestException
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(threadName)s %(levelname)s : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
class ProxiesVerify(object):
def __init__(self):
self.client = ProxiesClient()
self.user_agent = UserAgent()
self.base_url = '{}://www.baidu.com/'
self.proxies_queue = Queue()
self.pool = pool.Pool()
def _send_response(self):
time.sleep(1)
proxy = self.proxies_queue.get()
scheme = proxy.split(':',1)[0]
proxies = {
scheme : proxy
}
test_url = self.base_url.format(scheme)
headers = {'User_Agent':self.user_agent.random}
try:
response = requests.get(test_url,headers=headers,proxies=proxies,timeout=5)
if response.status_code == 200:
self.client.max_score(scheme,proxy)
else:
self.client.reduce_score(scheme,proxy)
logging.info('【请求异常】:异常原因【请求代码异常】')
except RequestException as e:
msg = '【请求异常】,异常原因【{}】'.format(e.args[0])
logging.info(msg)
self.client.reduce_score(scheme,proxy)
finally:
self.proxies_queue.task_done()
def _proxies_queue(self):
for proxy in self.client.all_proxies:
self.proxies_queue.put(proxy)
def verify(self):
self.pool.apply_async(self._proxies_queue)
time.sleep(3)
for i in range(20):
self.pool.apply_async(self._send_response,callback=self._test_task)
self.proxies_queue.join()
def _test_task(self,item):
self.pool.apply_async(self._send_response,callback=self._test_task)
if __name__ == '__main__':
verify = ProxiesVerify()
verify.verify()
6 proxies_api.py组件
网络接口
"""网络接口访问"""
from flask import Flask
from proxies_pool.setting import FLASK_API_PORT,FLASK_API_HOST
from proxies_pool.proxies_client import ProxiesClient
import json
host = FLASK_API_HOST
port = FLASK_API_PORT
class ProxiesApi(object):
def __init__(self):
self.app = Flask(__name__)
self.client = ProxiesClient()
@self.app.route('/http')
def http_proxies():
return json.dumps(self.client.http_proxies)
@self.app.route('/https')
def https_proxies():
return json.dumps(self.client.https_proxies)
@self.app.route('/http/random')
def http_random():
return json.dumps(self.client.random_proxy('http'))
@self.app.route('/https/random')
def https_random():
return json.dumps(self.client.random_proxy('https'))
@self.app.route('/')
def homepage():
return 'Welcome to proxies pool'
def api(self):
self.app.run(host=host,port=port)
if __name__ == '__main__':
app = ProxiesApi()
app.api()
7 setting.py组件
# redis配置
REDIS_DB = 1
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
# spider_name
SPIDER_LIST = [
'spider.kuaidaili.KuaiSpider',
'spider.xicidaili.XiciSpider',
'spider.yundaili.YunSpider'
]
# flask 接口
FLASK_API_HOST = '127.1.1.1'
FLASK_API_PORT = '8080'
8 结果展示
1 下载代理IP并入库
2 验证代理IP有效性
3 网络接口