欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python如何搭建代理池

程序员文章站 2023-11-29 11:31:46
由于爬虫工作往往有大量数据需要爬取,便需要大量的备用IP更换,这时就需要用到代理IP池。将大量可以用于更换的代理IP汇聚要一起,便于管理和调用,IP池就这样产生了。IP池有一下特征:它里面的IP是持续补充的,会有源源不断的新的IP被加入到池子中。它里面的IP是有生命周期的,一但失效就会被清除出 IP池;它里面的IP是可以被任意取出,方便爬虫用户使用的。免费ip其实是不适合搭建代理池的,因为数量上面不具备优势,而且很耗时,大家需要用时间来一一排查,要做就要做好,建议大家还是选择专业一点的提供商。代理池主要...

由于爬虫工作往往有大量数据需要爬取,便需要大量的备用IP更换,这时就需要用到代理IP池。将大量可以用于更换的代理IP汇聚要一起,便于管理和调用,IP池就这样产生了。IP池有一下特征:它里面的IP是持续补充的,会有源源不断的新的IP被加入到池子中。它里面的IP是有生命周期的,一但失效就会被清除出 IP池;它里面的IP是可以被任意取出,方便爬虫用户使用的。

免费ip其实是不适合搭建代理池的,因为数量上面不具备优势,而且很耗时,大家需要用时间来一一排查,要做就要做好,建议大家还是选择专业一点的提供商。

代理池主要分四个部分:IP抓取,IP有效性检测,IP存储,webAPI提供。示意图如下:
Python如何搭建代理池
数据库选择redis,它的有序集合可以给插入值赋于一个"分数"用于排序存取值等等,我们可以利用分数绑定ip的可用性进行提取及检测管理。Redis有序集合的基本操作

  • 爬取模块

该模块主要是采用爬虫去付费api或者代理网站来获取ip,加入数据库并设置初始分数50分。
在这里我采用了aiohttp异步采集,提高效率。所有爬虫基于父类开发。

class BaseCrawler(object):
    urls = []
    # new_loop = asyncio.new_event_loop()
    # asyncio.set_event_loop(new_loop)
    LOOP = asyncio.get_event_loop()
    asyncio.set_event_loop(LOOP)

    @retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)
    async def _get_page(self, url):
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(
                        url,  timeout=10
                ) as resp:
                    # print(dir(resp.content),resp.content)
                    return await resp.text()
            except:
                return ''

    def crawl(self):
        """
        crawl main method
        """
        for url in self.urls:
            app_logger.info(f'fetching {url}')
            # html = self.LOOP.run_until_complete(asyncio.gather(self._get_page(url)))
            html = self.LOOP.run_until_complete(self._get_page(url))
            # print('html', html)
            for proxy in self.parse(html):
                # app_logger.info(f'fetched proxy {proxy.string()} from {url}')
                yield proxy

注:一般采取付费的代理平台,IP稳定寿命长。

  • 测试模块

先明确策略,代理可用则设置为100分,不可用则减一定分值,分值减为0后视为完全不可用且从数据库中删除,当需要使用代理时从redis中优先取出100分ip中的随机一个,若无100分的则取最高分中的一个。检测也采用异步模式,最开始制定测试网址(默认为百度)。

class Tester(object):
    """
    tester for testing proxies in queue
    """

    def __init__(self):
        self.redis = RedisClient()
        self.loop = asyncio.get_event_loop()

    async def test(self, proxy):
        """
        test single proxy
        :param proxy:
        :return:
        """
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            try:
                app_logger.debug(f'testing {proxy}')
                async with session.get(TEST_URL, proxy=f'http://{proxy}', timeout=TEST_TIMEOUT,
                                       allow_redirects=False) as response:
                    if response.status in TEST_VALID_STATUS:
                        self.redis.set_max(proxy)
                        app_logger.debug(f'proxy {proxy} is valid, set max score')
                    else:
                        self.redis.decrease(proxy)
                        app_logger.debug(f'proxy {proxy} is invalid, decrease score')
            except EXCEPTIONS:
                self.redis.decrease(proxy)
                app_logger.debug(f'proxy {proxy} is invalid, decrease score')

    def run(self):
        """
        test main method
        :return:
        """
        # event loop of aiohttp
        app_logger.info('stating tester...')
        count = self.redis.count()
        app_logger.debug(f'{count} proxies to test')
        for i in range(0, count, TEST_BATCH):
            # start end end offset
            start, end = i, min(i + TEST_BATCH, count)
            app_logger.debug(f'testing proxies from {start} to {end} indices')
            proxies = self.redis.batch(start, end)
            tasks = [self.test(proxy) for proxy in proxies]
            # run tasks using event loop
            self.loop.run_until_complete(asyncio.wait(tasks))
  • API接口模块

该模块以Flask搭建简易web服务,提供随机获取有效IP及总IP数的接口。

__all__ = ['app']

app = Flask(__name__)


def get_conn():
    """
    get redis client object
    :return:
    """
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis


@app.route('/')
def index():
    """
    get home page, you can define your own templates
    :return:
    """
    return '<h2>Welcome to Proxy Pool System</h2>'


@app.route('/random')
def get_proxy():
    """
    get a random proxy
    :return: get a random proxy
    """
    conn = get_conn()
    return conn.random()


@app.route('/count')
def get_count():
    """
    get the count of proxies
    :return: count, int
    """
    conn = get_conn()
    return str(conn.count())

以上为代理池的基本功能,另外需要再创建一个调度器协调控制各功能模块,检测与获取IP需要定时进行,web服务在后台运行。定时采用Python的定时管理库apscheduler,同时通过multiprocessing管理各功能模块多进程运行。

class Scheduler(object):
    """Scheduler"""

    def run_tester(self, cycle=CYCLE_TESTER):
        """
        定时启动检测器
        :param cycle:
        :return:
        """
        if not ENABLE_TESTER:
            app_logger.info('tester not enabled, exit')
            return
        tester = Tester()
        sch.add_job(tester.run, 'interval', seconds=cycle, id='run_tester')
        sch.start()

    def run_getter(self, cycle=CYCLE_GETTER):
        """
        定时开启爬虫补充代理
        :param cycle:
        :return:
        """
        if not ENABLE_GETTER:
            app_logger.info('getter not enabled, exit')
            return
        getter = Getproxies()
        sch.add_job(getter.run, 'interval', seconds=cycle, id='run_getter')
        sch.start()

    def run_server(self):
        """web服务端开启"""
        if not ENABLE_SERVER:
            app_logger.info('server not enabled, exit')
            return
        app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

    def run(self):
        """调度器启动"""
        global tester_process, getter_process, server_process
        try:
            app_logger.info('starting proxypool...')
            if ENABLE_TESTER:
                tester_process = multiprocessing.Process(target=self.run_tester)
                app_logger.info(f'starting tester, pid {tester_process.pid}...')
                tester_process.start()

            if ENABLE_GETTER:
                getter_process = multiprocessing.Process(target=self.run_getter)
                # print(dir(getter_process),getter_process)
                app_logger.info(f'starting getter, pid{getter_process.pid}...')
                getter_process.start()

            if ENABLE_SERVER:
                server_process = multiprocessing.Process(target=self.run_server)
                app_logger.info(f'starting server, pid{server_process.pid}...')
                server_process.start()

            tester_process.join()
            getter_process.join()
            server_process.join()
        except KeyboardInterrupt:
            app_logger.info('received keyboard interrupt signal')
            tester_process.terminate()
            getter_process.terminate()
            server_process.terminate()
        finally:
            # must call join method before calling is_alive
            tester_process.join()
            getter_process.join()
            server_process.join()
            app_logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
            app_logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
            app_logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
            app_logger.info('proxy terminated')

以上为代理池搭建的过程,有兴趣可以去我的GitHub上看源码

本文地址:https://blog.csdn.net/weixin_44606644/article/details/107083770