欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python爬取智联招聘职位信息(多进程)

程序员文章站 2022-05-09 17:42:28
...

测试了下,采用单进程爬取5000条数据大概需要22分钟,速度太慢了点。我们把脚本改进下,采用多进程。

首先获取所有要爬取的URL,在这里不建议使用集合,字典或列表的数据类型来保存这些URL,因为数据量太大,太消耗内存,这里,python的生成器就发挥作用了。

'''
遇到不懂的问题?Python学习交流群:821460695满足你的需求,资料都已经上传群文件,可以自行下载!
'''
def get_urls(total_page,cityname,jobname):
    '''
    获取需要爬取的URL以及部分职位信息
    :param start: 开始的工作条数
    :param cityname: 城市名
    :param jobname: 工作名
    :return: 字典
    '''
    for start in range(total_page):
        url = r'https://fe-api.zhaopin.com/c/i/sou?start={}&pageSize=60&cityId={}&workExperience=-1&education=-1' \
              r'&companyType=-1&employmentType=-1&jobWelfareTag=-1&kw={}&kt=3'.format(start*60,cityname,jobname)
        try:
            rec = requests.get(url)
            if rec.status_code == 200:
                j = json.loads(rec.text)
                results = j.get('data').get('results')
                for job in results:
                    empltype = job.get('emplType')  # 职位类型,全职or校园
                    if empltype=='全职':
                        positionURL = job.get('positionURL') # 职位链接
                        createDate = job.get('createDate') # 招聘信息创建时间
                        updateDate = job.get('updateDate') # 招聘信息更新时间
                        endDate = job.get('endDate') # 招聘信息截止时间
                        positionLabel = job.get('positionLabel')
                        if positionLabel:
                            jobLight_par = (re.search('"jobLight":\[(.*?|[\u4E00-\u9FA5]+)\]',job.get('positionLabel'))) # 职位亮点
                            jobLight = jobLight_par.group(1) if jobLight_par else None
                        else:
                            jobLight = None
                        yield {
                            'positionURL':positionURL,
                            'createDate':createDate,
                            'updateDate':updateDate,
                            'endDate':endDate,
                            'jobLight':jobLight
                        }
        except Exception as e:
            logger.error('get urls faild:%s', e)
            return None

在使用多进程之前,有两个问题需要解决:

1、在爬取过程中,即需要把爬取完成的URL实时保存到old_url这个变量中,又要去查询要爬取的URL是否在这个old_url,那么就要使这个old_url的变量在多个进程之间共享数据。这里使用multiprocessing的Manager()方法

2、每个进程都要把爬取下来的数据保存到同一个CSV文件中,多个进程同时去修改一个CSV,当然会报异常。这里我们引入回调函数来解决整个问题

def mycallback(data):
    if data:
        csv_filename = data.pop('csv_filename')
        with open(csv_filename,'a+',newline='',encoding='utf-8-sig') as f:
            f_csv = csv.DictWriter(f,data.keys())
            f_csv.writerow(data)

好了,解决上述两个问题后,就可以使用进程池Pool()来实现多进程了

'''
遇到不懂的问题?Python学习交流群:821460695满足你的需求,资料都已经上传群文件,可以自行下载!
'''
if __name__=='__main__':
    start_time = datetime.datetime.now()
    logger.info('*' * 20 + "start running spider!" + '*' * 20)
    old_url_l = load_progress('old_url.txt')
    manager = Manager()
    old_url = manager.list(old_url_l)
    if not os.path.exists(output_path):
        os.mkdir(output_path)
    for jobname in job_names:
        for cityname in city_names:
            pool = Pool()
            logger.info('*'*10+'start spider '+'jobname:'+jobname+'city:'+cityname+'*'*10)
            total_page = get_page_nums(cityname,jobname)
            csv_filename=output_path+'/{0}_{1}.csv'.format(jobname,cityname)
            if not os.path.exists(csv_filename):
                write_csv_headers(csv_filename)
            urls = get_urls(total_page, cityname, jobname)
            for url in urls:
                pool.apply_async(get_job_info,args=(url,old_url,csv_filename),callback=mycallback)
            pool.close()
            pool.join()
            logger.info('*'*10+'jobname:'+jobname+'city:'+cityname+' spider finished!'+'*'*10)
    save_progress(set(old_url), 'old_url.txt')
    end_time = datetime.datetime.now()
    logger.info('*' * 20 + "spider finished!Running time:%s" % (end_time - start_time) + '*' * 20)
    print("Running time:%s" % (end_time - start_time))