【Python第六课】爬虫实战进阶
程序员文章站
2022-07-14 11:17:50
...
1.爬虫进阶 - POST请求有道翻译
# 有道翻译
"""
Request URL: http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule
Request Method: POST
data = {
'i':'需要翻译的内容',
'doctype':'json'
}
"""
import requests
# 定义请求的URL
url = 'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule'
# 请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
# 定义请求的参数
data = {
'i':input("中文:"),
'doctype':'json'
}
# 发起请求 post
res =requests.post(url,headers=headers,data=data)
# 查看请求结果
code = res.status_code
print(code)
if code ==200:
# 解析数据
# print(res.content)
# print(res.json())
resdata = res.json()
if resdata['errorCode'] == 0:
#请求成功
print(resdata['translateResult'][0][0]['tgt'])
2.爬虫进阶 - 有道翻译封装
import requests
# 封装翻译的函数
def fanyi(kw):
# 定义请求的URL
url = 'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule'
# 请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
# 定义请求的参数
data = {
'i': kw,
'doctype': 'json'
}
# 发起请求 post
res = requests.post(url, headers=headers, data=data)
# 查看请求结果
code = res.status_code
print(code)
if code == 200:
# 解析数据
# print(res.content)
# print(res.json())
resdata = res.json()
if resdata['errorCode'] == 0:
# 请求成功
print(resdata['translateResult'][0][0]['tgt'])
else:
print('翻译失败,请联系管理员')
vars = """
*******************
**欢迎使用py翻译工具 **
**输入需要翻译的内容 **
**输入字母 q 则退出 **
*******************
"""
print(vars)
while True:
# 获取用户的输入内容
print(vars)
keyword = input('输入需要翻译的内容:')
#判断是否需要退出
if keyword == 'q':
break
# 调用函数,进行翻译
fanyi(keyword)
3.爬虫进阶 - 代理IP的使用
"""
当频繁请求一个网站时,对方会认为攻击或者盗取数据,禁用IP是反制的有效手段
如果**这个问题?
1.推荐方案,就是降低爬虫请求的频率,不要对别人的服务器造成压力。
2.使用代理IP
"""
import requests
# 定义请求的url
url ='http://httpbin.org/get'
# 定义请求头
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
# 定义代理IP
proxies = {
'http':'47.107.160.99:8118',
'https':'47.107.160.99:8118',
}
try:
# 发起get请求
res = requests.get(url,headers=headers,proxies=proxies)
# 检测请求状态
if res.status_code == 200:
# 获取响应内容
data = res.json()
print(data['origin'].split(',')[0])
except:
print('请求失败')
4.爬虫进阶-爬取代理ip网站数据
# 爬取代理IP数据
# https://www.kuaidaili.com/free/inha/
import requests
from lxml import etree
# 定义请求的url
url = 'https://www.kuaidaili.com/free/inha/'
# 定义请求头
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
res = requests.get(url,headers=headers)
# 判断请求状态
if res.status_code == 200:
# 获取响应数据
response = res.content.decode('utf-8')
# 使用XPATH解析Html的数据
res_html = etree.HTML(response)
ips = res_html.xpath('//tbody//tr//td[1]/text()')
ports = res_html.xpath('//tbody//tr//td[2]/text()')
data = list(zip(ips,ports))
print(data)
print(len(data))
5.爬虫进阶-分页数据爬取
# 。分页数据爬出
import requests
import time
import json
from lxml import etree
# 页面请求函数
def getPage(url):
print('页面请求函数')
# 定义请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
# 发起请求
res = requests.get(url, headers=headers)
# 判断请求状态
if res.status_code == 200:
# 获取响应数据
response = res.content.decode('utf-8')
return response
else:
return False
# 解析页面html的数据
def parseHTML(html):
print('解析页面html的数据')
try:
# 使用XPATH解析Html的数据
res_html = etree.HTML(html)
ips = res_html.xpath('//tbody//tr//td[1]/text()')
ports = res_html.xpath('//tbody//tr//td[2]/text()')
data = list(zip(ips, ports))
data = [{i[0]: i[1]} for i in data]
return data
except:
return False
# 测试ip是否好用
def testIp(ip):
print('测试ip')
# 定义请求的url
url = 'http://httpbin.org/get'
# 定义请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
# 定义代理ip
proxies = {
}
try:
# 发起get请求
res = requests.get(url, headers=headers, proxies=proxies, timeout=5)
# 检测请求状态
if res.status_code == 200:
# 获取响应内容
# data = res.json()
# print(data['origin'].split(',')[0])
return True
else:
return False
except:
return False
# 主程序
def main(num):
# 拼接url
url = f'https://www.kuaidaili.com/free/inha/{num}/'
# 调用请求页面的程序
html = getPage(url)
if html:
# 调用解析html的方法
alist = parseHTML(html)
data = []
for ip in alist:
# 把返回的解析的数据,去发请求测试是否好用
okip = testIp(ip)
if okip:
data.append(ip)
print('可使用IP已经写入')
# 把返回的好用的ip数据写入文件
with open('./ipdata.json','a+') as fp:
for i in alist:
fp.write(json.dumps(i))
fp.write('\n')
# 如果当前这个脚本是作为主程序使用,那么__name__的结果 就是 __main__
if __name__ == '__main__':
for i in range(1,3610):
print(f'当前正在爬取第{i}页')
main(i)
# 每爬取一个页面后,停顿2秒
time.sleep(2)
"""
分析:
url:
https://www.kuaidaili.com/free/inha/
https://www.kuaidaili.com/free/inha/2/
https://www.kuaidaili.com/free/inha/3/
try:
# 使用XPATH解析Html的数据
res_html = etree.HTML(html)
ips = res_html.xpath('//tbody//tr//td[1]/text()')
ports = res_html.xpath('//tbody//tr//td[2]/text()')
data = list(zip(ips, ports))
data = [{i[1]: i[2]} for i in data]
return data
except:
return False
"""
6.爬虫进阶 - 进程与线程的基本实现
import time,os,threading
from multiprocessing import Process
from threading import Thread
# 工作内容
def work(n):
print(f'给{n}打销售电话,进程号{os.getpid()},线程号:{threading.current_thread()}')
time.sleep(3)
print(f'销售电话结束:{n},进程号{os.getpid()},线程号:{threading.current_thread()}')
userlist = ['刘德华','张学友','梁朝伟']
# 普通方式来完成
# 启动了一个进程,进程中有一个主线程
for item in userlist:
work(item)
# 多进程,类似于 多个创建多个部门来完成这项工作!
# def queue_test1():
#
# plist = []
# for item in userlist:
# # 循环创建进程
# p = Process(target=work,args=(item,))
# # 生成进程
# p.start()
# # 把创建的进程加入列表中
# plist.append(p)
#
# # 阻塞终止进程的进行
# [i.join() for i in plist]
#
# if __name__ == '__main__':
# queue_test1()
# 多线程 类似与 给这个部门增加人手来参加工作
def queue_test():
plist = []
for item in userlist:
# 循环创建 线程
p = Thread(target=work,args=(item,))
# 生成线程
p.start()
# 把创建的线程加入列表中
plist.append(p)
# 阻塞终止线程的进行
[i.join() for i in plist]
if __name__ == '__main__':
queue_test()
7.爬虫进阶 - 线程池与进程池的实现
# 线程池 与进程池
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os
def work(n):
print(f'给{n}打电话:进程号:{os.getpid()},线程号:{threading.current_thread()}\n')
time.sleep(3)
print(f'{n}通话结束:进程号:{os.getpid()},线程号:{threading.current_thread()}\n')
userlist = ['刘德华','吴彦祖','梁朝伟','周杰伦','林俊杰']
def queue_test():
# 1.创建线程池 推荐使用多线程或者线程池
#pool = ThreadPoolExecutor(max_workers=5)
# 1.创建 进程池
pool = ProcessPoolExecutor(max_workers=5)
# 2. 循环指派任务和参数
[pool.submit(work,user) for user in userlist ]
# 3.关闭线程池 进程池
pool.shutdown()
if __name__ == '__main__':
queue_test()
8.爬虫进阶 - 百度图片爬取
import requests
import os
# 进行数据爬取
def getPages(kw,num):
# 循环页码数 和 请求参数
params = []
for i in range(30,30*num+30,30):
params.append({
'tn':'resultjson_com' ,
'ipn':'rj' ,
'ct':'201326592' ,
'is':' ' ,
'fp':'result' ,
'queryWord':kw ,
'cl':'2' ,
'lm':'-1' ,
'ie':'utf-8' ,
'oe':'utf-8' ,
'adpicid':' ' ,
'st':' ' ,
'z':' ' ,
'ic':' ' ,
'hd':' ' ,
'latest':' ' ,
'copyright':' ' ,
'word':kw ,
's':' ' ,
'se':' ' ,
'tab':' ' ,
'width':' ' ,
'height':' ' ,
'face':' ' ,
'istype':' ' ,
'qc':' ' ,
'nc':' 1' ,
'fr':' ' ,
'expermode':' ' ,
'force':' ' ,
'cg':'girl' ,
'pn': i ,
'rn':'30' ,
'gsm':'3c' ,
'1598852707925':' ' ,
})
# 请求的url
url = 'https://image.baidu.com/search/acjson'
# 循环请求
urls = []
for i in params:
# 像每一个url发起请求
res = requests.get(url,params=i)
# print(res.status_code)
# print(res.content)
# 获取请求的数据,加入urls
urls.append(res.json()['data'])
return urls
# 下载图片到本地
def downloadImg(datalist,dir):
# 检测文件夹是否存在
if not os.path.exists(dir):
os.mkdir(dir)
#循环下载图片数据
x = 0
for data in datalist:
for i in data:
if i.get('thumbURL') != None:
print(f'下载图片{i.get("thumbURL")}')
# 向图片地址发起请求
imgres =requests.get(i.get("thumbURL"))
open(f'{dir}/{x}.jpg','wb').write(imgres.content)
x +=1
# 获取用户输入信息
keyword = input('请输入搜索图片的关键字:')
# 调用函数,进行数据的爬取,可以指定关键字和下载页数
datalist = getPages(keyword,2)
#调用函数,保存数据,指定要保存的图片路径
downloadImg(datalist,'./baidu')
9.爬虫进阶-爬取豆瓣电影
# 爬取豆瓣电影
import time
import requests
import json
from lxml import etree
def getPage(url):
'''请求页面数据'''
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.2 (KHTML, like Gecko) Chrome/22.0.1216.0 Safari/537.2'
}
# 发起请求
res = requests.get(url,headers=headers)
# 判断响应状态
if res.status_code == 200:
return res.text
else:
return None
except:
return None
def parsePage(html):
'''解析数据'''
html = etree.HTML(html)
items = html.xpath('//div[@class="item"]')
# 遍历封装数据并且返回
for item in items:
res = {
'index':item.xpath('.//div/em[@class=""]/text()'),
'image':item.xpath('.//img[@width="100"]/@src'),
'title':item.xpath('.//span[@class="title"]/text()'),
'actor':item.xpath('.//p[@class=""]/text()'),
'score':item.xpath('.//span[@class="rating_num"]/text()')
}
yield(res)
def writeFile(item):
'''写入数据'''
with open('./douban1.json','a',encoding='utf-8') as fp:
fp.write(json.dumps(item,ensure_ascii=False))
fp.write('\n')
def main(offset):
'''主程序函数,负责调度爬虫程序'''
url = f'https://movie.douban.com/top250?start={offset}'
# 调用函数进行页面的爬取
html = getPage(url)
print(f'正在解析url:{url}')
if html:
# 去解析页面数据 parsePage(html)
for item in parsePage(html):
print(f'正在写入数据{item["title"]}')
# 写入数据 writeFile(item)
writeFile(item)
if __name__ == '__main__':
#main(0)
for i in range(10):
main(offset=i*25)
time.sleep(2)