python爬虫 - 起点女生榜单爬取 - 1
python爬虫 - 起点女生榜单爬取
最近一直在追庆余年,顺带瞄了一眼小说,真真是精彩(虽然因为范闲多妻的设定接受不了就放弃了)。
说来说去,还是钟爱女频的修仙小说,所以就想爬一下起点女生网的仙侠奇缘的作品,结果。。。也不知怎么起点竟然只显示5页,一万多部作品,怎么都是5页,所以就偷懒地只爬了三个榜单(藏榜、推荐榜、热销榜,也能说是六个,推荐榜的周榜、月榜、总榜都爬了)。
爬取的都是基本信息:
排名 |
---|
作品名 |
作者 |
作品分类 |
作品状态 |
作品简介 |
作者自定标签 |
最新更新章节 |
因为要爬多个不同网页,所以使用多线程去爬,比较省心,爬取下来总用时10分钟左右。
获取网页
首先还是定义一下headers,只设置了user-agent(感谢起点不封ip,肆无忌惮地拿自家ip爬了好几次),其次就是又对自己的代码优化了一下,发现每次写都会有新的问题,总能成长。莫怪我话多,代码即刻赶到。
def __init__(self):
self.header = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"
}
# 获得网页
def get_page(self, url):
# 设置若获取网页出错,重复获取3次
i = 0
while i < 3:
try:
doc = pq(url, self.header) # 获取网页内容
return doc
except Exception as e:
print("\033[4;30;47mINFO:{},{}\033[0m".format(e.args, url)) # 设置打印信息的字体颜色、背景颜色及装饰
i += 1
对了,这次把爬取的代码封装到一个类了。方便另写一个文件写多线程。
解析网页
这次除了爬取表面上的信息,还进作品详细的网页中爬了完整的作品介绍和作者自定标签,所以有两个函数。
# 解析每部作品中简介及作者自定义分类
def parse_detail(self, item):
url = item["作品链接"] # 获取相应网址
if requests.get(url).status_code == 200: # 判断是否能正常打开网页
doc = self.get_page(url)
try:
intro = doc(".book-intro p").text() # 获取作品简介
tag_by_author = doc(".book-state .detail .tag-wrap").find("a").text().replace(" ", "/") # 获取作者自定义标签
return {
"作品简介": intro,
"作者自定标签": tag_by_author
}
except:
intro = doc(".book-intro p").text() # 因部分作品无作者自定义标签,所以此刻只返回简介
return {
"作品简介": intro
}
# 解析网页
def parse_page(self, doc):
li_list = doc(".rank-body .book-img-text li") # 获取主要内容
# print(li_list.text())
for li in li_list.items():
rank_num = li(".book-img-box span").text() # 获取作品拍,排名信息
url = "https:" + li(".book-img-box a").attr("href") # 获取作品地址,方便后续爬取作品详细信息
poster_src = "https:" + li(".book-img-box img").attr("src") # 获取作品封面地址,方便后续保存作品封面
poster_b = requests.get(poster_src, headers=self.header).content # 获取作品封面字节信息
title = li(".book-mid-info h4").text() # 获取作品标题
author = li(".book-mid-info .author .name").text() # 获取作品作者
type = li(".book-mid-info .author .name").siblings("a").text() # 获取作品类型
status = li(".book-mid-info .author span").text() # 获取作品状态
update = li(".book-mid-info .update a").text() # 获取作品最新更新信息
update_time = li(".book-mid-info .update span").text() # 获取作品最新章节更新时间
yield {
"排名": rank_num,
"作品名": title,
"作者": author,
"类型": type,
"作品状态": status,
"最新章节": update,
"更新时间": update_time,
"海报": poster_b,
"作品链接": url
}
主函数定义
主函数定义了各个网站的网址和翻页设定,简单分析一下,就能知道网页规律,所以就不往上放了。
# 定义翻页,主调用函数
def main(self, url_type, delay):
# 判断所要爬y取的网页,并以此创建对应文件,以便后续保存数据
if url_type == "hotsales?":
with open("24小时热销榜.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(("排名", "作品名", "作者", "类型", "作品状态", "最新章节", "更新时间", "作品简介", "作者自定标签"))
elif url_type == "recom?chn=81&dateType=1&":
with open("周推荐票榜.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(("排名", "作品名", "作者", "类型", "作品状态", "最新章节", "更新时间", "作品简介", "作者自定标签"))
elif url_type == "recom?chn=81&dateType=2&":
with open("月推荐票榜.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(("排名", "作品名", "作者", "类型", "作品状态", "最新章节", "更新时间", "作品简介", "作者自定标签"))
elif url_type == "recom?chn=81&dateType=3&":
with open("总推荐票榜.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(("排名", "作品名", "作者", "类型", "作品状态", "最新章节", "更新时间", "作品简介", "作者自定标签"))
elif url_type == "collect?chn=81&":
with open("收藏榜.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(("排名", "作品名", "作者", "类型", "作品状态", "最新章节", "更新时间", "作品简介", "作者自定标签"))
# 设定好爬取的页面,共6页
for page in range(1, 6):
url = "https://www.qidian.com/mm/rank/" + url_type + "page=" + str(page)
doc = self.get_page(url)
for item in self.parse_page(doc):
# print(item)
if self.parse_detail(item): # 判断是否爬取到作品的详细信息
item.update(self.parse_detail(item)) # 将作品详细信息(parse_detail)加入到作品基本信息(parse_page)中去
self.write_to_csv(url_type, item) # 保存数据
self.save_img(url_type, item) # 保存图片
else:
self.write_to_csv(url_type, item)
self.save_img(url_type, item)
time.sleep(delay) # 设置睡眠时间(延迟时间)
或许看得眼花,主要是爬取中有些作品网页打不开,有些又没有作者自定义标签,为了把各自的数据都放在一张表里,就麻烦了些。
结尾的睡眠时间也是为了防止被屏蔽设置的。
保存到文件
直接放代码:
# 定义保存文件
def write_to_csv(self, url_type, item):
if len(item) > 9: # 判断是否爬取到作品的详细信息
if url_type == "hotsales?":
with open("24小时热销榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
try:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"], item["作者自定标签"]))
except:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"]))
elif url_type == "recom?chn=81&dateType=1&":
with open("周推荐票榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
try:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"], item["作者自定标签"]))
except:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"]))
elif url_type == "recom?chn=81&dateType=2&":
with open("月推荐票榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
try:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"], item["作者自定标签"]))
except:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"]))
elif url_type == "recom?chn=81&dateType=3&":
with open("总推荐票榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
try:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"], item["作者自定标签"]))
except:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"]))
elif url_type == "collect?chn=81&":
with open("收藏榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
try:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"], item["作者自定标签"]))
except:
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"], item["更新时间"], item["作品简介"]))
else: # 未爬取到作品的详细信息
if url_type == "hotsales?":
with open("24小时热销榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"],
item["更新时间"]))
elif url_type == "recom?chn=81&dateType=1&":
with open("周推荐票榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"],
item["更新时间"]))
elif url_type == "recom?chn=81&dateType=2&":
with open("月推荐票榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"],
item["更新时间"]))
elif url_type == "recom?chn=81&dateType=3&":
with open("总推荐票榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"],
item["更新时间"]))
elif url_type == "collect?chn=81&":
with open("收藏榜.csv", "a", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow((item["排名"], item["作品名"], item["作者"], item["类型"], item["作品状态"], item["最新章节"],
item["更新时间"]))
定义多线程
为了看着清爽,重新建了个文件写多线程,用的是最基本的写法,并没有像其他大神一样爬取和线程结合在一起,用了一个最笨的方法将两者连在了一起:
import threading
import time
from crawl_data_threading4 import CrawlQidian # 导入编写的爬虫文件
# 定义线程
class myThread(threading.Thread):
def __init__(self, url_type, name, delay):
threading.Thread.__init__(self)
self.url_type = url_type
self.name = name
self.delay = delay
def run(self):
print("线程开始:" + self.name)
qidian = CrawlQidian() # 定义实例
qidian.main(self.url_type, self.delay) # 运行主函数
print("线程结束:" + self.name)
if __name__ == "__main__":
start = time.time() # 用于程序运行计时
# 定义线程,每个榜单一个线程,delay时间依次增大
thread1 = myThread("hotsales?", "24小时热销榜", 1)
thread2 = myThread("recom?chn=81&dateType=1&", "周推荐票榜", 2)
thread3 = myThread("recom?chn=81&dateType=2&", "月推荐票榜", 3)
thread4 = myThread("recom?chn=81&dateType=3&", "总推荐票榜", 4)
thread5 = myThread("collect?chn=81&", "收藏榜", 5)
# 开始运行
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()
# 确保所有线程结束后才运行后续程序
thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()
print("退出主线程") # 运行结束
end = time.time()
time_t = time.strftime("%M:%S", time.localtime(end - start)) # 总用时
print("总用时:{}分{}秒".format(*time_t.split(":")))
具体可以参照一下菜鸟教程中的内容,链接在此:
https://www.runoob.com/python3/python3-multithreading.html
这是我初次写多线程方面的程序,也是学习菜鸟教程里的方法,虽然看着密密麻麻,但总体运行得很顺畅,之后也会慢慢多学习。程序写的有缺陷的地方,也请各位多多指教!大家共同学习!
关于这次爬取的数据会在年前发布,敬请期待一下!????
这是我的微信公众号,欢迎关注,一起学习????(●’◡’●)