Multi-threaded JD.com Scraping via Packet Capture
The previous attempt at crawling JD product data with Splash's dynamic rendering was slow, so this time the site's own HTTP traffic is reverse-engineered: requests is used to replay the browser's calls directly, with multithreading on top, which improves crawling throughput dramatically.
The data collected for each product includes: title, product ID, author, price (discounted price, original price, and e-book price if one exists), book sales rank, comment statistics (total comments, good comments, poor comments, good rate, default-good count), and the comment texts themselves.
The data is stored as JSON documents in Elasticsearch, a non-relational database.
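For reference, a single stored document has roughly the following shape (all values below are illustrative placeholders, not real scraped data):
{
    "title": "Some Python Book",
    "product_id": "12345678",
    "product_url": "https://item.jd.com/12345678.html",
    "author": "Some Author",
    "price": {"pre_price": 99.0, "now_price": 69.5, "e_price": 29.9},
    "rank": 12,
    "comments": {
        "CommentCount": 5000,
        "GoodCount": 4800,
        "DefaultGoodCount": 150,
        "GoodRate": 0.96,
        "PoorCount": 50,
        "content": ["great intro book", "fast delivery"]
    }
}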
Implementation:
import requests
from bs4 import BeautifulSoup
import time
import re
import json
from e_commerce.module.es_mapping import ProductInfoType
from threading import Thread
from queue import Queue, Empty
sessions = requests.session()  # one session so cookies persist across requests
page_url = "https://search.jd.com/Search"
comment_count_url = 'https://sclub.jd.com/comment/productCommentSummaries.action'
comment_url = 'https://sclub.jd.com/comment/productPageComments.action'
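# The three endpoints above were captured from the browser's network panel
# while browsing a JD search result page: the HTML search page itself, the
# comment-summary AJAX call, and the paged comment-content AJAX call.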
start_time = time.time()
# flags flipped by the main thread to tell the worker threads to exit
exit_ParseThread = False
exit_SaveThread = False
def parse_Product(page_queue , info_queue):
    # Parse the search listing pages. JD counts pages internally: one visible
    # result page equals two internal pages (30 items rendered server-side,
    # 30 more loaded via AJAX on scroll), so the queue holds odd numbers only.
    while True:
        if page_queue.empty():
            break
        page = page_queue.get()  # 1, 3, 5, ...
        params_1 = {
            "keyword": "python",
            "enc": "utf-8",
            "qrst": "1",
            "rt": '1',
            "stop": '1',
            "vt": '2',
            "wq": 'python',
            "page": page,
            # "s": '1',
            # "click": '0'
        }
        headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'accept-encoding': 'gzip, deflate, br',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'max-age=0',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
        }
        # request the first 30 products on the page
        pro_res = sessions.get(page_url, headers=headers, params=params_1)
        # print("parse front 30 product info")
        parseIdAndPrice(pro_res.content, info_queue)
        params_2 = params_1.copy()
        params_2.update({
            "page": page + 1,
            # "s": '31',
            # "click": '',
            'scrolling': 'y',
            'log_id': str(int(time.time() * 100000) / 100000),  # pseudo log id derived from the timestamp
            'tpl': '2_M',
        })
        headers = {
            'referer': 'https://search.jd.com/Search?keyword=python&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq=python&page=1&s=1&click=0',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
            'x-requested-with': 'XMLHttpRequest',
        }
        # request the last 30 products on the page (the scroll-triggered AJAX call)
        last_res = sessions.get(url=page_url, params=params_2, headers=headers)
        # print("parse last 30 product info")
        parseIdAndPrice(last_res.content, info_queue)
def parseIdAndPrice(response , info_queue):
    # Extract each product's id and price from the listing HTML; these feed
    # the detail-parsing stage.
    soup = BeautifulSoup(response, 'lxml')
    product_id = soup.find_all('li', 'gl-item')
    for info in product_id:
        id = info['data-sku']
        price = info.get_text()
        price = float(re.findall(r'¥(\d+\.\d+)', price)[0])
        info_queue.put((id, price))  # share (id, price) with the parse threads via the queue
        # break  # handy while debugging
def parse(info_queue , data_queue):
    # Parse the detail page of each product.
    while not exit_ParseThread:
        try:
            # use a timeout so the thread can re-check the exit flag instead
            # of blocking forever on an empty queue
            id, price = info_queue.get(timeout=1)
        except Empty:
            continue
        url = 'https://item.jd.com/{0}.html'.format(id)
        headers = {
            'referer': 'https://search.jd.com/Search?keyword=python&enc=utf-8&qrst=1&rt=1&stop=1&vt=2&wq=python&page=1&s=1&click=0',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
        }
        response = sessions.get(url=url, headers=headers)
        print("request product detail info url")
        # print(requests.utils.dict_from_cookiejar(sessions.cookies))
        info_part1 = parse_PartInfo(response, id, price)
        info_part2 = parse_CommentCount(url, id)
        info_part3 = parse_comments(id)
        data_queue.put((info_part1, info_part2, info_part3))
def parse_PartInfo(response , id , price):
    # Parse part of the product page (title, author), then call the
    # price/rank JSONP endpoint for the rest.
    soup = BeautifulSoup(response.content, 'lxml')
    title = soup.find('div', 'sku-name').text
    try:
        author_soup = soup.find('div', id="p-author").find('a')
        author = author_soup.text
    except AttributeError:  # not every product page lists an author
        author = 'None'
    # the body tag's class attribute encodes the category path, e.g. cat-1-1713
    cat_list = soup.find_all('body')[0]["class"]
    cat = []
    for cat_raw in cat_list[:3]:
        cat.append(re.match(r'cat-\d-(\d+)', cat_raw).group(1))
    # JD stores the visitor's location in the ipLoc-djd cookie; the price
    # endpoint wants it in underscore-separated form
    area = requests.utils.dict_from_cookiejar(sessions.cookies)['ipLoc-djd']
    area = re.sub(r'(\d+)-(\d+)-(\d+)-(\d+)', r'\1_\2_\3_\4', area)
    headers = {
        'Referer': 'https://item.jd.com/{0}.html?jd_pop=4944111d-b052-45e8-af80-fcefae1f0588&abt=0'.format(id),
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
    }
    params = {
        'skuId': id,
        'cat': ','.join(cat),
        'area': area,  # required, or the endpoint returns nothing useful
        'callback': 'book_jsonp_callback',
    }
    # request and parse the price and rank
    res = sessions.get(url='https://c.3.cn/book', params=params, headers=headers)
    print("request the price and rank")
    # strip the JSONP wrapper, e.g. book_jsonp_callback({...})
    match_obj = re.match('.*({.*})', res.text)
    if match_obj:
        info_dict = json.loads(match_obj.group(1))
    else:
        info_dict = {}
    ebookId = info_dict.get('ebookId', 'None')
    rank = info_dict.get('rank', 0)
    e_price = float(info_dict.get('p', 0.0))
    pre_price = float(info_dict.get('m', 0.0))
    back_dict = {
        'title': title,
        'author': author,
        'ebookId': ebookId,
        'e_price': e_price,
        'pre_price': pre_price,
        'now_price': price,
        'rank': rank,
        'product_url': response.url,
        'product_id': id
    }
    return back_dict
def parse_CommentCount(url , product_id):
    # Parse the comment-count summary for one product.
    headers = {
        'referer': url,
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
    }
    params = {
        'referenceIds': product_id,
        # 'callback': 'jQuery5872511',
        '_': int(time.time() * 1000)
    }
    res2 = sessions.get(url=comment_count_url, headers=headers, params=params)
    print("request the comment count")
    try:
        res_json = json.loads(res2.text)
        comments = res_json['CommentsCount'][0]
        comments_dict = {
            'CommentCount': comments['CommentCount'],
            'GoodCount': comments['GoodCount'],
            'DefaultGoodCount': comments['DefaultGoodCount'],
            'GoodRate': comments['GoodRate'],
            'PoorCount': comments['PoorCount']
        }
        return comments_dict
    except (ValueError, KeyError, IndexError):  # malformed or empty response
        return {}
def parse_comments(id):
    # Parse the first pages of comment content; raise the range below to
    # fetch more pages.
    params = {
        'productId': id,
        'score': '0',
        'sortType': '5',
        'page': '0',
        'pageSize': '10',
        'isShadowSku': '0',
        'fold': '1',
    }
    headers = {
        'referer': 'https://item.jd.com/{0}.html?jd_pop=4944111d-b052-45e8-af80-fcefae1f0588&abt=0'.format(id),
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
    }
    comment_content = []
    for i in range(2):  # first 2 pages of comments
        params.update({
            'page': str(i)
        })
        response = sessions.get(url=comment_url, params=params, headers=headers)
        print("request comment content , this page is %s" % (params['page']))
        try:
            res_json = json.loads(response.text)
            comments_list = res_json['comments']
        except Exception as e:  # non-JSON response or missing field
            print('Reason:', e)
            comments_list = []
        for comment in comments_list:
            comment_content.append(comment['content'])
    return comment_content
def save_to_es(data_queue):
    # Assemble the three parsed parts into one document and save it to
    # Elasticsearch.
    i = 0
    while not exit_SaveThread:
        try:
            # timeout so the thread can re-check the exit flag
            info_part1, info_part2, info_part3 = data_queue.get(timeout=1)
        except Empty:
            continue
        product_item = ProductInfoType()
        product_item.title = info_part1['title']
        product_item.product_id = info_part1['product_id']
        product_item.product_url = info_part1['product_url']
        product_item.author = info_part1['author']
        product_item.price.pre_price = info_part1['pre_price']
        product_item.price.now_price = info_part1['now_price']
        product_item.price.e_price = info_part1['e_price']  # nested field
        product_item.rank = info_part1['rank']
        # use .get with defaults in case parse_CommentCount returned {}
        product_item.comments = {'CommentCount': info_part2.get('CommentCount', 0),
                                 'GoodCount': info_part2.get('GoodCount', 0),
                                 'DefaultGoodCount': info_part2.get('DefaultGoodCount', 0),
                                 'GoodRate': info_part2.get('GoodRate', 0.0),
                                 'PoorCount': info_part2.get('PoorCount', 0),
                                 'content': info_part3}
        product_item.save()
        i += 1
        print('data is uploaded to es ', i)
    print('\n')
if __name__ == '__main__':
    page_queue, info_queue, data_queue = Queue(), Queue(), Queue()
    for i in range(1, 100, 2):
        page_queue.put(i)
    # three threads for each stage of the pipeline
    page_thread_list = []
    info_thread_list = []
    save_thread_list = []
    for i in range(3):
        # create one thread per stage on each iteration
        page_thread = Thread(target=parse_Product, args=(page_queue, info_queue))
        info_thread = Thread(target=parse, args=(info_queue, data_queue))
        save_thread = Thread(target=save_to_es, args=(data_queue,))
        # keep every thread in a list so it can be joined later
        page_thread_list.append(page_thread)
        info_thread_list.append(info_thread)
        save_thread_list.append(save_thread)
        # start the threads
        page_thread.start()
        info_thread.start()
        save_thread.start()
    # make the main thread wait until the listing threads finish
    for t in page_thread_list:
        t.join()
    # once the info queue drains, tell the parse threads to exit
    while not info_queue.empty():
        pass
    exit_ParseThread = True
    for t in info_thread_list:
        t.join()
    # once the data queue drains, tell the save threads to exit
    while not data_queue.empty():
        pass
    exit_SaveThread = True
    for t in save_thread_list:
        t.join()
    end_time = time.time()
    print('time:', end_time - start_time)
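A design note: the exit-flag plus busy-wait shutdown above works, but it burns CPU while polling. A common alternative is to push one sentinel value per consumer thread onto each queue; a consumer exits when it pops the sentinel, with no global flags or polling loops. A minimal standalone sketch of the idea (the names here are illustrative and not part of the scraper):

from queue import Queue
from threading import Thread

SENTINEL = object()  # unique marker telling a consumer to stop

def consumer(q):
    while True:
        item = q.get()
        if item is SENTINEL:  # producers are done, exit cleanly
            break
        print('processing', item)

q = Queue()
workers = [Thread(target=consumer, args=(q,)) for _ in range(3)]
for w in workers:
    w.start()
for item in range(10):
    q.put(item)
for _ in workers:  # one sentinel per worker thread
    q.put(SENTINEL)
for w in workers:
    w.join()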
Code that creates the Elasticsearch mapping:
from elasticsearch_dsl import DocType, InnerObjectWrapper, Date, Text, Integer, Keyword, Nested, Completion, analyzer, tokenizer, Float
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer
import datetime

# Define a default Elasticsearch client (a list of hosts may be given)
connections.create_connection(hosts=['localhost'])

class CustomAnalyzer(_CustomAnalyzer):
    # custom analyzer whose analysis definition is left empty; this works
    # around an analyzer bug that would otherwise raise an error
    def get_analysis_definition(self):
        return {}

ik_analyzer = CustomAnalyzer("ik_max_word", filter=["lowercase"])  # the filter lowercases tokens
# my_analyzer = analyzer('my_analyzer',
#                        tokenizer=tokenizer('trigram', 'nGram', min_gram=3, max_gram=3),
#                        filter=['lowercase']
#                        )

class Comment(InnerObjectWrapper):
    created_at = Date()

    def age(self):
        return datetime.datetime.now() - self.created_at

class ProductInfoType(DocType):
    # document type describing how product data is stored in Elasticsearch
    suggest = Completion(analyzer=ik_analyzer)
    title = Text(analyzer='ik_max_word')
    product_id = Keyword()
    product_url = Keyword()
    author = Text(analyzer='ik_max_word')
    price = Nested(
        doc_class=Comment,
        properties={
            "pre_price": Float(),
            "now_price": Float(),
            "e_price": Float()
        }
    )
    rank = Integer()
    comments = Nested(
        doc_class=Comment,
        properties={
            'CommentCount': Integer(),
            'GoodCount': Integer(),
            'DefaultGoodCount': Integer(),
            'GoodRate': Float(),
            'PoorCount': Integer(),
            'content': Text(analyzer='ik_max_word')
        }
    )

    class Meta:
        index = 'jd'
        doc_type = "product"

if __name__ == '__main__':
    ProductInfoType.init()
    print("------es index is created------")