Saving data asynchronously with the Scrapy framework's item pipeline

When a Scrapy project runs multiple spiders, each scraping a different site or a different kind of data, you need an efficient way to get everything into the database. That is where an asynchronous mechanism for saving data comes in.

Alright, enough talk. Show the code:

import MySQLdb
import MySQLdb.cursors
from twisted.enterprise import adbapi


class MysqlTwistedPipeline(object):
    # Write to MySQL asynchronously through Twisted's adbapi connection pool
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host="127.0.0.1",      # settings["MYSQL_HOST"]
            db="bitcoin",          # settings["MYSQL_DBNAME"]
            user="root",           # settings["MYSQL_USER"]
            password="root",       # settings["MYSQL_PASSWORD"]
            charset="utf8",
            use_unicode=True,      # otherwise Chinese text cannot be saved
            cursorclass=MySQLdb.cursors.DictCursor
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # Use Twisted to turn the MySQL insert into an asynchronous call
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # handle exceptions
        return item

    def handle_error(self, failure, item, spider):
        # Handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):
        # Perform the actual insert
        # Each item type builds its own SQL statement for MySQL
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)
        # runInteraction commits the transaction automatically
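
The template above relies on each Item class knowing how to build its own SQL via a get_insert_sql() method. A minimal sketch of what such an item might look like (the ArticleItem class, its fields and its table name are hypothetical, for illustration only):

import scrapy


class ArticleItem(scrapy.Item):
    # hypothetical fields, for illustration only
    title = scrapy.Field()
    url = scrapy.Field()

    def get_insert_sql(self):
        # Return the SQL plus its bound parameters so that do_insert()
        # in the pipeline stays generic across different item types
        insert_sql = "INSERT INTO article(title, url) VALUES (%s, %s)"
        params = (self["title"], self["url"])
        return insert_sql, params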

The code above is borrowed from another author's post. Looking at it, I suspect a less experienced programmer might still find it confusing, so starting from that template, here is the code I actually use in production:

# -*- coding: utf-8 -*-
# Define your item pipelines here
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from scrapy.exceptions import DropItem
from twisted.enterprise import adbapi
import copy
import time
import datetime
import re
import json
from settings import ORACLE_DSN, ORACLE_MOD, ORACLE_PASSWD, ORACLE_USER
from cde.items import CdeItem, CompanyItem, LcItem, DrugDetail, CfdaItem, RegistrationNumber, Organization_Researcher,CfdastatusItem, GroupItem
from organization_compare import mail_content
from organization_compare  import maildict
import traceback
 
class CdePipeline(object):
    # Write to Oracle asynchronously
    def __init__(self, dbpool):
        self.dbpool = dbpool
 
    @classmethod
    def from_settings(cls, settings):
        dbpool = adbapi.ConnectionPool(ORACLE_MOD, user=ORACLE_USER, password=ORACLE_PASSWD, dsn=ORACLE_DSN,
                                       threaded=True)  
        return cls(dbpool)
 
    def process_item(self, item, spider):
        # Deep-copy the item: runInteraction runs later in the thread pool,
        # and the engine may reuse the original item before the insert runs
        asynItem = copy.deepcopy(item)
        if isinstance(item, CdeItem):  # check which spider this item came from
            # save the data via insert_cdelog_into_table below
            self.dbpool.runInteraction(self.insert_cdelog_into_table, asynItem)
        elif isinstance(item, CompanyItem):
            self.dbpool.runInteraction(self.insert_company_name, asynItem)
        return item
 
    def insert_cdelog_into_table(self, conn, item):
        print "开始存储数据.............."
        curTime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # print curTime
        try:
            conn.execute(
                """INSERT into "cde_log"("is_new","tid","rank","code","name","join_date","review_status","pharmacology_status","clinical_status","pharmacy_status","remark","sha224","create_date") values(2,:tid,:rank,:code,:name,to_date(:join_date,'yyyy-mm-dd'),:review_status,:pharmacology_status,:clinical_status,:pharmacy_status,:remark,:sha224,SYSDATE)""",
                {
                    'tid': item['tid'],
                    'rank': item['xuhao'],
                    'code': item['shoulihao'],
                    'name': item['name'],
                    'join_date': item['intime'],
                    'review_status': item['state'],
                    'pharmacology_status': item['yaoliduli'],
                    'clinical_status': item['linchuang'],
                    'pharmacy_status': item['yaoxue'],
                    'remark': item['beizhu'],
                    'sha224': item['sha224']
                })
        except Exception, e:
            print e
        finally:
            pass
        # If the scraped acceptance number is not yet in the "cde" table, insert it
        conn.execute("""SELECT "name" from "cde" where "old_value"=0 and "code"=:code""", {'code': item['shoulihao']})
        result2 = conn.fetchone()
        if not result2:
            conn.execute(
                """INSERT into "cde"("tid","rank","code","name","join_date","review_status","pharmacology_status","clinical_status","pharmacy_status","remark","create_date") values(:tid,:rank,:code,:name,to_date(:join_date,'yyyy-mm-dd'),:review_status,:pharmacology_status,:clinical_status,:pharmacy_status,:remark,SYSDATE)""",
                {
                    'tid': item['tid'],
                    'rank': item['xuhao'],
                    'code': item['shoulihao'],
                    'name': item['name'],
                    'join_date': item['intime'],
                    'review_status': item['state'],
                    'pharmacology_status': item['yaoliduli'],
                    'clinical_status': item['linchuang'],
                    'pharmacy_status': item['yaoxue'],
                    'remark': item['beizhu']})
            print "添加受理号{}成功".format(item['shoulihao'])
        # If the acceptance number already exists in "cde", compare the drug name and update it if it changed
        else:
            if not result2[0] == item['name']:
                conn.execute("""UPDATE "cde" set "name"=:name where "old_value"=0 and "code"=:code""",
                             {'name': item['name'], 'code': item['shoulihao']})
                print "更新药物{}名成功".format(item['name'])
 
    def insert_company_name(self, conn, item):
        conn.execute("""UPDATE "cde" set "company"=:company where "code" =:code """, {
            'company': item['company'],
            'code': item['shouliid']
        })

The code above is what I use at work. It first checks which spider an item came from and dispatches it to a different function for saving, which greatly speeds up storage. So when you have many spiders and many data sources, asynchronous storage is a safe choice.
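
For either pipeline to run at all, it has to be registered in settings.py, and the production version above also reads its Oracle connection parameters from there. A minimal sketch of those settings (the values, the pipeline path and the priority number are placeholders, not taken from the original project):

# settings.py (sketch; the values below are placeholders)
ORACLE_MOD = "cx_Oracle"            # DB-API module name handed to adbapi.ConnectionPool
ORACLE_USER = "scrapy_user"
ORACLE_PASSWD = "secret"
ORACLE_DSN = "127.0.0.1:1521/orcl"

ITEM_PIPELINES = {
    "cde.pipelines.CdePipeline": 300,   # enable the asynchronous pipeline
}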

If anything in the code above is still unclear, leave a comment and I will reply as soon as I can. Let's learn from each other and improve together!
