Scrapy: generic pipeline code for asynchronous database inserts
Core code
import logging

import pymysql
from scrapy.exceptions import DropItem
from twisted.enterprise import adbapi

logger = logging.getLogger(__name__)


class MysqlTwistedPipeline(object):
    """Store items in MySQL asynchronously via Twisted's adbapi."""

    def __init__(self, dbpool):
        self.dbpool = dbpool
        # self.redis_client = redis.StrictRedis(
        #     host=settings.REDIS_HOST,
        #     port=settings.REDIS_PORT,
        #     password=settings.REDIS_PASSWORD,
        #     db=settings.REDIS_DB,
        # )

    @classmethod
    def from_crawler(cls, crawler):
        dbparms = dict(
            host=crawler.settings.get('DB_HOST'),  # database credentials configured in settings.py
            db=crawler.settings.get('DB_NAME'),
            user=crawler.settings.get('DB_USER'),
            passwd=crawler.settings.get('DB_PASSWORD'),
            charset=crawler.settings.get('DB_CHARSET'),
            cursorclass=pymysql.cursors.Cursor,
            use_unicode=True,
            connect_timeout=600,  # seconds: give up on establishing a connection after ten minutes
        )
        dbpool = adbapi.ConnectionPool('pymysql', **dbparms)  # create the connection pool
        return cls(dbpool)

    def process_item(self, item, spider):
        # Hand the insert to Twisted's thread pool so the crawl is not blocked
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)
        return item

    def do_insert(self, cursor, item):
        # item here is a plain dict; the 'collection' key holds the target table name
        table = item.pop('collection')
        fields = ", ".join(item.keys())
        sub_char = ", ".join(["%s"] * len(item))
        values = tuple(item.values())
        sql = "insert ignore into {}({}) values ({})".format(table, fields, sub_char)
        try:
            cursor.execute(sql, values)
            logger.debug('insert succeeded: {}'.format(table))
        except Exception as e:
            if "Duplicate" in repr(e):
                logger.info("duplicate row -- dropping item")
                raise DropItem(item)
            else:
                logger.info('insert failed -- {} | {} {}'.format(repr(e), sql, values))

    def handle_error(self, failure, item, spider):
        # Log any failure surfaced by the asynchronous insert
        logger.error(failure)
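The pipeline only works if the settings it reads in from_crawler actually exist. A minimal settings.py sketch follows; the DB_* names simply mirror the get() calls above, the values are placeholders, and the module path in ITEM_PIPELINES is an assumption about your project layout:

    # settings.py -- hypothetical values, adjust to your environment
    DB_HOST = '127.0.0.1'
    DB_NAME = 'scrapy_data'
    DB_USER = 'root'
    DB_PASSWORD = 'secret'
    DB_CHARSET = 'utf8mb4'

    # Register the pipeline (module path is a placeholder for your project)
    ITEM_PIPELINES = {
        'myproject.pipelines.MysqlTwistedPipeline': 300,
    }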
The pipeline code above is fully generic. Note that it does not use a Scrapy Item to carry the data; the spider simply puts the fields into a plain dict and yields it.
Dict format:
controlling_item = dict(
    key1=value1,
    key2=value2,
    key3=value3,
    collection='table_name',  # name of the target table
)
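A minimal spider sketch showing how such a dict reaches the pipeline; the spider name, URL, field names, and table name below are all placeholders:

    # Hypothetical spider that yields plain dicts to the pipeline above
    import scrapy

    class DemoSpider(scrapy.Spider):
        name = 'demo'
        start_urls = ['https://example.com/']  # placeholder URL

        def parse(self, response):
            yield dict(
                title=response.css('title::text').get(),
                url=response.url,
                collection='demo_table',  # target table; popped by the pipeline
            )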