欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

scrapy中pipeline的异步存储

程序员文章站 2022-05-11 23:13:05
...

import pymysql
‘同步写入数据速度比较慢,而爬虫速度比较快,可能导致数据最后写入不到数据库中’
‘’’
1.引入twisted.enterprise.adbapi pymysql.cursors
2.在settings中配置数据库连接参数
3.创建pipeline,实现from_settings函数,从settings获取数据库连接参数,根据参数创建连接池对象,返回当前pipeline的对象,并且把连接池赋值给该对象属性
4.实现process_item函数,使用db_pool.runInteraction(函数,函数需要的参数) 将数据库的处理操作放入连接池s,还需要将操作数据的函数实现,使用cursor执行sql
5.拿到runInteraction()函数返回的处理结果,添加错误回调函数,在函数中将错误原因打印
‘’’

Twisted 做异步任务处理的包

adbapi 操作数据库的模块

from twisted.enterprise import adbapi
from pymysql import cursors
class MySQLTwistedPipeline(object):

# 1.链接mysql数据库
# from_settings **pipeline之后,会自动调用该函数加载settings中的配置
@classmethod
def from_settings(cls, settings):
    # 准备数据库的链接参数,是一个字典
    db_params = dict(
        host = settings['MYSQL_HOST'],
        user = settings['MYSQL_USER'],
        password = settings['MYSQL_PASSWD'],
        port = settings['MYSQL_PORT'],
        db = settings['MYSQL_DBNAME'],
        charset = settings['MYSQL_CHARSET'],
        use_unicode = True,
        # 指定使用的游标类型
        cursorclass= cursors.DictCursor
    )
    # 创建连接池
    # 1.使用的操作数据库的包名称
    # 2.准备的数据库链接参数
    db_pool = adbapi.ConnectionPool('pymysql',**db_params)
    # 返回创建好的对象
    return cls(db_pool)
# 在初始化函数中,对db_pool进行赋值
def __init__(self,db_pool):
    # 赋值
    self.db_pool = db_pool

# 处理item的函数
def process_item(self,item,spider):
    # 异步写入
    # 把执行sql的操作放入pool中
    # 1.执行的操作(功能函数) 函数对象 function类型
    # 2.item 对象 spider对象
    query = self.db_pool.runInteraction(self.insert_item,item)
    # 执行sql出现错误,会执行指定的回调函数
    query.addErrback(self.handle_error,item,spider)
    # 返回item
    return item

# failure 错误原因
def handle_error(self,failure,item,spider):
    # 输出错误原因
    print(failure)

# 执行的操作
def insert_item(self,cursor,item):

    sql = "INSERT INTO jobs(job_name,org_name,job_location,max_money,min_money,date)VALUES (%s,%s,%s,%s,%s,%s)"
    # 执行sql
    cursor.execute(sql,(item['job_name'],item['org_name'],item['job_location'],item['max_money'],item['min_money'],item['date']))