使用Item Loaders对Item数据进行提取和解析(整理) 以及 多线程异步的形式对数据进行写入
程序员文章站
2022-05-11 23:17:59
...
使用Item Loaders对Item数据进行提取和解析(整理)。
作用 :
之前的方式,是将数据的提取和解析混合在一起,但是Item Loaders是将这两个部分分开处理了;
爬虫文件bole.py中只负责数据的提取;
Items.py文件负责数据的整理;(可以实现数据解析代码的重用。相当于将功能相同的解析函数封装成为一个公用的函数,任何爬
虫需要这个函数,都可以来调用。)
1. 使关于数据的提取代码更加简洁,结构更加清晰;
2. 可以实现数据解析(整理)部分的代码的重用;
3. 提高代码的可维护性;
步骤如下:
1. 当创建item对象(item=JobboleItem())的时候,会去Items.py文件中初始化对应的
input/output_processor处理器; 2. 当item中的处理器初始化完成,回到bole.py爬虫文件
中,创建item_loader对象;
3. item_loader对象创建完成,开始通过add_xpath/add_css/add_value收集数据;
4. 每收集到一个数据,就会将该数据传递给对应字段对应的input_processor绑定的函数进行
数据的处理;数据处理完成,会暂时保存在ItemLoader中;
5. 循环第4步,将每一个字段的数据提取并交给input_processor,直到所有数据提取完毕,
所有数据都会被保存在ItemLoader中;
6. 调用load_item()函数,给item对象进行赋值;
附如下的代码
# jobbole.py
item_loader = ItemLoader(item=JobboleItem(), response=response)
item_loader.add_xpath('title', '//div[@class="entry-header"]/h1/text()')
item_loader.add_xpath('date_time', '//p[@class="entry-meta-hide-on-mobile"]/text()')
item_loader.add_xpath('tags', '//p[@class="entry-meta-hide-on-mobile"]/a/text()')
item_loader.add_xpath('content', '//div[@class="entry"]//text()')
item_loader.add_xpath('zan_num', '//div[@class="post-adds"]/span[contains(@class, "vote-post-up")]//text()')
item_loader.add_xpath('keep_num', '//div[@class="post-adds"]/span[contains(@class, "bookmark-btn")]/text()')
item_loader.add_xpath('comment_num', '//div[@class="post-adds"]/a/span/text()')
item_loader.add_value('img_src', [response.meta['img_src']])
item = item_loader.load_item()
yield item
# items.py
import scrapy,re
from datetime import datetime
from scrapy.contrib.loader.processor import Join, MapCompose, TakeFirst
# 函数中的参数value值,是add_xpath/add_css/add_value传过来的列表数据中的每一个元素。
# def input_test_title(value):
# return value + '===='
#
# def output_test_title(value):
# return '---' + value
#
# def result(value):
# # 这里面join拼接的大列表里的一个元素
# return ''.join(value)
def convert_datetime(value):
# item.py
# 将字符串类型转化成datetime类型
value = value.replace('·', '').strip()
try:
# strptime(时间字符串,转化后的格式): 函数返回值是datetime类型的对象
date_time = datetime.strptime(value, '%Y/%m/%d')
except:
# 如果转化失败,将当前时间作为默认值。
date_time = datetime.now()
return date_time
def convert_tags(value):
# ['*职业', '1 评论', '职业']
# 过滤 "评论"
if "评论" in value:
return ""
return value
def zan_number(value):
if value.strip() != "":
pattern = re.compile(r'\d+')
num = re.findall(pattern, value)
if num:
num = int(num[0])
else:
num = 0
return num
def get_number(value):
# 提取评论、点赞数
pattern = re.compile(r'\d+')
num = re.findall(pattern, value)
if num:
num = int(num[0])
else:
num = 0
return num
class JobboleItem(scrapy.Item):
title = scrapy.Field(
# MapCompose映射类,可以将ItemLoader传递过来的列表中的元素,依次作用到test_title函数上,类似于map()函数。
# input_processor=MapCompose(input_test_title),
# Join(): 对列表进行合并,add_xpath/add_css/add_value传过来的列表数据。
# output_processor=TakeFirst()
)
date_time = scrapy.Field(
input_processor=MapCompose(convert_datetime),
# TakeFirst(): 获取列表中的首个元素
# output_processor=TakeFirst()
)
tags = scrapy.Field(
input_processor=MapCompose(convert_tags),
# 覆盖默认的default_output_processor = TakeFirst()
output_processor=Join()
)
content = scrapy.Field(
output_processor=Join()
)
zan_num = scrapy.Field(
# ['', '1', ' 赞']
input_processor=MapCompose(zan_number),
# output_processor=TakeFirst()
)
keep_num = scrapy.Field(
input_processor=MapCompose(get_number),
# output_processor=TakeFirst()
)
comment_num = scrapy.Field(
input_processor=MapCompose(get_number),
# output_processor=TakeFirst()
)
# 图片的源地址
img_src = scrapy.Field()
# 图片在本地的下载路径, 该字段只有在图片下载完成以后,才能进行赋值。
img_path = scrapy.Field()
实现自定义的ItemLoader
from scrapy.contrib.loader import ItemLoader
class CustomItemloader(ItemLoader):
"""
实现自定义的ItemLoader,可以指定默认的output_processor的值。可以避免在每一个字段中,设置重复的值。
"""
default_output_processor = TakeFirst()
在pipeline.py中:
# 定义处理图片的Pipeline
class ImagePipeline(ImagesPipeline):
# 图片下载完成以后的调用方法。
def item_completed(self, results, item, info):
print('---',results)
# return item
# 如果图片能够下载成功,说明这个文章是有图片的。如果results中不存在path路径,说明是没有图片的。
# [(True, {'path': ''})]
if results:
try:
img_path = results[0][1]['path']
except Exception as e:
print('img_path获取异常,',e)
img_path = '没有图片'
else:
img_path = '没有图片'
# 对item对象中的img_path进行赋值
item['img_path'] = img_path
# 判断完成,需要将变量img_path重新保存到item中。
return item
# 数据库pymysql的commit()和execute()在提交数据时,都是同步提交至数据库,由于scrapy框架数据的解析和异步多线程的,所以scrapy的数据解析速度,要远高于数据的写入数据库的速度。如果数据写入过慢,会造成数据库写入的阻塞,影响数据库写入的效率。
# 通过多线程异步的形式对数据进行写入,可以提高数据的写入速度。
from pymysql import cursors
# 使用twsited异步IO框架,实现数据的异步写入。
from twisted.enterprise import adbapi
class MySQLTwistedPipeline(object):
"""
MYSQL_HOST = 'localhost'
MYSQL_DB = 'jobbole'
MYSQL_USER = 'root'
MYSQL_PASSWD = '123456'
MYSQL_CHARSET = 'utf8'
MYSQL_PORT = 3306
"""
def __init__(self, dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
params = dict(
host=settings['MYSQL_HOST'],
db=settings['MYSQL_DB'],
user=settings['MYSQL_USER'],
passwd=settings['MYSQL_PASSWD'],
charset=settings['MYSQL_CHARSET'],
port=settings['MYSQL_PORT'],
cursorclass=cursors.DictCursor,
)
# 初始化数据库连接池(线程池)
# 参数一:mysql的驱动
# 参数二:连接mysql的配置信息
dbpool = adbapi.ConnectionPool('pymysql', **params)
return cls(dbpool)
def process_item(self, item, spider):
# 在该函数内,利用连接池对象,开始操作数据,将数据写入到数据库中。
# pool.map(self.insert_db, [1,2,3])
# 同步阻塞的方式: cursor.execute() commit()
# 异步非阻塞的方式
# 参数1:在异步任务中要执行的函数insert_db;
# 参数2:给该函数insert_db传递的参数
query = self.dbpool.runInteraction(self.insert_db, item)
# 如果异步任务执行失败的话,可以通过ErrBack()进行监听, 给insert_db添加一个执行失败的回调事件
query.addErrback(self.handle_error)
return item
def handle_error(self, field):
print('-----数据库写入失败:',field)
def insert_db(self, cursor, item):
insert_sql = "INSERT INTO bole(title, date_time, tags, content, zan_num, keep_num, comment_num, img_src, img_path) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursor.execute(insert_sql, (item['title'], item['date_time'], item['tags'], item['content'], item['zan_num'], item['keep_num'], item['comment_num'], item['img_src'], item['img_path']))
# 在execute()之后,不需要再进行commit(),连接池内部会进行提交的操作。
下一篇: 41 DNS