
Crawler in Practice: Building an IP Proxy Pool


Task description:

Crawl free proxy IPs from sites such as Xici Proxy, Yun Proxy (ip3366) and Kuaidaili, then store them, verify them, and expose them through an API.

Task breakdown:

  1. Send requests, parse the responses, extract the free proxy IPs and store them
  2. Keep the proxies in a Redis sorted set so they can be fetched by priority (see the sketch after this list)
  3. Verify that each proxy IP still works and adjust its priority accordingly
  4. Expose the pool through a Flask API
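
The priority handling in step 2 boils down to a Redis sorted set keyed by score. A minimal sketch (assuming a local Redis instance on db 1; the proxy address is a made-up example, and the scores follow the rules described in section 3):

from redis import StrictRedis

r = StrictRedis(host='localhost', port=6379, db=1, decode_responses=True)

# a new proxy enters the sorted set with a low initial score
r.zadd('http', {'http://1.2.3.4:8080': 10})

# a successful check promotes it to the maximum score
r.zadd('http', {'http://1.2.3.4:8080': 100})

# a failed check lowers the score by 1
r.zincrby('http', -1, 'http://1.2.3.4:8080')

# highest-priority proxies come out first
print(r.zrevrange('http', 0, 4, withscores=True))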

1 Component overview

1) spider: sends the requests and parses the responses
2) proxies_client: handles the interaction with Redis
3) proxies_downloader: downloads proxy IPs and stores them in Redis
4) proxies_verify: verifies that the stored proxy IPs still work
5) proxies_api: the Flask API

2 The spider component

1) base.py
base.py wraps the basic request-sending logic and some shared configuration, and serves as the base class for the other spider classes. To keep field names from being mistyped when data is stored, it also relies on scrapy.Item.

import requests
import scrapy
import logging
import time

logging.basicConfig(level=logging.INFO,
                format='%(asctime)s %(threadName)s %(levelname)s %(message)s',
                datefmt='%Y-%m-%d %H:%M:%S')

class ProxyItem(scrapy.Item):
    scheme = scrapy.Field()
    proxy = scrapy.Field()

class BaseSpider(object):
    encoding = 'utf-8'
    base_url = ''
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:25.0) Gecko/20100101 Firefox/25.0', 
    }
    page = 1
    count = 0

    def __init__(self):
        msg = "【访问网页】 :爬虫【{}】正在下载网页信息".format(self.__class__)
        logging.info(msg)

    @property
    def start_urls(self):
        for i in range(1,self.page+1):
            yield self.base_url.format(i)

    def get_response(self,url):
        time.sleep(0.5)
        response = requests.get(url,headers=self.headers)
        return response.content.decode(self.encoding)

    def parse(self,response):
        yield None

    @property
    def proxies(self):
        for url in self.start_urls:
            logging.info('[fetching] downloading data from [{}]'.format(url))
            response = self.get_response(url)
            for item in self.parse(response):
                yield item
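
As a quick illustration of why scrapy.Item is used here: assigning to a field that was not declared on ProxyItem raises a KeyError right away, so a mistyped field name fails fast instead of silently storing bad data (a small sketch; the address is a made-up example):

item = ProxyItem()
item['scheme'] = 'http'                  # declared field: fine
item['proxy'] = 'http://1.2.3.4:8080'    # declared field: fine

try:
    item['schema'] = 'http'              # typo: not a declared field
except KeyError as e:
    print(e)                             # ProxyItem does not support field: schema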

2) kuaidaili.py
Crawls free proxy IPs from https://www.kuaidaili.com/ and parses the data.

from proxies_pool.spider.base import BaseSpider, ProxyItem
from lxml import etree

page = 5

class KuaiSpider(BaseSpider):
    def __init__(self):
        super(self.__class__,self).__init__()
        self.base_url = 'https://www.kuaidaili.com/free/inha/{}/'
        self.page = page
        
    def parse(self, response):
        html = etree.HTML(response)
        trs = html.xpath('.//*[@id="list"]/table/tbody/tr')
        for tr in trs:
            item = ProxyItem()
            item['scheme'] = tr.xpath('./td[4]/text()')[0].lower()
            item['proxy'] = item['scheme'] + "://" + tr.xpath('./td[1]/text()')[0] + ":" + \
                            tr.xpath('./td[2]/text()')[0]
            self.count += 1
            yield item

if __name__ == '__main__':
    spider = KuaiSpider()
    for item in spider.proxies:
        print(item)
    print(spider.count)

3) xicidaili.py
Crawls free proxy IPs from https://www.xicidaili.com/ and parses the data.

from proxies_pool.spider.base import BaseSpider, ProxyItem
from lxml import etree

page = 2

class XiciSpider(BaseSpider):
    def __init__(self):
        super(self.__class__, self).__init__()
        self.base_url = 'https://www.xicidaili.com/nn/{}'
        self.page = page

    def parse(self,response):
        html = etree.HTML(response)
        trs = html.xpath('.//table[@id="ip_list"]/tr')
        for tr in trs[1:]:
            item = ProxyItem()
            item['scheme'] = tr.xpath('./td[6]/text()')[0].lower()
            item['proxy'] = item['scheme'] + "://" + tr.xpath('./td[2]/text()')[0] + ":" + tr.xpath('./td[3]/text()')[0]
            self.count += 1
            yield item

if __name__ == '__main__':
    spider = XiciSpider()
    for item in spider.proxies:
        print(item)
    print(spider.count)

4) yundaili.py
Crawls free proxy IPs from http://www.ip3366.net/ and parses the data.

from proxies_pool.spider.base import BaseSpider, ProxyItem
from lxml import etree

page = 5

class YunSpider(BaseSpider):
    def __init__(self):
        super(self.__class__, self).__init__()
        self.base_url = 'http://www.ip3366.net/free/?stype=1&page={}'
        self.page = page
        self.encoding = 'gbk'

    def parse(self, response):
        html = etree.HTML(response)
        trs = html.xpath('.//*[@id="list"]/table/tbody/tr')
        for tr in trs:
            item = ProxyItem()
            item['scheme'] = tr.xpath('./td[4]/text()')[0].lower()
            item['proxy'] = item['scheme'] + "://" + tr.xpath('./td[1]/text()')[0] + ":" + \
                            tr.xpath('./td[2]/text()')[0]
            self.count += 1
            yield item

if __name__ == '__main__':
    spider = YunSpider()
    for item in spider.proxies:
        print(item)
    print(spider.count)

3 The proxies_client.py component

Handles the interaction with Redis: adding and removing proxy IPs, initializing and adjusting scores, and querying the pool.

Score rules: a newly added proxy IP gets an initial score of 10. Every successful request sets the score to 100; every failed request lowers it by 1. When the score drops below 0, the proxy is removed from the database.

'''Rules for storing and retrieving proxy IPs (Redis)'''
from redis import StrictRedis
from proxies_pool.setting import REDIS_DB, REDIS_HOST, REDIS_PORT
import logging
import random

logging.basicConfig(level=logging.INFO,
                format='%(asctime)s %(threadName)s %(levelname)s %(message)s',
                datefmt='%Y-%m-%d %H:%M:%S')

# score rule settings
max_score = 100
initial_score = 10
min_score = 0

class EmptyPool(Exception):
    pass

class ProxiesClient(object):
    def __init__(self):
        self.client = StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)

    def add_new(self, key, proxy):
        '''
        Store a crawled proxy IP. Returns True if the proxy was added,
        or False if it already exists.
        key: 'http' or 'https'; proxies are stored under the key of their scheme
        proxy: the proxy IP to add
        '''
        key = key.lower()
        if not self.client.zscore(key, proxy):
            self.client.zadd(key, {proxy: initial_score})
            msg = "[new IP] added proxy [{}] with an initial score of [{}]".format(proxy, initial_score)
            logging.info(msg)
            return True
        else:
            return False

    def max_score(self, key, proxy):
        '''
        Set the score to the maximum when a request through the proxy succeeds.
        '''
        key = key.lower()
        self.client.zadd(key, {proxy: max_score})
        msg = "[valid IP] proxy [{}] succeeded, score set to [{}]".format(proxy, max_score)
        logging.info(msg)

    def reduce_score(self, key, proxy):
        '''
        Lower the score by 1 when a request through the proxy fails.
        '''
        key = key.lower()
        new_score = self.client.zincrby(key, -1, proxy)
        if new_score >= 0:
            msg = "[stale IP] proxy [{}] failed, score lowered from [{}] to [{}]".format(proxy, new_score + 1, new_score)
        else:
            self.client.zrem(key, proxy)
            msg = "[stale IP] proxy [{}] failed, score fell below [{}], removed from the database".format(proxy, min_score)
        logging.info(msg)

    def random_proxy(self, key):
        '''
        Return a random proxy IP from the database.
        Rule: prefer proxies with the maximum score of 100; if there are none,
        pick one at random from the top 20 by score.
        '''
        target_list = self.client.zrangebyscore(key, max_score, max_score)
        if not target_list:
            target_list = self.client.zrevrange(key, 0, 19)
        if not target_list:
            raise EmptyPool('pool is empty')
        return random.choice(target_list)

    def get_score(self, proxy):
        '''Return the score of a proxy'''
        key = proxy.split(':', 1)[0].lower()
        score = self.client.zscore(key, proxy)
        return score

    @property
    def http_proxies(self):
        '''
        Return all http proxy IPs
        '''
        if self.http_count:
            return self.client.zrange('http', 0, -1)
        else:
            raise EmptyPool('no http proxy IPs in the database')

    @property
    def https_proxies(self):
        '''
        Return all https proxy IPs
        '''
        if self.https_count:
            return self.client.zrange('https', 0, -1)
        else:
            raise EmptyPool('no https proxy IPs in the database')

    @property
    def all_proxies(self):
        '''
        Return all proxy IPs
        '''
        if self.all_count:
            proxies = self.client.zrange('http', 0, -1) + self.client.zrange('https', 0, -1)
            return proxies
        else:
            raise EmptyPool('no proxy IPs in the database')

    @property
    def http_count(self):
        '''Number of http proxy IPs'''
        number = self.client.zcard('http')
        return number

    @property
    def https_count(self):
        '''Number of https proxy IPs'''
        number = self.client.zcard('https')
        return number

    @property
    def all_count(self):
        '''Total number of proxy IPs'''
        return self.http_count + self.https_count

if __name__  ==  '__main__':
    client = ProxiesClient()
    print(client.all_proxies)
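
A brief usage sketch of the client (assuming Redis is running with the settings from section 7; the proxy address is a made-up example):

from proxies_pool.proxies_client import ProxiesClient

client = ProxiesClient()

# store a new proxy with the initial score of 10
client.add_new('http', 'http://1.2.3.4:8080')

# simulate one failed check: the score drops from 10 to 9
client.reduce_score('http', 'http://1.2.3.4:8080')
print(client.get_score('http://1.2.3.4:8080'))   # 9.0

# pick a proxy to use, preferring fully verified ones
print(client.random_proxy('http'))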

4 The proxies_downloader.py component

Implements downloading proxy IPs and storing them in Redis. The spider modules are loaded with dynamic imports (importlib); their dotted paths are listed in SPIDER_LIST in setting.py.

"""实现代理IP的下载及入库功能"""
from setting import SPIDER_LIST
import importlib
from proxies_pool.proxies_client import ProxiesClient
from threading import Thread,Lock

class ProxiesDownloader(object):
    def __init__(self):
        self.client = ProxiesClient()
        self._original_proxy_num = self.client.all_count
        self._count = 0
        self._lock = Lock()

    @property
    def _spiders(self):
        spider_list = []
        for spider_name in SPIDER_LIST:
            module_name = spider_name.rsplit(".", 1)[0]
            class_name = spider_name.rsplit('.', 1)[1]
            module = importlib.import_module(module_name)
            spider = getattr(module, class_name)
            spider_list.append(spider())
        return spider_list

    def download(self):
        th_list = []
        for spider in self._spiders:
            th = Thread(target=self._download,args=(spider,))
            th_list.append(th)
            th.daemon = True
            th.start()
        for th in th_list:
            th.join()

    def _download(self,spider):
        try:
            for item in spider.proxies:
                self.client.add_new(key=item['scheme'], proxy=item['proxy'])
                self._lock.acquire()
                self._count += 1
                self._lock.release()
        except Exception as e:
            print(e)

    @property
    def download_count(self):
        return self._count

    @property
    def new_add_count(self):
        return self.client.all_count-self._original_proxy_num

if __name__ == '__main__':
    downloader = ProxiesDownloader()
    downloader.download()
    print('*' * 100)
    print('Download finished')
    print('Crawled [{}] proxy IPs in total'.format(downloader.download_count))
    print('Added [{}] new proxy IPs to the database'.format(downloader.new_add_count))

5 The proxies_verify.py component

Verifies that the proxy IPs still work. HTTP proxies are tested against "http://www.baidu.com"; HTTPS proxies against "https://www.baidu.com".

Since verifying proxies is time-consuming, the checks run on coroutines (gevent).
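
For reference, this is the shape of the proxies mapping that each verification request ends up using: the dictionary is keyed by the proxy's scheme, so an https proxy is only applied to https URLs (a small sketch; the address is a made-up example):

import requests

proxy = 'https://1.2.3.4:8080'
scheme = proxy.split(':', 1)[0]          # 'https'

# requests routes the request through the proxy whose key matches the URL's scheme
response = requests.get('https://www.baidu.com/',
                        proxies={scheme: proxy},
                        timeout=5)
print(response.status_code)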

"""验证代理IP的有效性"""
from gevent import pool,monkey
monkey.patch_all()
import requests
import time
from fake_useragent import UserAgent
from proxies_pool.proxies_client import ProxiesClient
from queue import Queue
from requests.exceptions import RequestException
import logging

logging.basicConfig(level=logging.INFO,
                format='%(asctime)s %(threadName)s %(levelname)s : %(message)s',
                datefmt='%Y-%m-%d %H:%M:%S')

class ProxiesVerify(object):
    def __init__(self):
        self.client = ProxiesClient()
        self.user_agent = UserAgent()
        self.base_url = '{}://www.baidu.com/'
        self.proxies_queue = Queue()
        self.pool = pool.Pool()

    def _send_response(self):
        time.sleep(1)
        proxy = self.proxies_queue.get()
        scheme = proxy.split(':',1)[0]
        proxies = {
            scheme : proxy
        }
        test_url = self.base_url.format(scheme)
        headers = {'User-Agent': self.user_agent.random}
        try:
            response = requests.get(test_url,headers=headers,proxies=proxies,timeout=5)
            if response.status_code == 200:
                self.client.max_score(scheme,proxy)
            else:
                self.client.reduce_score(scheme,proxy)
                logging.info('[request error] reason: [unexpected status code]')
        except RequestException as e:
            msg = '[request error] reason: [{}]'.format(e.args[0])
            logging.info(msg)
            self.client.reduce_score(scheme,proxy)
        finally:
            self.proxies_queue.task_done()

    def _proxies_queue(self):
        for proxy in self.client.all_proxies:
            self.proxies_queue.put(proxy)

    def verify(self):
        self.pool.apply_async(self._proxies_queue)
        time.sleep(3)
        for i in range(20):
            self.pool.apply_async(self._send_response,callback=self._test_task)
        self.proxies_queue.join()

    def _test_task(self,item):
        self.pool.apply_async(self._send_response,callback=self._test_task)

if __name__ == '__main__':
    verify = ProxiesVerify()
    verify.verify()

6 The proxies_api.py component

The HTTP interface of the proxy pool.

"""网络接口访问"""
from flask import Flask
from proxies_pool.setting import FLASK_API_PORT,FLASK_API_HOST
from proxies_pool.proxies_client import ProxiesClient
import json

host = FLASK_API_HOST
port = FLASK_API_PORT

class ProxiesApi(object):
    def __init__(self):
        self.app = Flask(__name__)
        self.client = ProxiesClient()

        @self.app.route('/http')
        def http_proxies():
            return json.dumps(self.client.http_proxies)

        @self.app.route('/https')
        def https_proxies():
            return json.dumps(self.client.https_proxies)

        @self.app.route('/http/random')
        def http_random():
            return json.dumps(self.client.random_proxy('http'))

        @self.app.route('/https/random')
        def https_random():
            return json.dumps(self.client.random_proxy('https'))

        @self.app.route('/')
        def homepage():
            return 'Welcome to proxies pool'

    def api(self):
        self.app.run(host=host,port=port)

if __name__ == '__main__':
    app = ProxiesApi()
    app.api()
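
Once the API is running, other programs can fetch a proxy over HTTP. A usage sketch (assuming the host and port configured in section 7):

import json
import requests

# ask the pool for one random, preferably verified, https proxy
resp = requests.get('http://127.0.0.1:8080/https/random')
proxy = json.loads(resp.text)

# use the returned proxy for an actual request
page = requests.get('https://www.baidu.com/', proxies={'https': proxy}, timeout=5)
print(page.status_code)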

7 The setting.py component

# Redis configuration
REDIS_DB = 1
REDIS_HOST = 'localhost'
REDIS_PORT = 6379


# spider classes (dotted paths)
SPIDER_LIST = [
    'proxies_pool.spider.kuaidaili.KuaiSpider',
    'proxies_pool.spider.xicidaili.XiciSpider',
    'proxies_pool.spider.yundaili.YunSpider'
]

# Flask API
FLASK_API_HOST = '127.0.0.1'
FLASK_API_PORT = 8080

8 Results

1 Downloading proxy IPs and storing them


2 Verifying proxy IPs
3 The HTTP API