一个Item Pipeline 不需要继承特定基类,只需要实现某些特定方法,面向接口。
class MyPipeline(object):
def __init__(self):
"""
可选实现,做参数初始化等
"""
def process_item(self, item, spider):
"""
该方法必须实现,每个item pipeline组件都需要调用该方法,
该方法必须返回一个 Item 对象,被丢弃的item将不会被之后的pipeline组件所处理。
:param item: 被爬取的item
:param spider: 爬取该item的spider debug查看类属性
:return:
"""
return item
def open_spider(self, spider):
"""
可选实现,当spider被开启时,这个方法被调用。
:param spider: 被开启的spider
:return:
"""
def close_spider(self, spider):
"""
可选实现,当spider被关闭时,这个方法被调用
:param spider: 被关闭的spider
:return:
"""
采用同步的机制写入数据:
class MysqlPipeline(object):
def __init__(self):
pass
def process_item(self, item, spider):
if isinstance(item,InstanceItem):
save(item)
if spider.name == "spider_name":
save(item)
采用异步的机制写入代码
class MysqlTwistedPipeline(object):
# 采用异步的机制写入mysql
def __init__(self, dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
"""
from_settings **pipeline之后,会自动调用该函数加载settings中的配置
:param settings:
:return:
"""
dbparms = dict(
host="127.0.0.1", # settings["MYSQL_HOST"],
db="spider", # settings["MYSQL_DBNAME"],
user="root", # settings["MYSQL_USER"]
password="root", # settings["MYSQL_PASSWORD"],
charset="utf8",
use_unicode=True, # 不然没办法保存中文
cursorclass=cursors.DictCursor
)
db_pool = adbapi.ConnectionPool('pymysql', **dbparms)
return cls(db_pool)
def process_item(self, item, spider):
##使用twisted将mysql插入变成异步执行
query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error, item, spider) # 处理异常
def handle_error(self, failure, item, spider):
# 处理异步插入的异常
print(failure)
def do_insert(self, cursor, item):
# 执行具体的插入
# 根据不同的的item构建不同的sql语句插入到mysql中
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
# 自动commit
启用Item Pipeline
在settings.py文件中进行配置
ITEM_PIPELINES={
'ArticleSpider.pipelines.MysqlTwistedPipeline': 2,####数值小的在前
}