[Scrapy Framework] Version 2.4.0 Source Code: Pipelines Explained
Index of all articles in this source-code series:
[Scrapy Framework] Version 2.4.0 Source Code: Full Configuration Index
Overview
Item pipelines are mainly used to process the scraped data. Typical uses include:
- cleaning HTML data
- validating scraped data (checking that items contain certain fields; see the short sketch after this list)
- checking for duplicates (and dropping them)
- storing the scraped items in a database
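For example, a minimal validation pipeline could drop items that are missing a required field. This is only a sketch; the field name "price" is an assumption, not something defined in this project.

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ValidationPipeline:
    """Drop items that are missing a required field."""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # "price" is an illustrative field name; replace it with your own.
        if not adapter.get("price"):
            raise DropItem(f"Missing price in {item!r}")
        return item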
Pipeline class methods explained
class SomethingPipeline(object):
    def __init__(self):
        # Optional: do parameter initialisation etc.
        # Put your own business logic here.
        pass

    def process_item(self, item, spider):
        # item (Item object) – the scraped item
        # spider (Spider object) – the spider that scraped this item
        # This method must be implemented; it is called for every item pipeline component.
        # It must return an Item object; dropped items are not processed by any
        # later pipeline components.
        return item

    def open_spider(self, spider):
        # spider (Spider object) – the spider that was opened
        # Optional: called when the spider is opened.
        pass

    def close_spider(self, spider):
        # spider (Spider object) – the spider that was closed
        # Optional: called when the spider is closed.
        pass
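Besides these methods, an item pipeline can also define a from_crawler classmethod; Scrapy uses it to create the pipeline, and it gives access to the project settings. A minimal sketch, where SOME_SETTING is an illustrative setting name:

class SettingsAwarePipeline:
    def __init__(self, some_value):
        self.some_value = some_value

    @classmethod
    def from_crawler(cls, crawler):
        # Called by Scrapy to instantiate the pipeline; the crawler object
        # exposes settings, signals and stats.
        return cls(some_value=crawler.settings.get("SOME_SETTING", "default"))

    def process_item(self, item, spider):
        return item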
Examples
Enabling the pipeline (required)
Uncomment the ITEM_PIPELINES block (around line 69 of settings.py), otherwise nothing will be written to the database.
# Uncomment these three lines; no other changes are needed.
ITEM_PIPELINES = {
    '你的项目名称.pipelines.WwwCjnCnPipeline': 300,  # 你的项目名称 = your project's package name
}
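Several pipelines can be enabled at the same time; Scrapy passes each item through them in ascending order of the number (0 to 1000), so lower values run first. A sketch that chains the pipelines from the examples below (the priorities are illustrative):

ITEM_PIPELINES = {
    '你的项目名称.pipelines.DuplicatesPipeline': 100,   # drop duplicates first
    '你的项目名称.pipelines.JsonWriterPipeline': 300,   # then write items.jl
    '你的项目名称.pipelines.WwwCjnCnPipeline': 400,     # then store in MongoDB
}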
Writing data to a JSON file
import json
from itemadapter import ItemAdapter
class JsonWriterPipeline:
def open_spider(self, spider):
self.file = open('items.jl', 'w')
def close_spider(self, spider):
self.file.close()
def process_item(self, item, spider):
line = json.dumps(ItemAdapter(item).asdict()) + "\n"
self.file.write(line)
return item
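The same result can also be achieved with Scrapy's built-in JsonLinesItemExporter instead of calling json.dumps by hand. A sketch, assuming the output file name items_exported.jl:

from scrapy.exporters import JsonLinesItemExporter

class ExporterJsonWriterPipeline:
    """Variant of JsonWriterPipeline that delegates serialisation to Scrapy's exporter."""

    def open_spider(self, spider):
        # Exporters write bytes, so the file is opened in binary mode.
        self.file = open('items_exported.jl', 'wb')
        self.exporter = JsonLinesItemExporter(self.file)
        self.exporter.start_exporting()

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item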
Writing data to MongoDB
import pymongo
from scrapy.utils.project import get_project_settings

settings = get_project_settings()

class WwwCjnCnPipeline(object):
    # Replace the whole class body with the following.
    def __init__(self):
        host = settings["MONGODB_HOST"]
        port = settings["MONGODB_PORT"]
        dbname = settings["MONGODB_DBNAME"]
        sheetname = settings["MONGODB_SHEETNAME"]
        username = settings["MONGODB_USER"]
        password = settings["MONGODB_PASSWORD"]
        # Create the MongoDB client connection
        client = pymongo.MongoClient(host=host, port=port, username=username, password=password)
        # Select the database
        mydb = client[dbname]
        # Collection in which the data will be stored
        self.post = mydb[sheetname]

    def process_item(self, item, spider):
        data = dict(item)
        # insert_one() replaces the deprecated insert(), which was removed in pymongo 4
        self.post.insert_one(data)
        return item
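If you prefer not to read the settings at module import time, the same pipeline can be written with a from_crawler classmethod, following the pattern used in the official Scrapy documentation. A sketch; the setting names MONGO_URI, MONGO_DATABASE and MONGO_COLLECTION are assumptions:

import pymongo
from itemadapter import ItemAdapter

class MongoPipeline:
    """MongoDB pipeline that reads its configuration through from_crawler."""

    def __init__(self, mongo_uri, mongo_db, collection):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.collection = collection

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            collection=crawler.settings.get("MONGO_COLLECTION", "scrapy_items"),
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # insert_one() works with both pymongo 3.x and 4.x
        self.db[self.collection].insert_one(ItemAdapter(item).asdict())
        return item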
Taking a screenshot of each scraped page
import hashlib
from urllib.parse import quote

import scrapy
from itemadapter import ItemAdapter
class ScreenshotPipeline:
"""Pipeline that uses Splash to render screenshot of
every Scrapy item."""
SPLASH_URL = "http://localhost:8050/render.png?url={}"
async def process_item(self, item, spider):
adapter = ItemAdapter(item)
encoded_item_url = quote(adapter["url"])
screenshot_url = self.SPLASH_URL.format(encoded_item_url)
request = scrapy.Request(screenshot_url)
response = await spider.crawler.engine.download(request, spider)
if response.status != 200:
# Error happened, return item.
return item
# Save screenshot to file, filename will be hash of url.
url = adapter["url"]
url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
filename = f"{url_hash}.png"
with open(filename, "wb") as f:
f.write(response.body)
# Store filename in item.
adapter["screenshot_filename"] = filename
return item
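To try this pipeline, Splash has to be reachable at SPLASH_URL and the pipeline has to be registered in settings.py. A sketch; the module path and priority are illustrative:

# settings.py (adjust the module path to your project)
ITEM_PIPELINES = {
    '你的项目名称.pipelines.ScreenshotPipeline': 400,
}
# Splash itself can be started locally, for example with Docker:
#   docker run -p 8050:8050 scrapinghub/splash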
Filtering duplicate items
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class DuplicatesPipeline:
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
if adapter['id'] in self.ids_seen:
raise DropItem(f"Duplicate item found: {item!r}")
else:
self.ids_seen.add(adapter['id'])
return item
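The ids_seen set above lives only in memory, so it is reset on every run. A sketch of a variant that also filters duplicates across runs by persisting the seen ids to a local file; the file name seen_ids.txt is an assumption:

import os
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class PersistentDuplicatesPipeline:
    """Like DuplicatesPipeline, but keeps the seen ids on disk between runs."""

    SEEN_FILE = 'seen_ids.txt'  # illustrative path

    def open_spider(self, spider):
        self.ids_seen = set()
        if os.path.exists(self.SEEN_FILE):
            with open(self.SEEN_FILE) as f:
                self.ids_seen = {line.strip() for line in f if line.strip()}

    def close_spider(self, spider):
        with open(self.SEEN_FILE, 'w') as f:
            f.write('\n'.join(self.ids_seen))

    def process_item(self, item, spider):
        item_id = str(ItemAdapter(item)['id'])
        if item_id in self.ids_seen:
            raise DropItem(f"Duplicate item found: {item!r}")
        self.ids_seen.add(item_id)
        return item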