欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Scrapy-5.Items

程序员文章站 2022-03-02 21:08:55
...

本文地址:https://www.jianshu.com/p/58781f28904f

在抓取数据的过程中,主要要做的事就是从杂乱的数据中提取出结构化的数据。ScrapySpider可以把数据提取为一个Python中的字典,虽然字典使用起来非常方便,对我们来说也很熟悉,但是字典有一个缺点:缺少固定结构。在一个拥有许多爬虫的大项目中,字典非常容易造成字段名称上的语法错误,或者是返回不一致的数据。

所以Scrapy中,定义了一个专门的通用数据结构:Item。这个Item对象提供了跟字典相似的API,并且有一个非常方便的语法来声明可用的字段。

声明Item

Item的声明在items.py这个文件中,声明Item时,需要从scrapy.Item继承:

import scrapy

class Product(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    stock = scrapy.Field()
    last_updated = scrapy.Field(serializer=str)

在声明这个Item时,需要同时声明Item中的字段,声明的方式类似于设置类属性。熟悉Djiango的话那么你会注意到这与Django Models非常相似,但是没有多种Field类型,更加简单。

使用:

In [1]: product = Product(name='Desktop PC', price=1000)

In [2]: product
Out[2]: {'name': 'Desktop PC', 'price': 1000}

Field对象

可以看到,在声明Item时,声明字段使用的是Field对象。这个Field对象其实完全继承自Python的字典,并且没有做任何改动,所以在使用Field声明字段时,可以传入数据作为这个字段的元数据(metadata),上方的serializer=str其实就是一个指定序列化函数的元数据。

字段的元数据与字段的值之间没有必然的联系。如果我们直接查看Item对象,那么获取的是字段的值;

In [3]: product
Out[3]: {'name': 'Desktop PC', 'price': 1000}

如果使用.fields,获取的就是字段的元数据了:

In [4]: product.fields
Out[4]: {'last_updated': {'serializer': str}, 'name': {}, 'price': {}, 'stock': {}}

使用方式

由于Item有跟字典类似的API,所以很多时候可以像字典一样使用:

# 可以像字典一样用字段名取值
In [5]: product['name']
Out[5]: 'Desktop PC'

# 可以使用get方法
In [6]: product.get('name')
Out[6]: 'Desktop PC'

# 可以在获取字段没有值时,设置默认返回的值
In [7]: product.get('last_updated', 'not set')
Out[7]: 'not set'

# 可以像字典一样对存在的字段赋值
In [8]: product['last_updated'] = 'today'
In [9]: product['last_updated']
Out[9]: today

但是有一点区别的是,如果对Item没有声明的字段操作,会抛出异常:

# 获取没有声明的字段
In [10]: product['lala']
Traceback (most recent call last):
    ...
KeyError: 'lala'

# 可以对未声明字段使用get方法,设置默认返回的值
In [11]: product.get('lala', 'unknown field')
Out[11]:'unknown field'

# 给没有声明的字段赋值
In [12]: product['lala'] = 'test' # setting unknown field
Traceback (most recent call last):
    ...
KeyError: 'Product does not support field: lala'

还可以直接从Dict直接创建Item

In [13]: Product({'name': 'Laptop PC', 'price': 1500})
Out[13]: Product(price=1500, name='Laptop PC')

In [14]: Product({'name': 'Laptop PC', 'lala': 1500}) # warning: unknown field in dict
Traceback (most recent call last):
    ...
KeyError: 'Product does not support field: lala'

如果需要扩展或者修改某一个Item类的话,可以使用继承的方式:

class DiscountedProduct(Product):
    discount_percent = scrapy.Field(serializer=str)
    discount_expiration_date = scrapy.Field()

class SpecificProduct(Product):
    name = scrapy.Field(Product.fields['name'], serializer=my_serializer)

Item Pipeline

Spider中返回一个Item后,这个Item将会被发送给Item Pipeline,其主要有以下几种作用:

  • 清洗数据
  • 验证抓取下来的数据(检查是否含有某些字段)
  • 检查去重
  • 存储数据到数据库

每个Item Pipeline都是一个Python类,实现了几个简单的方法。

启用Item Pipeline

启用方式与Middleware基本相同,优先级的值越小,越先被调用。

ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}

自定义Item Pipeline

自定义Item Pipeline必须实现以下这个方法:

  • process_item(self, item, spider)

    每一个Item Pipeline都会调用这个方法,用来处理Item

    参数:

    • item(Item对象或者Dict) - 抓取的Item
    • spider(Spider对象) - 抓取这个ItemSpider

    这个方法需要返回以下两种返回值的一种:

    • Item或者Dict

      Scrapy将会继续调用接下来的Item Pipeline组件处理下去。

    • 抛出一个DropItem异常

      将会不再继续调用接下来的Item Pipeline

还可以实现以下几种方法之一来实现某些功能:

  • open_spider(self, spider)

    这个方法将会在Spider打开时调用。

  • close_spider(self, spider)

    这个方法将会在Spider关闭时调用。

  • from_crawler(cls, crawler)

    如果存在这个方法,那么就会调用这个类方法来从Crawler创建一个Pipeline实例。这个方法必须返回一个Pipeline的新实例。

    其中的Crwaler对象提供了可以访问到Scrapy核心部件的路径,比如settings和signals。Pipeline实例可以通过这种方式来连接他们。

Item Pipeline例子

1.验证数据

如果Item中包含price_excludes_vat属性,就调整数据中的price属性,并抛弃那些不包含price属性的Item。

from scrapy.exceptions import DropItem

class PricePipeline(object):

    vat_factor = 1.15

    def process_item(self, item, spider):
        if item['price']:
            if item['price_excludes_vat']:
                item['price'] = item['price'] * self.vat_factor
            return item
        else:
            raise DropItem("Missing price in %s" % item)

2.写入Item到JSON文件

import json

class JsonWriterPipeline(object):

    def open_spider(self, spider):
        self.file = open('items.jl', 'w')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item

3.写入Item到MongoDB

import pymongo

class MongoPipeline(object):

    collection_name = 'scrapy_items'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(dict(item))
        return item

4.去重

from scrapy.exceptions import DropItem

class DuplicatesPipeline(object):

    def __init__(self):
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

系列文章:

推荐阅读