scrapy中pipeline的异步存储
程序员文章站
2022-05-11 23:13:05
...
import pymysql
‘同步写入数据速度比较慢,而爬虫速度比较快,可能导致数据最后写入不到数据库中’
‘’’
1.引入twisted.enterprise.adbapi pymysql.cursors
2.在settings中配置数据库连接参数
3.创建pipeline,实现from_settings函数,从settings获取数据库连接参数,根据参数创建连接池对象,返回当前pipeline的对象,并且把连接池赋值给该对象属性
4.实现process_item函数,使用db_pool.runInteraction(函数,函数需要的参数) 将数据库的处理操作放入连接池s,还需要将操作数据的函数实现,使用cursor执行sql
5.拿到runInteraction()函数返回的处理结果,添加错误回调函数,在函数中将错误原因打印
‘’’
Twisted 做异步任务处理的包
adbapi 操作数据库的模块
from twisted.enterprise import adbapi
from pymysql import cursors
class MySQLTwistedPipeline(object):
# 1.链接mysql数据库
# from_settings **pipeline之后,会自动调用该函数加载settings中的配置
@classmethod
def from_settings(cls, settings):
# 准备数据库的链接参数,是一个字典
db_params = dict(
host = settings['MYSQL_HOST'],
user = settings['MYSQL_USER'],
password = settings['MYSQL_PASSWD'],
port = settings['MYSQL_PORT'],
db = settings['MYSQL_DBNAME'],
charset = settings['MYSQL_CHARSET'],
use_unicode = True,
# 指定使用的游标类型
cursorclass= cursors.DictCursor
)
# 创建连接池
# 1.使用的操作数据库的包名称
# 2.准备的数据库链接参数
db_pool = adbapi.ConnectionPool('pymysql',**db_params)
# 返回创建好的对象
return cls(db_pool)
# 在初始化函数中,对db_pool进行赋值
def __init__(self,db_pool):
# 赋值
self.db_pool = db_pool
# 处理item的函数
def process_item(self,item,spider):
# 异步写入
# 把执行sql的操作放入pool中
# 1.执行的操作(功能函数) 函数对象 function类型
# 2.item 对象 spider对象
query = self.db_pool.runInteraction(self.insert_item,item)
# 执行sql出现错误,会执行指定的回调函数
query.addErrback(self.handle_error,item,spider)
# 返回item
return item
# failure 错误原因
def handle_error(self,failure,item,spider):
# 输出错误原因
print(failure)
# 执行的操作
def insert_item(self,cursor,item):
sql = "INSERT INTO jobs(job_name,org_name,job_location,max_money,min_money,date)VALUES (%s,%s,%s,%s,%s,%s)"
# 执行sql
cursor.execute(sql,(item['job_name'],item['org_name'],item['job_location'],item['max_money'],item['min_money'],item['date']))