Web Crawlers
1. The urllib module
Because servers have anti-crawling measures, we set a User-Agent header when scraping so that the server treats the request as a normal request coming from a user's browser. The User-Agent value can be copied from your own browser, so it differs from browser to browser.
Here I use the user-agent string of Chrome 67.0.3396.99, the newest version at the time of writing:
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
}
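As a quick illustration of how this header is used (a minimal sketch that is not part of the original scripts; the target URL is an arbitrary example), the dict is passed to urllib.request.Request so the request carries the browser User-Agent:
import urllib.request
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
}
# build a request that carries the spoofed User-Agent, then open it
req = urllib.request.Request('https://www.baidu.com', headers=header)
with urllib.request.urlopen(req) as res:
    print(res.status)       # HTTP status code, 200 on success
    print(len(res.read()))  # size of the response body in bytes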
1.1 Crawling the 12306 homepage
import urllib.request
import ssl
url = 'https://www.12306.cn/mormhweb/'  # the full URL
Because 12306's certificate setup is unusual, we have to disable certificate verification before crawling the site; once it is disabled, both the http and https pages can be fetched.
content = ssl._create_unverified_context()
res = urllib.request.urlopen(url, context=content)  # fetch the page source of the url
print(res.read().decode('utf-8'))  # decode the response bytes
1.2 Crawling the Baidu homepage
import urllib.request
from urllib import parse
def main(url):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
    }
    req = urllib.request.Request(url, headers=header)
    res = urllib.request.urlopen(req)
    return res.read().decode('utf-8')
if __name__ == '__main__':
    msg = input('Enter a search query: ')
    search = parse.urlencode({'wd': msg})  # build the query string; non-ASCII text gets percent-encoded
    print(search)
    # url = 'https://www.baidu.com/s?%s' % search
    url = 'https://www.baidu.com/s?' + search
    result = main(url)
    print(result)
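To make the percent-encoding explicit (a small illustrative sketch, not part of the original script), urlencode converts non-ASCII query values into UTF-8 percent escapes, which is why the encoded string can be concatenated straight onto the URL:
from urllib import parse
# Chinese text is percent-encoded as UTF-8 bytes: 'wd=%E7%88%AC%E8%99%AB'
print(parse.urlencode({'wd': '爬虫'}))
# plain ASCII values pass through unchanged: 'wd=python&pn=10'
print(parse.urlencode({'wd': 'python', 'pn': 10}))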
1.3 Crawling job listings from Zhilian Zhaopin
import urllib.request
from urllib import parse
import re
def get_zhilian_html(url):
    # fetch the page source of the url
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
    }
    req = urllib.request.Request(url, headers=header)
    res = urllib.request.urlopen(req)
    return res.read().decode('utf-8')
def get_job_num(html):
    # total number of positions found
    result = re.findall(r'<em>(\d+)</em>', html)
    if result:
        return result[0]
    else:
        return 0
def fankui(html):
    # feedback rate
    rate = re.findall(r'<td style="width: 60px;" class="fk_lv"><span>([\S]*)</span></td>', html)
    return rate
def information(html):
    infos = re.findall(r'<table cellpadding="0" cellspacing="0" width="853" '
                       r'class="newlist">([\s\S]+?)</table>', html)
    return infos
def job_name(infos):
    # job titles
    list1 = []
    for info in infos:
        names = re.findall(r'target="_blank">([\s\S]+?)(?:</a>|&)', info)
        # (?:exp) matches exp without capturing it, so the group gets no group number
        name = re.sub(r'(<b>|</b>)', "", names[0])
        dict1 = {
            'job title': name
        }
        list1.append(dict1)
    return list1
if __name__ == '__main__':
    # city = input('Enter a city: ')
    # job = input('Enter a job keyword: ')
    city = '成都'
    job = 'python'
    for i in range(1, 26):
        # 25 pages in total
        search = parse.urlencode({'jl': city, 'kw': job, 'p': i})  # encode the query dict
        url = 'https://sou.zhaopin.com/jobs/searchresult.ashx?%s' % search
        html = get_zhilian_html(url)
        result = get_job_num(html)
        rate = fankui(html)  # feedback rate
        info1 = information(html)
        jobs = job_name(info1)
        for j in jobs:
            for k, v in j.items():
                print('%s: %s' % (k, v))
1.4 Crawling Sohu news and saving it to a database
import re
import urllib.request
import pymysql
def decode_html(html, charsets=('utf-8', 'gbk')):
    # decode the page content, trying each charset in turn
    page_html = ''
    for charset in charsets:
        try:
            page_html = html.decode(charset)
            break  # stop as soon as the page decodes successfully
        except Exception as e:
            print('decoding failed')
    return page_html
def pattern_regex(html, pattern, flags=re.S):
    # extract the needed parts from the page; re.S makes '.' also match newlines
    html_regex = re.compile(pattern, flags)
    return html_regex.findall(html) if html else []
def get_html(url):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
    }
    req = urllib.request.Request(url, headers=header)
    res = urllib.request.urlopen(req)
    page_html = decode_html(res.read())
    return page_html
def get_mysql(sql, params_list):
    # save the crawled data into MySQL
    conn = pymysql.connect(host='localhost', port=3306, user='root', password='123456', database='spider', charset='utf8')
    cursor = conn.cursor()
    cursor.executemany(sql, params_list)  # run the sql once for every parameter tuple in params_list
    conn.commit()
    conn.close()
def start_crawl(url):
    page_html = get_html(url)
    link_list = pattern_regex(page_html, "<a test=a href='(.*?)'")
    print(link_list)
    params_list = []
    for link_url in link_list:
        # fetch the page behind each news link
        html = get_html(link_url)
        # title
        # ? makes a quantifier non-greedy:
        # *? repeats any number of times, as few as possible; +? repeats one or more times, as few as possible
        title = pattern_regex(html, '<h1>(.*?)<span class="article-tag">')
        # body text
        content = pattern_regex(html, '<article class="article" id="mp-editor">(.*?)</article>')
        if title and content:
            params_list.append([title[0], content[0]])
    sql = 'insert into result_souhu values (%s, %s)'
    get_mysql(sql, params_list)
if __name__ == '__main__':
    url = 'http://sports.sohu.com/nba_a.shtml'
    start_crawl(url)
2. Using lxml and beautifulsoup4
2.1 Crawling Q&A content from Zhihu
from bs4 import BeautifulSoup
from lxml import etree
import requests
def start_crawl(url):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
    }
    res = requests.get(url, headers=header)
    # parsing with lxml's etree
    # html = etree.HTML(res.text)
    # a = html.xpath('//*[@id="js-explore-tab"]/div[1]/div/div[1]/h2/a/text()')
    # a_href = html.xpath('//*[@id="js-explore-tab"]/div[1]/div/div[1]/h2/a/@href')
    # the line above extracts the href attribute of the a tags
    # print(a, a_href)
    # parsing with bs4
    soup = BeautifulSoup(res.text, 'lxml')
    result = soup.find_all('a', 'question_link')
    for i in result:
        print(type(i))
        href_result = i.attrs.get('href')
        print(href_result)
if __name__ == '__main__':
    url = 'https://www.zhihu.com/explore'
    start_crawl(url)
2.2 Crawling Douban movies and ratings (single-threaded)
import json
import urllib.request
from urllib import parse
def common(url):
    """
    Shared request code
    """
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
    }
    req = urllib.request.Request(url=url, headers=header)
    res = urllib.request.urlopen(req)
    return res.read().decode('utf-8')
def get_movie_tag(url):
    """
    Get the movie category tags
    """
    tag_res = common(url)
    # tag_res looks like '{"tags":["热门","最新","经典","可播放","豆瓣高分","冷门佳片","华语","欧美","韩国","日本","动作","喜剧","爱情","科幻","悬疑","恐怖","成长"]}'
    # it is a string, so it has to be converted into a dict
    result = json.loads(tag_res)
    content = result['tags']
    return content
def get_movies(tag_url, movies_url):
    tag_content = get_movie_tag(tag_url)
    # loop over the tags and build the movie-list URL for each one;
    # movies_url selects the category via a parameter such as tag=热门, so every tag value has to be percent-encoded
    tag_list = []
    print(tag_content)
    for tag in tag_content:
        data = {'tag': tag}
        search_tag = parse.urlencode(data)  # percent-encode
        tag_list.append(search_tag)
    for search_tag in tag_list:
        search_url = movies_url % search_tag
        movies_res = common(search_url)
        res = json.loads(movies_res)
        result = res['subjects']
        for res in result:
            print('Title: <<%s>>, rating: (%s)' % (res['title'], res['rate']))
if __name__ == '__main__':
    tag_url = 'https://movie.douban.com/j/search_tags?type=movie&source='
    movies_url = 'https://movie.douban.com/j/search_subjects?type=movie&%s&sort=recommend&page_limit=20&page_start=0'
    get_movies(tag_url, movies_url)
What limits multithreading performance is the GIL (Global Interpreter Lock). Strictly speaking, the Python language itself has nothing to do with the GIL; it is only for historical reasons that the GIL is hard to remove from the CPython interpreter.
GIL: the global interpreter lock. Every thread must acquire the GIL before it executes, which guarantees that only one thread runs Python code at any given moment.
CPU-bound work refers to heavy arithmetic. To speed it up, use multiple processes: with a large amount of computation, multithreading can take even longer than a single thread, so threads are a poor fit (a small sketch of the difference follows below).
IO-bound work refers to things like reading and writing files or network IO; this is where multithreading is a good fit.
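A minimal, hedged sketch of that difference (the counting function and the pool size are illustrative assumptions, not code from the original article): threads barely help the CPU-bound loop because of the GIL, while a process pool can use several cores.
import time
from multiprocessing import Pool
from threading import Thread
def count_down(n=10000000):
    # a pure-Python arithmetic loop: CPU-bound, so it holds the GIL while it runs
    while n > 0:
        n -= 1
if __name__ == '__main__':
    start = time.time()
    threads = [Thread(target=count_down) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('4 threads: %.2fs' % (time.time() - start))    # roughly serial speed because of the GIL
    start = time.time()
    with Pool(4) as pool:
        pool.map(count_down, [10000000] * 4)
    print('4 processes: %.2fs' % (time.time() - start))  # can run on 4 cores in parallel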
2.3 Crawling Douban (multi-threaded)
import json
import threading
from urllib import parse
import requests
def get_html(url):
    """
    Fetch the page
    """
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
    }
    req = requests.get(url=url, headers=header)
    return req.json()  # return the JSON response as a dict
task_lock = threading.Lock()  # create a thread lock
class MovieThreads(threading.Thread):
    def __init__(self):
        super().__init__()
    def update_task_list(self):
        if task_lock.acquire():  # hold the lock while taking a url from the shared list
            link = movies_list.pop(0) if movies_list else ''  # pop the url being handled so no two threads fetch the same resource
            task_lock.release()
            return link
    def run(self):
        link = self.update_task_list()
        if link:
            result = get_html(link)
            for res in result['subjects']:
                title = res['title']
                rate = res['rate']
                print('Movie: <<%s>>, rating: (%s)' % (title, rate))
if __name__ == '__main__':
    tag_url = 'https://movie.douban.com/j/search_tags?type=movie&source='
    movies_url = 'https://movie.douban.com/j/search_subjects?type=movie&%s&sort=recommend&page_limit=20&page_start=0'
    tag_api = get_html(tag_url)
    movies_list = []
    for tag in tag_api['tags']:
        data = {'tag': tag}
        search = parse.urlencode(data)  # percent-encode
        m_url = movies_url % search
        movies_list.append(m_url)
    while True:
        if movies_list:
            s1 = MovieThreads()
            s2 = MovieThreads()
            s1.start()
            s2.start()
        else:
            break
3. Multiprocessing
import os
import random
import time
from multiprocessing import Process
def download(filename):
    while True:
        print('Downloading %s' % filename)
        time.sleep(3)
def studying():
    while True:
        # os.getpid() returns the current process id
        print('Getting up to read, PID: %s' % os.getpid())
        time.sleep(random.randint(1, 5))
        print('Playing with the phone for a while, PID: %s' % os.getpid())
def chiji():
    while True:
        print('Playing PUBG, PID: %s' % os.getpid())
        time.sleep(random.randint(1, 3))
        print('Switching to Honor of Kings, PID: %s' % os.getpid())
if __name__ == '__main__':
    p1 = Process(target=studying)
    p2 = Process(target=chiji)
    p3 = Process(target=download, args=('功夫',))  # args must be a tuple
    p1.start()
    p1.join()  # join() blocks here until p1 finishes; since studying() loops forever, p2 and p3 below never start (see the variant after this block)
    p2.start()
    p3.start()
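If the intent is for all three processes to run at the same time, a common pattern (a sketch reusing the functions defined above) is to start every process first and only join them afterwards:
from multiprocessing import Process
if __name__ == '__main__':
    processes = [
        Process(target=studying),
        Process(target=chiji),
        Process(target=download, args=('功夫',)),
    ]
    for p in processes:
        p.start()  # start everything first so the processes run concurrently
    for p in processes:
        p.join()   # then wait; these demo functions loop forever, so stop them with Ctrl+C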
4. Multithreading
import threading
import time
class Study(threading.Thread):
    def __init__(self, name):
        super(Study, self).__init__()
        self.s_name = name
    def run(self):
        print('Current thread: %s' % self.name)  # the name attribute is inherited from the parent class and has a default value
        time.sleep(3)
        print('Studying %s' % self.s_name)
if __name__ == '__main__':
    s1 = Study('Chinese')
    s2 = Study('Math')
    # Daemon threads: when a child thread is marked as a daemon, it is killed as soon as the main thread exits,
    # whether or not it has finished running.
    s1.daemon = True
    s2.daemon = True
    s1.start()
    s1.join()
    s2.start()
    s2.join()
    # join() makes the main thread wait until s1 and s2 have finished before continuing;
    # it can also be used to measure how long the threads take (see the sketch below)
    print('Done')  # main thread
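As a small sketch of that timing trick (reusing the Study class defined above; the subject names are arbitrary), wrap the start()/join() calls with time.time():
import time
if __name__ == '__main__':
    begin = time.time()
    workers = [Study('English'), Study('Physics')]
    for w in workers:
        w.start()  # both threads sleep concurrently
    for w in workers:
        w.join()   # wait until both have finished
    # roughly 3 seconds rather than 6, because the two sleeps overlap
    print('elapsed: %.2fs' % (time.time() - begin))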
5. Thread locks: when multiple threads share a resource, they race for it, which can lead to duplicated or corrupted data; putting a lock around each shared resource solves the problem.
import threading
# a thread lock keeps threads from racing on a shared resource and corrupting the data
my_lock = threading.Lock()  # create the lock
class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()
    def run(self):
        if my_lock.acquire():  # acquire the lock
            global n
            print('%d, %s' % (n, self.name))  # self.name is the default name inherited from the parent class
            n += 1
            my_lock.release()  # release the lock
if __name__ == '__main__':
    n = 1
    threads_list = []
    # create 100 threads
    for _ in range(100):
        t1 = MyThread()
        threads_list.append(t1)
    # start the threads
    for i in threads_list:
        i.start()
    # wait for the threads to finish
    for a in threads_list:
        a.join()
6. Coroutines
Coroutines are mainly useful for heavily IO-bound work; they help little with CPU-bound work. A minimal sketch follows.
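A small asyncio illustration (under assumed task names, not code from the original article): the awaited sleeps stand in for network waits, and because they overlap, three one-second "requests" finish in about one second.
import asyncio
async def fetch(i):
    # asyncio.sleep stands in for a network or disk wait; real IO would go here
    await asyncio.sleep(1)
    return 'task %d done' % i
async def main():
    results = await asyncio.gather(*(fetch(i) for i in range(3)))
    print(results)
if __name__ == '__main__':
    loop = asyncio.get_event_loop()  # same event-loop style as the examples below
    loop.run_until_complete(main())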
6.1 Producer-consumer model (RabbitMQ-style, implemented with a generator)
def customer():
    # consumer
    r = ''
    while True:
        m = yield r  # m receives whatever the producer passes in via send()
        print('Eating chicken wings, round %s' % m)
def product(customer):
    # producer
    customer.__next__()
    for n in range(1, 5):
        print('Cooking chicken wings, round %d' % n)
        # hand them over to the consumer
        customer.send(n)
        print('Round %d sold out, keep producing' % n)
    customer.close()
if __name__ == '__main__':
    customer = customer()
    product(customer)
6.2 Crawling Douban movies with coroutines
import json
from urllib import parse
import aiohttp
import asyncio
from pymongo import MongoClient
class DouBanMovie(object):
    def __init__(self):
        self.tag_url = 'https://movie.douban.com/j/search_tags?type=movie&source='
        self.tags = []
        self.page = 10  # fetch 10 pages of data
        self.movies_url = 'https://movie.douban.com/j/search_subjects?type=movie&%s&sort=recommend&page_limit=20&page_start=%s'
        # connect to mongodb
        conn = MongoClient('mongodb://127.0.0.1:27017')
        db = conn['douban']
        self.collection = db['spider']
    async def get_html_info(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.tag_url, verify_ssl=False) as response:  # request the tag API
                # verify_ssl=False skips certificate verification
                tags = json.loads(await response.text())  # convert the response text into a dict
                self.tags = tags['tags']  # the list of tags
            for tag in self.tags:
                data = {'tag': tag}
                search = parse.urlencode(data)  # percent-encode
                for start_page in range(self.page):
                    async with session.get(self.movies_url % (search, start_page * 20), verify_ssl=False) as response:
                        doc = json.loads(await response.text())
                        for movie_info in doc['subjects']:
                            await self.insert_into_db(movie_info)
    async def insert_into_db(self, doc):
        return self.collection.insert_one(doc)  # insert one movie document into the collection
    def run(self):
        loop = asyncio.get_event_loop()  # event loop
        task = asyncio.wait([self.get_html_info()])
        loop.run_until_complete(task)
if __name__ == '__main__':
    dbm = DouBanMovie()
    dbm.run()
6.3 Crawling Toutiao comics with coroutines
import re
import aiohttp
import asyncio
import requests
class Manhua(object):
    def __init__(self):
        self.image_url = 'https://www.toutiao.com/a6572350040151425539/'
    async def get_image(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.image_url, verify_ssl=False) as response:
                image_html = await response.text()
                # the page escapes quotes as HTML entities such as &quot;, and the & characters get in the
                # way of the regex below, so every & is replaced with - first
                image_html = image_html.replace('&', '-')
                images_list = await self.pattern_regex(image_html, r'class.*?img src.*?quot;(.*?)-quot; img_width.*?quot;')  # extract the image links
                i = 1
                for image in images_list:
                    res = requests.get(image)
                    filename = 'filename-' + str(i) + '.jpg'
                    try:
                        with open(filename, 'wb') as f:
                            f.write(res.content)
                    except FileNotFoundError as e:
                        print('failed to save the image')
                    i += 1
    async def pattern_regex(self, html, pattern, flags=re.S):
        # extract the needed parts from the page; re.S makes '.' also match newlines
        html_regex = re.compile(pattern, flags)
        return html_regex.findall(html) if html else []
    def run(self):
        loop = asyncio.get_event_loop()  # event loop
        task = asyncio.wait([self.get_image()])
        loop.run_until_complete(task)
if __name__ == '__main__':
    dbm = Manhua()
    dbm.run()
6.4 Crawling Mzitu images with coroutines
import ssl
import aiohttp
import asyncio
import aiofiles
import requests
import urllib.request
from bs4 import BeautifulSoup
n = 1
class Meinv(object):
    def __init__(self):
        self.url = 'http://www.mzitu.com/xinggan/page/%s/'
    def picture_url(self):
        header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
            'Referer': 'http://www.mzitu.com/xinggan/'
        }
        return header
    def get_url(self, page1=10):
        list1 = []
        for i in range(1, page1 + 1):
            list1.append(self.url % i)
        return list1
    def images_url(self, html):
        soup = BeautifulSoup(html, 'lxml')
        result = soup.find(id='pins').find_all('li')
        list2 = []
        for lis in result:
            urs = lis.find('a').find('img')
            list2.append(urs.get('data-original'))
        return list2
    async def get_image(self):
        async with aiohttp.ClientSession() as session:
            for url in self.get_url():
                if url:
                    async with session.get(url, verify_ssl=False, headers=self.picture_url()) as response:
                        get_html = await response.text()
                        srcs = self.images_url(get_html)
                        for src in srcs:
                            async with session.get(src, verify_ssl=False, headers=self.picture_url()) as response:
                                res = await response.read()
                                print(response)
                                global n
                                file = 'file' + str(n) + '.jpg'
                                n += 1
                                filename = 'images' + '/' + file
                                try:
                                    with open(filename, 'wb') as f:
                                        f.write(res)
                                except:
                                    print('failed to save the image')
                                # urllib.request.urlretrieve(src, filename)  downloads the image and writes it to a file under the hood
                                # f = await aiofiles.open('images' + '/' + file, 'wb')  aiofiles can also write the image to a file
                                # await f.write(image)
    def run(self):
        loop = asyncio.get_event_loop()  # event loop
        task = asyncio.wait([self.get_image()])
        loop.run_until_complete(task)
if __name__ == '__main__':
    dbm = Meinv()
    dbm.run()
7. Browser automation with Selenium
7.1 Simulating a search submission
import time
from selenium import webdriver
chromedriver = r'C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe'
browser = webdriver.Chrome(chromedriver)
# open the Taobao homepage
browser.get('https://www.taobao.com')
# type "ipad" into the search box
browser.find_element_by_id('q').send_keys('ipad')
time.sleep(3)
# simulate clicking the search button
browser.find_element_by_class_name('btn-search').click()
# close the browser
# browser.close()
7.2 Simulating a Zhihu login
from selenium import webdriver
chromedriver = r'C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe'
browser = webdriver.Chrome(chromedriver)
# open the Zhihu homepage
browser.get('https://www.zhihu.com')
# click the login button to open the login form
browser.find_element_by_xpath('//*[@id="root"]/div/main/div/div[2]/div/div/div[1]/div[1]/div/div[1]/div[2]/button[1]').click()
# enter the username
browser.find_element_by_xpath('/html/body/div[4]/div/span/div/div[2]/div/div/div/div[2]/div[1]/form/div[1]/div[2]/div[1]/input').send_keys('15884550995')
# enter the password
browser.find_element_by_xpath('/html/body/div[4]/div/span/div/div[2]/div/div/div/div[2]/div[1]/form/div[2]/div/div[1]/input').send_keys('xiongbiao199329')
# click the login button to submit
browser.find_element_by_class_name('SignFlow-submitButton').click()
# browser.close()
7.2.1 Image CAPTCHA recognition
import base64
import urllib.request
from urllib import parse
def base64_img():
    with open(r'base.png', 'rb') as f:
        base64_img_data = base64.b64encode(f.read())
    return base64_img_data.decode('utf-8')
def get_code(base64_img):
    host = 'http://txyzmsb.market.alicloudapi.com'
    path = '/yzm'
    method = 'POST'
    appcode = '4a929344cfbf447fbf31e2ec37534974'
    querys = ''
    bodys = {}
    url = host + path
    bodys['v_pic'] = base64_img
    bodys['v_type'] = '''ne4'''
    post_data = parse.urlencode(bodys).encode('utf-8')
    request = urllib.request.Request(url, post_data)
    request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3423.2 Safari/537.36')
    request.add_header('Authorization', 'APPCODE ' + appcode)
    # set the Content-Type required by the API
    request.add_header('Content-Type', 'application/x-www-form-urlencoded; charset=UTF-8')
    response = urllib.request.urlopen(request)
    content = response.read()
    return content if content else ''
if __name__ == '__main__':
    result = base64_img()
    content = get_code(result)
    print(content)
7.3 Switching windows and other browser actions
import time
from selenium import webdriver
chromedriver = r'C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe'
browser = webdriver.Chrome(chromedriver)
# open the browser and load the url
browser.get('https://www.taobao.com')
# get the window handle of the homepage
# taobao_handle = browser.current_window_handle
time.sleep(5)
# implicit wait of 5 seconds
# browser.implicitly_wait(5)
# locate an element by xpath and click it
# browser.find_element_by_xpath('/html/body/div[4]/div[1]/div[1]/div[1]/div/ul/li[1]/a[1]').click()
# sleep for 3 seconds
# time.sleep(3)
# get the window handle of the women's clothing page
# nvzhuang_handle = browser.current_window_handle
# switch back to the homepage window
# browser.switch_to_window(taobao_handle)
# search for "mac pro" and click the search button
browser.find_element_by_id('q').send_keys('mac pro')
time.sleep(3)
browser.find_element_by_class_name('btn-search').click()
# close the window
# browser.close()
# close the browser
time.sleep(5)
# browser.quit()
# go back
browser.back()
time.sleep(3)
# go forward
browser.forward()
time.sleep(3)
# scroll to the bottom of the page
browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
# scroll back to the top
# browser.execute_script('document.documentElement.scrollTop=0')
7.4 Getting Taobao navigation links
from selenium import webdriver
chromedriver = r'C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe'
browser = webdriver.Chrome(chromedriver)
# open Taobao
browser.get('https://www.taobao.com')
# scrape the navigation data
list1 = browser.find_elements_by_css_selector('.service-bd li a')
for li in list1:
    print(li.text)  # the text inside the tag
    print(li.get_attribute('href'))  # the value of the href attribute
# close the browser
browser.close()
7.5 Crawling Douban movies with Selenium
import time
from selenium import webdriver
def more_movies(browser):
    flags = True
    while flags:
        try:
            # scroll to the bottom of the page
            browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
            time.sleep(3)
            # simulate clicking "load more"
            browser.find_element_by_xpath('//*[@id="content"]/div/div[1]/div/div[4]/a').click()
        except Exception as e:
            print('nothing more to load')
            flags = False
if __name__ == '__main__':
    chromedriver = r'C:\Program Files (x86)\Google\Chrome\Application\chromedriver.exe'
    browser = webdriver.Chrome(chromedriver)
    browser.get('https://movie.douban.com/')
    browser.find_element_by_xpath('//*[@id="db-nav-movie"]/div[2]/div/ul/li[2]/a').click()
    tags = browser.find_elements_by_css_selector('.tag-list label')
    print(tags)
    for tag in tags:
        # print(tag.text)
        tag.click()
        time.sleep(2)
        more_movies(browser)
8. The scrapy framework
8.1 Crawling free proxy IPs
from time import sleep
from bs4 import BeautifulSoup
import urllib.request
def get_html(url):
    header = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
    }
    req = urllib.request.Request(url, headers=header)
    res = urllib.request.urlopen(req).read()
    soup = BeautifulSoup(res, 'lxml')
    bars = soup.find_all('tr')
    _, _, *a = bars  # discard the first two rows (headers); the remaining rows are assigned to a
    if a:
        for b in a:
            ip = b.findAll('td')[0].get_text()  # get_text() returns the text inside the tag
            port = b.findAll('td')[1].get_text()
            ip_temp = '%s:%s' % (ip, port)
            proxy = {'http': 'http://%s' % ip_temp}
            # install the proxy
            proxy_handler = urllib.request.ProxyHandler(proxy)
            opener = urllib.request.build_opener(proxy_handler)
            urllib.request.install_opener(opener)
            # url used to validate the proxy
            validateUrl = 'https://www.baidu.com'
            req = urllib.request.Request(url=validateUrl, headers=header)
            # wait a moment before sending
            sleep(2)
            # check the result
            try:
                res = urllib.request.urlopen(req)
                # wait for the response
                sleep(2)
                content = res.read()
                # write working proxies to a file
                if content:
                    print('ok')
                    with open('ip.txt', 'a') as wd:
                        wd.write("'" + ip_temp + "'" + '\n')
                else:
                    # validation failed
                    print('error')
            except urllib.request.URLError as e:
                print(e)
if __name__ == '__main__':
    for page in range(1, 5):
        ip_url = 'http://www.66ip.cn/%s.html' % page
        get_html(ip_url)
8.2 Crawling Lianjia listing pages with scrapy
*** lianjia.py ***
import json
from scrapy import Request
from scrapy.selector import Selector
from scrapy.spiders import Spider
from lianjiaspider.items import LianjiaspiderItem
class LianJiaSpider(Spider):
    name = 'lianjia'
    domains_url = 'https://cd.lianjia.com'
    start_lianjia_url = 'https://cd.lianjia.com/ershoufang'
    def start_requests(self):
        # issue the first request; this method overrides the hook inherited from the parent class
        yield Request(self.start_lianjia_url)
    def parse(self, response):
        sel = Selector(response)  # wrap the response in a Selector for querying
        areas = sel.xpath('//div[@data-role="ershoufang"]/div/a')
        for area in areas:
            area_href = area.xpath('./@href').extract()[0]  # link of the district
            area_name = area.xpath('./text()').extract()[0]  # name of the district
            yield Request(self.domains_url + area_href, callback=self.parse_house_info,
                          meta={'name': area_name, 'href': area_href})
            # meta passes these values along to the callback
    def parse_house_info(self, response):
        # crawl every listing page of the district
        sel = Selector(response)
        page_box = sel.xpath('//div[@class="page-box house-lst-page-box"]/@page-data').extract()[0]
        total_page = json.loads(page_box).get('totalPage')  # total number of pages for the district
        for i in range(1, int(total_page) + 1):
            yield Request(self.domains_url + response.meta.get('href') + 'pg' + str(i), callback=self.parse_house, meta={'name': response.meta.get('name')})
    def parse_house(self, response):
        sel = Selector(response)
        lis = sel.xpath('//html/body/div[4]/div[1]/ul/li[@class="clear"]')
        for li in lis:
            # every field lives inside the li tag, so that is what gets parsed
            item = LianjiaspiderItem()
            item['house_code'] = li.xpath('./a/@data-housecode').extract()[0]  # unique listing id
            if li.xpath('./a/img/@src').extract():
                item['img_src'] = li.xpath('./a/img/@src').extract()[0]  # image link
            if li.xpath('./div/div/a/text()').extract():
                item['title'] = li.xpath('./div/div/a/text()').extract()[0]  # listing title
            item['address'] = li.xpath('./div/div[2]/div/a/text()').extract()[0]  # address
            item['info'] = li.xpath('./div/div[2]/div/text()').extract()[0]
            item['info'] = self.split_house_info(item['info'])
            item['flood'] = li.xpath('./div/div[3]/div/text()').extract()[0] + li.xpath('./div/div[3]/div/a/text()').extract()[0]
            item['tag'] = li.xpath('./div[1]/div[5]/span/text()').extract()
            item['price'] = li.xpath('./div[1]/div[6]/div[2]/span/text()').extract()[0]  # unit price
            item['type'] = 'ershoufang'  # listing type: second-hand housing
            item['city'] = '成都'
            item['area'] = response.meta.get('name')  # district name
            yield item
    def split_house_info(self, info):
        return [i.strip() for i in info.split('|')[1:]]  # split the info string on '|' and drop the first part
*** main.py ***
from scrapy import cmdline
cmdline.execute(['scrapy', 'crawl', 'lianjia'])
*** items.py ***
import scrapy
# the item model
class LianjiaspiderItem(scrapy.Item):
    collections = 'ershoufang'
    house_code = scrapy.Field()
    img_src = scrapy.Field()  # image
    title = scrapy.Field()  # title
    address = scrapy.Field()  # address
    info = scrapy.Field()  # size, orientation, etc.
    flood = scrapy.Field()  # floor, year built
    tag = scrapy.Field()  # transport, etc.
    price = scrapy.Field()  # unit price
    type = scrapy.Field()  # listing type
    city = scrapy.Field()  # city
    area = scrapy.Field()  # district
*** pipelines.py ***
import pymongo
from scrapy.conf import settings
from lianjiaspider.items import LianjiaspiderItem
class LianjiaspiderPipeline(object):
    def process_item(self, item, spider):
        return item
class PymongoLianjiaPipeline(object):
    def __init__(self):
        conn = pymongo.MongoClient(host=settings['MONGODB_HOST'],
                                   port=settings['MONGODB_PORT'])
        db = conn[settings['MONGODB_DB']]
        self.collection = db[LianjiaspiderItem.collections]
    def process_item(self, item, spider):
        if isinstance(item, LianjiaspiderItem):
            self.collection.update({'house_code': item['house_code']}, {'$set': item}, True)
        return item
*** settings.py ***
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
ROBOTSTXT_OBEY = False  # do not honour robots.txt
DOWNLOAD_DELAY = 1
ITEM_PIPELINES = {
    'lianjiaspider.pipelines.LianjiaspiderPipeline': 300,
    'lianjiaspider.pipelines.PymongoLianjiaPipeline': 301
}
MONGODB_HOST = 'localhost'
MONGODB_PORT = 27017
MONGODB_DB = 'lianjia'
8.3 Crawling the Weibo API with scrapy
*** weibo.py ***
from scrapy.spiders import Spider
import json
from scrapy import Request
from weibospider.items import WeiBoUserItem, UserRelationItem
class WeiboSpider(Spider):
    name = 'weibo'
    # user profile API
    user_url = 'https://m.weibo.cn/api/container/getIndex?uid={uid}&containerid=100505{uid}'
    # users this user follows
    followers_url = 'https://m.weibo.cn/api/container/getIndex?containerid=231051_-_followers_-_{uid}&page={page}'
    # fans of this user
    fans_url = 'https://m.weibo.cn/api/container/getIndex?containerid=231051_-_fans_-_{uid}&since_id={page}'
    start_user_uids = ['1669879400']
    def start_requests(self):
        for uid in self.start_user_uids:
            yield Request(self.user_url.format(uid=uid), callback=self.parse_user)
    def parse_user(self, response):
        res = json.loads(response.text)
        # check whether the API call succeeded
        if res['ok']:
            user_item = WeiBoUserItem()
            # extract the user info
            user_info = res.get('data').get('userInfo')
            user_params = {
                'id': 'id', 'screen_name': 'screen_name', 'profile_image_url': 'profile_image_url', 'profile_url': 'profile_url', 'verified_reason': 'verified_reason', 'close_blue_v': 'close_blue_v', 'description': 'description', 'gender': 'gender', 'follow_me': 'follow_me', 'following': 'following', 'followers_count': 'followers_count', 'follow_count': 'follow_count', 'cover_image_phone': 'cover_image_phone', 'avatar_hd': 'avatar_hd'
            }
            for k, v in user_params.items():
                user_item[k] = user_info.get(v)
            yield user_item  # emit the user item, then carry on
            # the people this user follows
            yield Request(self.followers_url.format(uid=user_item.get('id'), page=1), callback=self.parse_follower, meta={'uid': user_item.get('id'), 'page': 1})
            # the user's fans
            yield Request(self.fans_url.format(uid=user_item.get('id'), page=1), callback=self.parse_fan, meta={'uid': user_item.get('id'), 'page': 1})
    def parse_follower(self, response):
        # parse the list of users this user follows
        res = json.loads(response.text)
        if res['ok']:
            card_group = res['data']['cards'][-1]['card_group']
            for card_info in card_group:
                user_info = card_info['user']
                uid = user_info['id']
                # yield Request(self.user_url.format(uid=uid), callback=self.parse_user)
                # uncommenting the line above treats every followed user as a new start user, so the crawl
                # fans out across the follow graph and can eventually reach everyone involved in following
            # record the relation between this user and the people they follow
            follower_list = []
            for follower in card_group:
                follower_list.append({'id': follower['user']['id'], 'name': follower['user']['screen_name']})
            uid = response.meta.get('uid')
            user_relation = UserRelationItem()
            user_relation['id'] = uid
            user_relation['fans'] = []
            user_relation['follower'] = follower_list
            yield user_relation
            # request the next page of follows, iterating until every page has been fetched
            uid = response.meta.get('uid')
            page = int(response.meta.get('page')) + 1
            yield Request(self.followers_url.format(uid=uid, page=page), callback=self.parse_follower, meta={'uid': uid, 'page': page})
    def parse_fan(self, response):
        # parse the user's fans
        res = json.loads(response.text)
        if res['ok']:
            card_group = res['data']['cards'][-1]['card_group']
            fan_list = []
            for card_info in card_group:
                fan_id = card_info['user']['id']
                fan_list.append({'id': fan_id, 'name': card_info['user']['screen_name']})
            uid = response.meta.get('uid')
            user_relation = UserRelationItem()
            user_relation['id'] = uid
            user_relation['fans'] = fan_list
            user_relation['follower'] = []
            yield user_relation
            for _ in range(10):
                uid = response.meta.get('uid')
                page = int(response.meta.get('page')) + 1
                yield Request(self.fans_url.format(uid=uid, page=page), callback=self.parse_fan, meta={'uid': uid, 'page': page})
*** main.py ***
from scrapy import cmdline
cmdline.execute(['scrapy', 'crawl', 'weibo'])
*** items.py ***
import scrapy
# user model
class WeiBoUserItem(scrapy.Item):
    collections = 'users'
    id = scrapy.Field()
    screen_name = scrapy.Field()
    profile_image_url = scrapy.Field()
    profile_url = scrapy.Field()
    verified_reason = scrapy.Field()
    close_blue_v = scrapy.Field()
    description = scrapy.Field()
    gender = scrapy.Field()
    follow_me = scrapy.Field()
    following = scrapy.Field()
    followers_count = scrapy.Field()
    follow_count = scrapy.Field()
    cover_image_phone = scrapy.Field()
    avatar_hd = scrapy.Field()
    create_time = scrapy.Field()  # time the record was created
# model for the user's follows and fans
class UserRelationItem(scrapy.Item):
    collections = 'user'
    fans = scrapy.Field()
    follower = scrapy.Field()
    id = scrapy.Field()  # the id of the user whose follows and fans these are
*** pipelines.py ***
from datetime import datetime
import pymongo
from scrapy.conf import settings
from weibospider.items import WeiBoUserItem, UserRelationItem
class UserCreateTimePipeline(object):
    def process_item(self, item, spider):
        if isinstance(item, WeiBoUserItem):
            item['create_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        return item
class WeibospiderPipeline(object):
    def process_item(self, item, spider):
        return item
class WeiboPymongoPipeline(object):
    # persists the items
    def __init__(self):
        self.MONGODB_HOST = settings['MONGODB_HOST']
        self.MONGODB_PORT = settings['MONGODB_PORT']
        self.MONGODB_DB = settings['MONGODB_DB']
        conn = pymongo.MongoClient(host=self.MONGODB_HOST, port=self.MONGODB_PORT)
        db = conn[self.MONGODB_DB]
        self.collections = db[WeiBoUserItem.collections]
    def process_item(self, item, spider):
        if isinstance(item, WeiBoUserItem):
            # self.collections.insert(dict(item))  # a plain insert leaves duplicate users in the database
            self.collections.update({'id': item['id']}, {'$set': item}, True)
        if isinstance(item, UserRelationItem):
            # add the user's follow and fan relations to the same document
            self.collections.update(
                {'id': item['id']},
                {'$addToSet': {
                    'fans': {'$each': item['fans']},
                    'follower': {'$each': item['follower']}
                }}
            )
        return item
*** middlewares.py ***
from scrapy.conf import settings
from scrapy.contrib.downloadermiddleware.useragent import UserAgentMiddleware
import random
class RandomUserAgent(UserAgentMiddleware):
    def process_request(self, request, spider):
        # pick a random User-Agent for every request
        user_agent = random.choice(settings['USER_AGENT_LIST'])
        request.headers.setdefault(b'User-Agent', user_agent)
class RandomProxy(object):
    def process_request(self, request, spider):
        # route every request through a random proxy from the settings
        random_proxy = random.choice(settings['PROXY'])
        request.meta['proxy'] = 'http://%s' % random_proxy
*** settings.py ***
USER_AGENT_LIST = [
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
"Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
"Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
"Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
"Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
"Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.132 Safari/537.36",
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0"
]
PROXY = [
'112.115.57.20:3128',
'39.104.53.175:8080',
'103.220.29.244:8080',
'74.210.184.16:53281',
'177.126.81.63:20183',
'93.76.53.243:53281',
'201.184.105.122:8080',
'58.17.125.215:53281',
'36.81.203.228:8080'
]
ROBOTSTXT_OBEY = False
DOWNLOAD_DELAY = 3
DOWNLOADER_MIDDLEWARES = {
# 'weibospider.middlewares.WeibospiderDownloaderMiddleware': 543,
'weibospider.middlewares.RandomUserAgent': 543,
'weibospider.middlewares.RandomProxy': 544,
}
ITEM_PIPELINES = {
'weibospider.pipelines.UserCreateTimePipeline': 300,
'weibospider.pipelines.WeibospiderPipeline': 301,
'weibospider.pipelines.WeiboPymongoPipeline': 302
}
MONGODB_HOST = '127.0.0.1'
MONGODB_PORT = 27017
MONGODB_DB = 'weibo'
8.4 Distributed crawling of Beike listings with scrapy and scrapy-redis (master/slave split: URLs are queued in redis, the scraped data is stored in mongodb)
beike-master spider file
import json
from scrapy import Request
from scrapy.selector import Selector
from scrapy.spiders import Spider
from beike.items import SpiderBeiKeItem
class BeiKei(Spider):
    name = 'ershou'
    domains_url = 'https://cd.ke.com'
    start_urls_ershoufang = 'https://cd.ke.com/ershoufang/'
    def start_requests(self):
        # issue the first request; this method overrides the hook inherited from the parent class
        yield Request(self.start_urls_ershoufang, callback=self.parse_ershou)
    def parse_ershou(self, response):
        sel = Selector(response)
        areas = sel.xpath('//*[@data-role="ershoufang"]/div/a')
        for area in areas:
            area_href = area.xpath('./@href').extract()[0]  # link of the district
            yield Request(self.domains_url + area_href, callback=self.parse_page,
                          meta={'href': area_href})
            # meta passes the district link along to the callback
    def parse_page(self, response):
        sel = Selector(response)
        page = sel.xpath('//*[@class="page-box house-lst-page-box"]/@page-data').extract()[0]
        total_page = json.loads(page).get('totalPage')
        # total number of listing pages for the district
        for i in range(1, int(total_page) + 1):  # one item per listing page
            item = SpiderBeiKeItem()
            item['url'] = self.domains_url + response.meta.get('href') + 'pg' + str(i)
            yield item
main.py
from scrapy import cmdline
cmdline.execute(['scrapy', 'crawl', 'ershou'])
items.py
import scrapy
class SpiderBeiKeItem(scrapy.Item):
    url = scrapy.Field()
pipelines.py
import redis
from scrapy.conf import settings
class RedisBeiKePipeline(object):
    def __init__(self):
        self.r = redis.Redis(host=settings['REDIS_HOST'], port=settings['REDIS_PORT'])
    def process_item(self, item, spider):
        self.r.lpush('beike:start_urls', item['url'])  # push the listing-page url onto the shared redis queue
        return item
settings.py
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
ROBOTSTXT_OBEY = False
DOWNLOAD_DELAY = 3
ITEM_PIPELINES = {
# 'beike.pipelines.BeikePipeline': 300,
'beike.pipelines.RedisBeiKePipeline': 300,
}
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
beike-slave spider file
from scrapy import Request
from scrapy.selector import Selector
from scrapy_redis.spiders import RedisSpider
from beike.items import BeiKespiderItem
class LianJiaSpider(RedisSpider):
    name = 'ershou'
    redis_key = 'beike:start_urls'
    def parse(self, response):
        sel = Selector(response)
        lis = sel.xpath('/html/body/div[4]/div[1]/ul/li')
        for li in lis:
            # every field lives inside the li tag, so that is what gets parsed
            item = BeiKespiderItem()
            # listing id, unique per house
            item['house_code'] = li.xpath('./div[1]/div[6]/div[2]/@data-rid').extract()[0]
            # city
            item['city'] = '成都'
            # district name
            item['area'] = sel.xpath('//*[@data-role="ershoufang"]/div/a[@class="selected CLICKDATA"]/text()').extract()[0]
            # image link
            if li.xpath('./a/img[@class="lj-lazy"]/@src').extract():
                item['img_src'] = li.xpath('./a/img[@class="lj-lazy"]/@src').extract()[0]
            else:
                item['img_src'] = 'no image available'
            # listing title
            if li.xpath('./div[1]/div[1]/a/text()').extract():
                item['title'] = li.xpath('./div[1]/div[1]/a/text()').extract()[0]
            # address
            item['address'] = li.xpath('./div[1]/div[2]/div/a/text()').extract()[0]
            contents = li.xpath('./div[1]/div[2]/div/text()').extract()[0]
            content_list = self.split_house_info(contents)
            # size, orientation and other details
            item['info'] = content_list
            # floor and year built
            item['flood'] = li.xpath('./div[1]/div[3]/div/text()').extract()[0] + li.xpath('./div[1]/div[3]/div/a/text()').extract()[0]
            # transport, viewing times and other tags
            item['tag'] = (
                (li.xpath('./div[1]/div[5]/span[1]/text()').extract()[0] if li.xpath('./div[1]/div[5]/span[1]/text()') else '')
                + (li.xpath('./div[1]/div[5]/span[2]/text()').extract()[0] if li.xpath('./div[1]/div[5]/span[2]/text()') else '')
                + (li.xpath('./div[1]/div[5]/span[3]/text()').extract()[0] if li.xpath('./div[1]/div[5]/span[3]/text()') else '')
            )
            # unit price
            item['price'] = li.xpath('./div[1]/div[6]/div[2]/span/text()').extract()[0]
            # total price
            item['total_price'] = li.xpath('./div[1]/div[6]/div[1]/span/text()').extract()[0] + li.xpath('./div[1]/div[6]/div[1]/text()').extract()[0].replace('\n', '').strip()
            # listing type: second-hand housing
            item['type'] = '二手房'
            yield item
    def split_house_info(self, info):
        return [i.strip() for i in info.split('|')[1:]]  # split the info string on '|' and drop the first part
main.py
from scrapy import cmdline
cmdline.execute(['scrapy', 'crawl', 'ershou'])
items.py
import scrapy
# the item model
class BeiKespiderItem(scrapy.Item):
    collections = 'ershoufang'
    house_code = scrapy.Field()  # listing id
    city = scrapy.Field()  # city
    area = scrapy.Field()  # district
    img_src = scrapy.Field()  # image
    title = scrapy.Field()  # title
    address = scrapy.Field()  # address
    info = scrapy.Field()  # size, orientation, etc.
    flood = scrapy.Field()  # floor, year built, etc.
    tag = scrapy.Field()  # transport, viewing times, etc.
    price = scrapy.Field()  # unit price
    total_price = scrapy.Field()  # total price
    type = scrapy.Field()  # listing type
pipelines.py
import pymongo
from scrapy.conf import settings
from beike.items import BeiKespiderItem
class BeikePipeline(object):
    def process_item(self, item, spider):
        return item
class PymongoBeiKePipeline(object):
    def __init__(self):
        conn = pymongo.MongoClient(host=settings['MONGODB_HOST'],
                                   port=settings['MONGODB_PORT'])
        db = conn[settings['MONGODB_DB']]
        self.collection = db[BeiKespiderItem.collections]
    def process_item(self, item, spider):
        if isinstance(item, BeiKespiderItem):
            self.collection.update({'house_code': item['house_code']}, {'$set': item}, True)
        return item
settings.py
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36'
ROBOTSTXT_OBEY = False
DOWNLOAD_DELAY = 1
ITEM_PIPELINES = {
# 'beike.pipelines.BeikePipeline': 300,
'beike.pipelines.PymongoBeiKePipeline': 300,
}
MONGODB_HOST = 'localhost'
MONGODB_PORT = 27017
MONGODB_DB = 'beike'
REDIS_URL = 'redis://127.0.0.1:6379'
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
SCHEDULER = "scrapy_redis.scheduler.Scheduler"  # let scrapy-redis schedule the requests
SCHEDULER_PERSIST = True  # keep the redis queue and dupe filter across restarts instead of clearing them
REDIS_START_URLS_AS_SET = False
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"  # deduplicate every request
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderQueue"
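To check that the master is actually feeding the queue before starting the slave, the redis list can be inspected directly (a small sketch using the redis client already imported by the master pipeline; the key name comes from RedisBeiKePipeline and redis_key above):
import redis
r = redis.Redis(host='127.0.0.1', port=6379)
print(r.llen('beike:start_urls'))          # how many start URLs the master has queued
print(r.lrange('beike:start_urls', 0, 4))  # peek at the first few queued URLs (returned as bytes)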