欢迎您访问程序员文章站！本站旨在为大家分享程序员计算机编程知识！
您现在的位置是: 首页

scrapy 异步插入数据库pipeline 通用代码

程序员文章站 2022-05-10 19:56:38
...

核心代码

import pymysql
import logging
logger = logging.getLogger(__name__)

from scrapy.exceptions import DropItem
from twisted.enterprise import adbapi

class MysqlTwistedPipeline(object):
    """Asynchronously insert scraped dict items into MySQL via a Twisted adbapi pool.

    Each item must be a mapping that contains a ``collection`` key naming the
    target table; every other key is used as a column name and its value as the
    column value. Rows are written with ``INSERT IGNORE``.

    Connection settings come from the Scrapy settings ``DB_HOST``, ``DB_NAME``,
    ``DB_USER``, ``DB_PASSWORD`` and ``DB_CHARSET``.
    """

    def __init__(self, dbpool):
        """Keep the ``adbapi.ConnectionPool`` used for every insert."""
        self.dbpool = dbpool

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline with a pymysql connection pool configured from settings."""
        dbparms = dict(
            host=crawler.settings.get('DB_HOST'),  # database settings from settings.py
            db=crawler.settings.get('DB_NAME'),
            user=crawler.settings.get('DB_USER'),
            passwd=crawler.settings.get('DB_PASSWORD'),
            charset=crawler.settings.get('DB_CHARSET'),
            cursorclass=pymysql.cursors.Cursor,
            use_unicode=True,
            # Seconds to wait while *establishing* a connection; this is not an
            # idle-disconnect timeout.
            connect_timeout=600,
        )
        dbpool = adbapi.ConnectionPool('pymysql', **dbparms)
        return cls(dbpool)

    def close_spider(self, spider):
        """Close the pool's connections and worker threads when the spider closes."""
        self.dbpool.close()

    def process_item(self, item, spider):
        """Schedule the insert on a pool thread; the returned Deferred fires with *item*.

        Returning the Deferred (the original returned ``None``) hands the item on
        to later pipelines once the insert has finished. Scrapy treats the item as
        in flight until then.
        """
        deferred = self.dbpool.runInteraction(self.do_insert, item)
        # Log pool-level failures (connection / commit errors) instead of leaving
        # an unhandled error in the Deferred; the item is still passed on.
        deferred.addErrback(self._handle_error, item, spider)
        deferred.addCallback(lambda _: item)
        return deferred

    def do_insert(self, cursor, item):
        """Insert one row built from *item* with the given DB-API *cursor*.

        Called by ``runInteraction`` on a pool thread; the pool commits when this
        returns and rolls back if it raises. *item* itself is not modified.

        Args:
            cursor: cursor supplied by ``adbapi.ConnectionPool.runInteraction``.
            item: mapping with a ``collection`` key (table name, optionally
                ``db.table``); all other keys are column names.
        """
        row = dict(item)  # copy so the caller's item keeps its 'collection' key
        table = row.pop('collection', None)  # 'collection' holds the target table name
        if not table:
            logger.warning('缺少 collection 字段, 跳过插入: %r', item)
            return

        # Quote identifiers so reserved words / odd characters in keys are safe;
        # values are still passed as query parameters.
        table_sql = ".".join(self._quote_identifier(part) for part in str(table).split("."))
        fields = ", ".join(self._quote_identifier(name) for name in row)
        placeholders = ", ".join(["%s"] * len(row))
        values = tuple(row.values())

        sql = "insert ignore into {}({}) values ({})".format(table_sql, fields, placeholders)
        try:
            cursor.execute(sql, values)
        except Exception as e:  # best-effort per row: log and keep the crawl going
            if "Duplicate" in repr(e):
                # INSERT IGNORE normally downgrades duplicate keys to warnings, so
                # this is a fallback. DropItem raised on a pool thread would not
                # drop the item, so the duplicate is only logged.
                logger.info("数据重复--跳过: %s", table)
            else:
                logger.error('插入失败--%s sql=%s values=%r', repr(e), sql, values)
        else:
            logger.debug('插入成功:%s', table)

    def _handle_error(self, failure, item, spider):
        """Errback: log a failed interaction; returning None lets the chain continue."""
        logger.error('插入失败--%s (collection=%r)',
                     failure.getErrorMessage(), item.get('collection'))

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier (embedded backticks doubled)."""
        return "`{}`".format(str(name).replace("`", "``"))

该 pipeline 代码完全通用。代码中没有使用 Item 存储数据，而是直接把数据放在一个字典中 yield 出去：字典中的 collection 键指定目标表名，其余键对应数据表的列名。

字典格式:

controlling_item = {
    "key1": value1,
    "key2": value2,
    "key3": value3,
    "collection": '表名',  # target table name; every other key is a column name
}