Multithreaded JD.com Scraping via Packet Capture

Since the previous attempt, scraping JD product data with Splash dynamic rendering, was rather slow, this time the site is crawled by reverse-analyzing its network traffic: requests replays the browser's own requests for the product data (the search page renders the first 30 products as HTML and lazy-loads the other 30 through an XHR), and multithreading is layered on top, which improves crawling efficiency considerably.

The scraped data for each product includes: title, product ID, author, price (discounted price, original price, and e-book price if available), book rank, comment statistics (total comments, good and poor counts, good rate, default-good count), and the comment texts.

The data is stored as JSON documents in Elasticsearch, a non-relational document store.
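For reference, a single stored document ends up shaped roughly like the following (all values are illustrative; the field names match the Elasticsearch mapping defined at the end of this post):

{
    "title": "Python编程 从入门到实践",
    "product_id": "1234567",
    "product_url": "https://item.jd.com/1234567.html",
    "author": "Eric Matthes",
    "price": {"pre_price": 89.0, "now_price": 62.3, "e_price": 29.99},
    "rank": 1,
    "comments": {
        "CommentCount": 20000,
        "GoodCount": 19500,
        "DefaultGoodCount": 3000,
        "GoodRate": 0.98,
        "PoorCount": 60,
        "content": ["书很不错,适合入门", "印刷清晰,内容全面"]
    }
}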

Code implementation:

import requests
from bs4 import BeautifulSoup
import time
import re
import json
from e_commerce.module.es_mapping import ProductInfoType
from threading import Thread
from queue import Queue, Empty


sessions = requests.session()  # one shared session so cookies persist across requests
page_url = "https://search.jd.com/Search"
comment_count_url = 'https://sclub.jd.com/comment/productCommentSummaries.action'
comment_url = 'https://sclub.jd.com/comment/productPageComments.action'
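# Both comment endpoints above were discovered by capturing the browser's XHR
# traffic: productCommentSummaries.action returns the per-sku comment counts,
# while productPageComments.action returns the paged comment bodies.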
start_time = time.time()
exit_ParseThread = False
exit_SaveThread = False

def parse_Product(page_queue , info_queue):
    # Parse the product list (search result) pages.
    # One page pair is fetched per iteration; more pages yield more products.
    while True:
        try:
            page = page_queue.get_nowait()  # 1, 3, 5, ... (each visible page is fetched in two halves)
        except Empty:  # get_nowait avoids the race between an empty() check and a blocking get()
            break
        params_1 = {
            "keyword": "python" ,
            "enc" : "utf-8" ,
            "qrst" : "1",
            "rt" : '1' ,
            "stop" : '1' ,
            "vt" : '2',
            "wq" : 'python',
            "page" : page ,
            # "s" : '1' ,
            # "click" : '0'
        }
        headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'accept-encoding': 'gzip, deflate, br',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'max-age=0',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
        }
        # Request the first 30 products of this page (rendered directly in the HTML)
        pro_res = sessions.get(page_url , headers= headers , params= params_1)
        # print("parse front 30 product info")
        parseIdAndPrice(pro_res.content , info_queue)
        params_2 = params_1.copy()
        params_2.update({
            "page" : page + 1 ,
            # "s" : '31' ,
            # "click" : '',
            'scrolling': 'y',
            'log_id': str(int(time.time()*100000)/100000) ,
            'tpl': '2_M' ,
        })
        headers = {
            'referer': 'https://search.jd.com/Search?keyword=python&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq=python&page=1&s=1&click=0',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
            'x-requested-with': 'XMLHttpRequest',
        }
        # Request the last 30 products of the page (normally lazy-loaded via XHR as the user scrolls)
        last_res = sessions.get(url= page_url , params = params_2 , headers = headers)
        # print("parse last 30 product info")
        parseIdAndPrice(last_res.content , info_queue)

def parseIdAndPrice(response , info_queue):
    # Parse each product's id and price from a list page; every (id, price)
    # pair becomes the entry point for crawling that product's detail page.
    soup = BeautifulSoup(response , 'lxml')
    product_id = soup.find_all('li' , 'gl-item')
    for info in product_id:
        id = info['data-sku']
        price = info.get_text()
        price = float(re.findall(r'¥(\d+\.\d+)?.*' , price)[0])
        info_queue.put((id , price))  # put (id, price) into the shared queue
        # break  # uncomment for quick debugging

def parse(info_queue , data_queue):
    # Parse the detailed info of a single product.
    while not exit_ParseThread:
        try:
            id, price = info_queue.get(timeout=1)  # timeout lets the thread re-check the exit flag
        except Empty:
            continue
        url = 'https://item.jd.com/{0}.html'.format(id)

        headers = {
            'referer': 'https://search.jd.com/Search?keyword=python&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq=python&page=1&s=1&click=0',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
        }
        response = sessions.get(url= url , headers = headers)
        print("request product detail info url")
        # print(requests.utils.dict_from_cookiejar(sessions.cookies))
        info_part1 = parse_PartInfo(response , id , price)
        info_part2 = parse_CommentCount(url , id)
        info_part3 = parse_comments(id)
        data_queue.put((info_part1 , info_part2 , info_part3 ))
        

def parse_PartInfo(response , id , price):
    # Request and parse part of the product page: title, author, category, and
    # (via a second request) e-book price and rank.
    soup = BeautifulSoup(response.content, 'lxml')
    title = soup.find('div', 'sku-name').text
    try:
        author_soup =soup.find('div', id="p-author").find('a')
        author = author_soup.text
    except Exception:  # page has no author block
        author = 'None'

    cat_list = soup.find_all('body')[0]["class"]
    cat = []
    for cat_raw in cat_list[:3]:
        cat.append(re.match(r'cat-\d-(\d+)', cat_raw).group(1))

    # the ipLoc-djd cookie holds the region; the price API wants it underscore-separated
    area = requests.utils.dict_from_cookiejar(sessions.cookies)['ipLoc-djd']
    area = re.sub(r'(\d+)-(\d+)-(\d+)-(\d+)', r'\1_\2_\3_\4', area)

    headers = {
        'Referer': 'https://item.jd.com/{0}.html?jd_pop=4944111d-b052-45e8-af80-fcefae1f0588&abt=0'.format(id),
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
    }
    params = {
        'skuId': id,  # sku of the current product
        'cat': ','.join(cat),
        'area': area,  # must not be omitted
        'callback': 'book_jsonp_callback',
    }
    # Request and parse the e-book price and rank from JD's JSONP book endpoint.
    res = sessions.get(url='https://c.3.cn/book', params=params, headers=headers)
    print("request the price and rank")
    match_obj = re.match('.*({.*})', res.text)
    if match_obj:
        info_str = match_obj.group(1)
        info_dict = json.loads(info_str)  # the JSONP payload is plain JSON; safer than eval() on remote content
    else:
        info_dict = {}
    ebookId = info_dict.get('ebookId', 'None')
    rank = info_dict.get('rank', 0)
    e_price = float(info_dict.get('p', 0.0))
    pre_price = float(info_dict.get('m', 0.0))

    back_dict = {
        'title':title,
        'author':author,
        'ebookId': ebookId,
        'e_price': e_price,
        'pre_price' : pre_price,
        'now_price' : price ,
        'rank' : rank ,
        'product_url' : response.url ,
        'product_id' : id
    }
    return back_dict

def parse_CommentCount(url , product_id):
    # Parse the comment-count summary (totals, good/poor counts, good rate).
    headers = {
        'referer': url,
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
    }
    params = {
        'referenceIds': product_id,
        # 'callback': 'jQuery5872511',
        '_': int(time.time() * 1000)
    }
    res2 = sessions.get(url=comment_count_url, headers=headers, params=params)
    print("request the comment count")
    try:
        res_json = json.loads(res2.text)
        comments = res_json['CommentsCount'][0]
        CommentCount = comments['CommentCount']
        GoodCount = comments['GoodCount']
        DefaultGoodCount = comments['DefaultGoodCount']
        GoodRate = comments['GoodRate']
        PoorCount = comments['PoorCount']
        comments_dict = {
            'CommentCount': CommentCount,
            'GoodCount': GoodCount,
            'DefaultGoodCount': DefaultGoodCount,
            'GoodRate': GoodRate,
            'PoorCount': PoorCount
        }
        return comments_dict
    except Exception:
        # the summary endpoint failed or returned an unexpected shape
        return {}


def parse_comments(id):
    # Parse the first pages of comment content.
    # Iterate over more values of 'page' to fetch more comments.
    params = {
        'productId': id,
        'score': '0',
        'sortType': '5',
        'page': '0',
        'pageSize': '10',
        'isShadowSku': '0',
        'fold': '1',
    }
    headers = {
        'referer': 'https://item.jd.com/{0}.html?jd_pop=4944111d-b052-45e8-af80-fcefae1f0588&abt=0'.format(id),
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
    }
    comment_content = []
    for i in range(2):   # parse the first 2 pages of comments
        params.update({
            'page' : str(i)
        })
        response = sessions.get(url= comment_url , params= params , headers= headers)
        print("request comment content , this page is %s"%(params['page']))
        try:
            res_json = json.loads(response.text)
            comments_list = res_json['comments']
        except Exception as e:  # Exception, not BaseException, so Ctrl-C still works
            print('Reason:', e)
            comments_list = []
        for comment in comments_list:
            comment_content.append(comment['content'])
    return comment_content

def save_to_es(data_queue):
    # Consume parsed products from the queue and index them into Elasticsearch.
    i = 0
    while not exit_SaveThread:
        try:
            info_part1 , info_part2 , info_part3 = data_queue.get(timeout=1)  # timeout lets the thread re-check the exit flag
        except Empty:
            continue
        product_item = ProductInfoType()
        product_item.title = info_part1['title']
        product_item.product_id = info_part1['product_id']
        product_item.product_url = info_part1['product_url']
        product_item.author = info_part1['author']
        product_item.price.pre_price = info_part1['pre_price']
        product_item.price.now_price = info_part1['now_price']
        product_item.price.e_price = info_part1['e_price']   # nested price object
        product_item.rank = info_part1['rank']
        product_item.comments = {'CommentCount': info_part2.get('CommentCount', 0),
                                 'GoodCount': info_part2.get('GoodCount', 0),
                                 'DefaultGoodCount': info_part2.get('DefaultGoodCount', 0),
                                 'GoodRate': info_part2.get('GoodRate', 0.0),
                                 'PoorCount': info_part2.get('PoorCount', 0),
                                 'content': info_part3}  # .get() guards against an empty summary dict
        product_item.save()
        i += 1
        print('data is uploaded to es ' , i)
        print('\n')
    i = None

if __name__ == '__main__':
    # parse_Product()
    page_queue , info_queue , data_queue = Queue() , Queue() , Queue()
    for i in range(1 , 100 , 2):
        page_queue.put(i)
    # three threads for each pipeline stage
    page_thread_list = []
    info_thread_list = []
    save_thread_list = []
    for i in range(3):
        # create one thread of each kind per iteration
        page_thread = Thread(target= parse_Product , args= (page_queue , info_queue))
        info_thread = Thread(target= parse , args= (info_queue , data_queue))
        save_thread = Thread(target= save_to_es , args= (data_queue , ))
        # keep references to all threads so we can join them later
        page_thread_list.append(page_thread)
        info_thread_list.append(info_thread)
        save_thread_list.append(save_thread)
        # start the threads
        page_thread.start()
        info_thread.start()
        save_thread.start()

    # wait for the page threads to finish (they exit once page_queue is empty)
    for t in page_thread_list:
        t.join()
    # once the info queue is drained, signal the parse threads to exit
    while not info_queue.empty():
        time.sleep(0.5)  # a short sleep avoids burning a CPU core while polling
    exit_ParseThread = True
    for t in info_thread_list:
        t.join()

    # once the data queue is drained, signal the save threads to exit
    while not data_queue.empty():
        time.sleep(0.5)
    exit_SaveThread = True
    for t in save_thread_list:
        t.join()

    end_time = time.time()
    print('time:',end_time - start_time)
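A note on the shutdown logic above: module-level flags plus polling work here, but the standard library also supports a tidier sentinel pattern, where the main thread puts one None per consumer into the queue and each consumer exits when it receives it. A minimal standalone sketch (the names work_queue and worker are illustrative, not from the crawler above):

from queue import Queue
from threading import Thread

NUM_WORKERS = 3

def worker(work_queue):
    while True:
        item = work_queue.get()
        if item is None:   # sentinel: no more work, exit the thread
            break
        print('processing', item)

work_queue = Queue()
threads = [Thread(target=worker, args=(work_queue,)) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for item in range(10):
    work_queue.put(item)
for _ in range(NUM_WORKERS):   # one sentinel per worker
    work_queue.put(None)
for t in threads:
    t.join()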

Code for creating the Elasticsearch mapping:

from elasticsearch_dsl import DocType , InnerObjectWrapper , Date , Text , Integer ,Keyword , Nested , Completion ,analyzer ,tokenizer , Float
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer
import datetime

# Define a default Elasticsearch client
connections.create_connection(hosts=['localhost'])  # can connect to multiple hosts at once

class CustomAnalyzer(_CustomAnalyzer):   # custom analyzer
    def get_analysis_definition(self):   # returns nothing, which sidesteps an elasticsearch_dsl analyzer bug
        return {}

ik_analyzer = CustomAnalyzer("ik_max_word" , filter= ["lowercase"])  # the lowercase filter normalizes case

# my_analyzer = analyzer('my_analyzer',
#     tokenizer=tokenizer('trigram', 'nGram', min_gram=3, max_gram=3),
#     filter=['lowercase']
# )

class Comment(InnerObjectWrapper):
    created_at = Date()
    def age(self):
        return datetime.datetime.now() - self.created_at

class ProductInfoType(DocType):
    # Document type defining how product data is stored in Elasticsearch.
    suggest = Completion(analyzer= ik_analyzer)
    title = Text(analyzer= 'ik_max_word')
    product_id = Keyword()
    product_url = Keyword()
    author = Text(analyzer= 'ik_max_word')
    price = Nested(
        doc_class = Comment ,
        properties={
            "pre_price" : Float(),
            "now_price" : Float(),
            "e_price" : Float()
        }
    )
    rank = Integer()
    comments = Nested(
        doc_class=Comment,
        properties= {
            'CommentCount' : Integer(),
            'GoodCount' : Integer(),
            'DefaultGoodCount' : Integer(),
            'GoodRate' : Float(),
            'PoorCount' : Integer(),
            'content' : Text(analyzer= 'ik_max_word')
        }
    )

    class Meta:
        index = 'jd'
        doc_type = "product"

if __name__ == '__main__':
    ProductInfoType.init()
    print("------es index is created------")