深入理解 Scrapy 的 Pipeline
今天我們來(lái)深入學(xué)習(xí) Scrapy 框架 Pipeline 的工作原理。這一次我們采取一種新的學(xué)習(xí)方式:先提出疑問(wèn),然后從源碼中進(jìn)行解答,直到最后我們徹底搞清楚 Pipeline 的工作流程。
1. 問(wèn)題描述
這一小節(jié)我們將從源碼的角度來(lái)分析 Pipeline 的工作過(guò)程。現(xiàn)在我先提出幾個(gè)疑問(wèn):
- Scrapy 框架中使用 Pipeline 處理 Item 的代碼在哪里?為什么我在 settings.py 中設(shè)置了 ITEM_PIPELINES 屬性值,Scrapy 就能將其作為 Pipeline 去處理對(duì)應(yīng) Spider 生成的 Item 呢?
- 定義 Pipeline 的那四個(gè)方法來(lái)自哪里?為什么一定需要 process_item() 方法?
- 第12節(jié)中抓取起點(diǎn)月票榜小說(shuō)時(shí)用到了圖片管道,該管道的一個(gè)詳細(xì)的處理流程是怎樣的,即它如何實(shí)現(xiàn)圖片下載?
帶著這些疑問(wèn),我們來(lái)進(jìn)入源碼中尋找答案。
2. 源碼解惑
2.1 Item Pipeline 的管理器類
還記得上一小節(jié)我們追蹤 Spider 中間件的代碼時(shí),在 scrapy/core/scraper.py
中找到了 Spider 中間件處理 Spider 模塊返回結(jié)果的方法,其代碼內(nèi)容如下:
# 源碼位置:scrapy/core/scraper.py
# ...
class Scraper:
# ...
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
# 如果spider中間件返回的是Request,則繼續(xù)調(diào)用引擎去處理請(qǐng)求
self.crawler.engine.crawl(request=output, spider=spider)
elif is_item(output):
# 如果spider中間件返回的是item,則調(diào)用self.itemproc對(duì)象的process_item()方法處理
self.slot.itemproc_size += 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
# 打印錯(cuò)誤日志
# ...
從上面的代碼我們知道,對(duì)于 Spider 中間件模塊最后返回的 Item 類型數(shù)據(jù)會(huì)調(diào)用 self.itemproc
對(duì)象的 process_item()
方法處理,那么這個(gè) self.itemproc
對(duì)象是什么呢?找到 Scraper 類的 __init__()
方法:
# 源碼位置:scrapy/core/scraper.py
# ...
class Scraper:
def __init__(self, crawler):
# ...
itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
self.itemproc = itemproc_cls.from_crawler(crawler)
# ...
# ...
來(lái)看默認(rèn)的配置中關(guān)于 ITEM_PROCESSOR
的值,如下:
# 源碼位置: scrapy/settings/default_settings.py
# ...
ITEM_PROCESSOR = 'scrapy.pipelines.ItemPipelineManager'
# ...
單看這個(gè)類的名稱,又是一個(gè)某某管理器類,前面我們學(xué)了下載中間件管理類、Spider 中間件管理類,分別管理下載中間件類以及 Spider 中間件類,維護(hù)所屬類方法的處理順序。這里我們也是需要一個(gè)同樣功能的管理類,來(lái)保證依次處理相應(yīng)的 Item pipelines。我們進(jìn)入該管理器類,閱讀其實(shí)現(xiàn)代碼:
# 源碼位置:scrapy/
from scrapy.middleware import MiddlewareManager
from scrapy.utils.conf import build_component_list
from scrapy.utils.defer import deferred_f_from_coro_f
class ItemPipelineManager(MiddlewareManager):
component_name = 'item pipeline'
@classmethod
def _get_mwlist_from_settings(cls, settings):
return build_component_list(settings.getwithbase('ITEM_PIPELINES'))
def _add_middleware(self, pipe):
super(ItemPipelineManager, self)._add_middleware(pipe)
if hasattr(pipe, 'process_item'):
self.methods['process_item'].append(deferred_f_from_coro_f(pipe.process_item))
def process_item(self, item, spider):
return self._process_chain('process_item', item, spider)
同樣,這個(gè)管理類直接就繼承了前面的中間件管理器類,其代碼量非常少,十分容易理解。
首先它和所有的中間件管理類一樣從全局配置中獲的對(duì)應(yīng)管理的 pipelines,這個(gè)配置正是 ITEM_PIPELINES
。其次,注意到這個(gè) _add_middleware()
方法中有個(gè)調(diào)用父類的 _add_middleware()
方法,而父類中該方法的代碼如下:
# 源碼位置: scrapy/middleware.py
# ...
class MiddlewareManager:
# ...
def _add_middleware(self, mw):
if hasattr(mw, 'open_spider'):
self.methods['open_spider'].append(mw.open_spider)
if hasattr(mw, 'close_spider'):
self.methods['close_spider'].appendleft(mw.close_spider)
我們從而得知,在 pipeline 中會(huì)將 open_spider()
、close_spider()
以及 process_item()
方法加入到對(duì)應(yīng)的處理鏈中,且 MiddlewareManager
類中 from_crawler()
是一個(gè)類方法,因此對(duì)于繼承該類的子類也同樣會(huì)有該方法,也即具備了通過(guò) Crawler
類對(duì)象實(shí)例化的能力。
2.2 Scrapy 框架內(nèi)置的 Pipelines 分析
前面第12節(jié)中,我們?cè)诮榻B Scrapy 框架的管道內(nèi)容時(shí),使用了其內(nèi)置的圖片處理管道 (ImagesPipeline),它對(duì)應(yīng)的代碼位置為:scrapy/pipelines/images.py
。接下來(lái),我們將分析其源碼,看看如何實(shí)現(xiàn)圖片下載的功能。
首先看看類繼承關(guān)系:在 images.py
中定義的 ImagesPipeline
繼承了 files.py
中定義的 FilesPipeline
類;而 FilesPipeline
類又繼承至 media.py
中定義的 MediaPipeline
類。因此,我們先從分析基類開(kāi)始,我們從管道的兩個(gè)核心方法開(kāi)始入手:
- 初始化方法:
__init__()
; - Item 核心處理方法:
process_item()
;
首先來(lái)看初始化的代碼,如下:
# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
LOG_FAILED_RESULTS = True
class SpiderInfo:
def __init__(self, spider):
self.spider = spider
self.downloading = set()
self.downloaded = {}
self.waiting = defaultdict(list)
def __init__(self, download_func=None, settings=None):
self.download_func = download_func
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
resolve = functools.partial(self._key_for_pipe,
base_class_name="MediaPipeline",
settings=settings)
self.allow_redirects = settings.getbool(
resolve('MEDIA_ALLOW_REDIRECTS'), False
)
self._handle_statuses(self.allow_redirects)
def _handle_statuses(self, allow_redirects):
# 默認(rèn)不允許重定向
self.handle_httpstatus_list = None
if allow_redirects:
# 當(dāng)設(shè)置了allow_redirects時(shí),會(huì)考慮處理存在3xx的下載地址
self.handle_httpstatus_list = SequenceExclude(range(300, 400))
def _key_for_pipe(self, key, base_class_name=None, settings=None):
"""
>>> MediaPipeline()._key_for_pipe("IMAGES")
'IMAGES'
>>> class MyPipe(MediaPipeline):
... pass
>>> MyPipe()._key_for_pipe("IMAGES", base_class_name="MediaPipeline")
'MYPIPE_IMAGES'
"""
class_name = self.__class__.__name__
formatted_key = "{}_{}".format(class_name.upper(), key)
if (
not base_class_name
or class_name == base_class_name
or settings and not settings.get(formatted_key)
):
return key
return formatted_key
# ...
上面的類中又定義了一個(gè)類:SpiderInfo
,這個(gè)類只是用來(lái)保存多個(gè)數(shù)據(jù)用的。此外,初始化方法中主要讀取相關(guān)的配置,判斷是否需要允許下載的 URL 重定向。該參數(shù)在 Scrapy 官方文檔中的說(shuō)明如下:
接下來(lái)是核心的處理 Item 的方法:
# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
# ...
def process_item(self, item, spider):
info = self.spiderinfo
# 從item中獲取請(qǐng)求列表
requests = arg_to_iter(self.get_media_requests(item, info))
# 形成相關(guān)的處理鏈表
dlist = [self._process_request(r, info) for r in requests]
dfd = DeferredList(dlist, consumeErrors=1)
# 上述的處理全部完成后的回調(diào)
return dfd.addCallback(self.item_completed, item, info)
# ...
我們知道管道類中處理 Item 的核心方法是 process_item()
,上面的 process_item()
方法先調(diào)用對(duì)象的 get_media_requests()
方法從輸入的 item 中獲取相應(yīng)的請(qǐng)求列表,然后在形成對(duì)應(yīng)的請(qǐng)求列表,處理請(qǐng)求的方法為:_process_request()
,最后所有的請(qǐng)求完成后會(huì)執(zhí)行對(duì)象的 item_completed()
方法。
# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
# ...
def get_media_requests(self, item, info):
"""Returns the media requests to download"""
pass
# ...
這個(gè) get_media_requests()
需要在后續(xù)的繼承類中實(shí)現(xiàn)。接下來(lái)看處理下載請(qǐng)求的方法:
# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
# ...
def _process_request(self, request, info):
# 每個(gè)請(qǐng)求計(jì)算一個(gè)指紋,以保證后面不重復(fù)請(qǐng)求
fp = request_fingerprint(request)
# 請(qǐng)求回調(diào)
cb = request.callback or (lambda _: _)
# 請(qǐng)求錯(cuò)誤回調(diào)
eb = request.errback
request.callback = None
request.errback = None
# 如果已經(jīng)請(qǐng)求過(guò)了,直接取緩存的結(jié)果
if fp in info.downloaded:
return defer_result(info.downloaded[fp]).addCallbacks(cb, eb)
# Otherwise, wait for result
wad = Deferred().addCallbacks(cb, eb)
# 將請(qǐng)求的回調(diào)鏈加入對(duì)應(yīng)的請(qǐng)求key中
info.waiting[fp].append(wad)
# 檢查請(qǐng)求是否正在下載中,避免二次請(qǐng)求
if fp in info.downloading:
return wad
# 將請(qǐng)求加入正在下載的隊(duì)列
info.downloading.add(fp)
# 創(chuàng)建Deferred對(duì)象,對(duì)應(yīng)方法為self.media_to_download()
dfd = mustbe_deferred(self.media_to_download, request, info)
# 在self.media_to_download()方法處理完后回調(diào)self._check_media_to_download()方法
dfd.addCallback(self._check_media_to_download, request, info)
# 此外,再加入統(tǒng)一回調(diào)方法
dfd.addBoth(self._cache_result_and_execute_waiters, fp, info)
dfd.addErrback(lambda f: logger.error(
f.value, exc_info=failure_to_exc_info(f), extra={'spider': info.spider})
)
return dfd.addBoth(lambda _: wad) # it must return wad at last
# ...
上面請(qǐng)求的過(guò)程在注釋中已詳細(xì)說(shuō)明,這里處理下載請(qǐng)求主要涉及的方法為:self.media_to_download()
以及 self._check_media_to_download()
。我們繼續(xù)查看該方法的代碼:
# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
# ...
# Overridable Interface
def media_to_download(self, request, info):
"""Check request before starting download"""
pass
def _check_media_to_download(self, result, request, info):
if result is not None:
return result
if self.download_func:
# this ugly code was left only to support tests. TODO: remove
dfd = mustbe_deferred(self.download_func, request, info.spider)
dfd.addCallbacks(
callback=self.media_downloaded, callbackArgs=(request, info),
errback=self.media_failed, errbackArgs=(request, info))
else:
self._modify_media_request(request)
# 將請(qǐng)求發(fā)給引擎模塊,調(diào)用download()方法下載網(wǎng)頁(yè)
dfd = self.crawler.engine.download(request, info.spider)
dfd.addCallbacks(
callback=self.media_downloaded, callbackArgs=(request, info),
errback=self.media_failed, errbackArgs=(request, info))
return dfd
# ...
可以看到 media_to_download()
方法也是在繼承類中需要重寫的,而 _check_media_to_download()
方法則是核心處理下載文件或者圖片的地方。該方法中首先判斷是否有傳入的 download_func()
方法用于下載網(wǎng)頁(yè),如果沒(méi)有則調(diào)用引擎模塊中的 download()
方法下載網(wǎng)頁(yè)數(shù)據(jù),成功后調(diào)用 media_downloaded()
方法,失敗則調(diào)用 media_failed()
方法。最后我們來(lái)看下 self._cache_result_and_execute_waiters()
方法,其內(nèi)容和邏輯比較簡(jiǎn)單,就是緩存請(qǐng)求的數(shù)據(jù)并將請(qǐng)求清除等待隊(duì)列:
# 源碼位置:scrapy/pipelines/media.py
# ...
class MediaPipeline:
# ...
def _cache_result_and_execute_waiters(self, result, fp, info):
if isinstance(result, Failure):
# minimize cached information for failure
result.cleanFailure()
result.frames = []
result.stack = None
context = getattr(result.value, '__context__', None)
if isinstance(context, StopIteration):
setattr(result.value, '__context__', None)
# 下載隊(duì)列中移除該請(qǐng)求
info.downloading.remove(fp)
# 緩存下載請(qǐng)求結(jié)果
info.downloaded[fp] = result
# 移除等待隊(duì)列中的該請(qǐng)求
for wad in info.waiting.pop(fp):
# 將原來(lái)請(qǐng)求的回調(diào)方法以及錯(cuò)誤回調(diào)方法,加入回調(diào)處理鏈
defer_result(result).chainDeferred(wad)
此時(shí),我們總結(jié)下 MediaPipeline
類的核心處理流程:
到此,MediaPipeline
類的核心方法我們已經(jīng)研究完畢,接下來(lái)開(kāi)始繼續(xù)學(xué)習(xí) MediaPipeline
這個(gè)類。注意到該類中并沒(méi)有 process_item()
方法,因此它直接繼承父類的 process_item()
方法。從 MediaPipeline
類中可知在 _check_media_to_download()
方法中會(huì)下載相應(yīng)的媒體文件,成功后會(huì)回調(diào) media_downloaded()
方法
# 源碼位置:scrapy/pipelines/files.py
# ...
class FilesPipeline(MediaPipeline):
# ...
def media_downloaded(self, response, request, info):
referer = referer_str(request)
if response.status != 200:
# 打印告警信息,下載失敗
# ...
raise FileException('download-error')
if not response.body:
# 打印告警信息,無(wú)下載內(nèi)容
# ...
raise FileException('empty-content')
status = 'cached' if 'cached' in response.flags else 'downloaded'
# 打印debug信息
self.inc_stats(info.spider, status)
try:
# 設(shè)置下載文件路徑
path = self.file_path(request, response=response, info=info)
# 將下載的內(nèi)容保存成本地文件
checksum = self.file_downloaded(response, request, info)
except FileException as exc:
# 打印異常信息
# ...
raise
except Exception as exc:
# 打印異常信息
# ...
raise FileException(str(exc))
return {'url': request.url, 'path': path, 'checksum': checksum, 'status': status}
# ...
從上面的代碼可知,在請(qǐng)求成功后,下載的內(nèi)容保存在 response.body
中,上面的代碼就是將該文件內(nèi)容保存成磁盤上的文件:
# 源碼位置:scrapy/pipelines/files.py
# ...
class FilesPipeline(MediaPipeline):
# ...
def file_downloaded(self, response, request, info):
# 生成文件保存路徑
path = self.file_path(request, response=response, info=info)
# 獲取字節(jié)流形式的下載內(nèi)容
buf = BytesIO(response.body)
checksum = md5sum(buf)
buf.seek(0)
# 持久化保存
self.store.persist_file(path, buf, info)
# 返回文件的md5值
return checksum
上面的代碼是不是已經(jīng)夠清楚了?最后文件內(nèi)容是 buf
,保存的方法是 self.store.persist_file(path, buf, info)
,該方法是支持將下載內(nèi)容保存成多種形式,比如保存到本地文件中、保存到 FTP 服務(wù)器上,甚至可以通過(guò) S3 接口保存到云存儲(chǔ)中。來(lái)看看保存成本地文件形式的代碼,其實(shí)和我們平時(shí)寫的文件操作一樣,都是 open()
方法打開(kāi)文件句柄,然后使用 wb
模式將內(nèi)容寫到文件中。
# 源碼位置:scrapy/pipelines/files.py
# ...
class FSFilesStore:
# ...
def persist_file(self, path, buf, info, meta=None, headers=None):
absolute_path = self._get_filesystem_path(path)
self._mkdir(os.path.dirname(absolute_path), info)
# 保存文件
with open(absolute_path, 'wb') as f:
f.write(buf.getvalue())
# ...
最后對(duì)于 ImagesPipeline
類,其基本處理流程不變,只不過(guò)最后的保存方式和普通文件管道不一樣,我們來(lái)看下面幾個(gè)方法:
# 源碼位置:scrapy/pipelines/images.py
# ...
class ImagesPipeline(FilesPipeline):
# ...
def file_downloaded(self, response, request, info):
return self.image_downloaded(response, request, info)
def image_downloaded(self, response, request, info):
checksum = None
for path, image, buf in self.get_images(response, request, info):
if checksum is None:
buf.seek(0)
checksum = md5sum(buf)
width, height = image.size
# 保存成圖片形式
self.store.persist_file(
path, buf, info,
meta={'width': width, 'height': height},
headers={'Content-Type': 'image/jpeg'})
return checksum
def get_images(self, response, request, info):
path = self.file_path(request, response=response, info=info)
# 下載的圖片內(nèi)容主體
orig_image = Image.open(BytesIO(response.body))
width, height = orig_image.size
if width < self.min_width or height < self.min_height:
raise ImageException("Image too small (%dx%d < %dx%d)" %
(width, height, self.min_width, self.min_height))
image, buf = self.convert_image(orig_image)
yield path, image, buf
for thumb_id, size in self.thumbs.items():
thumb_path = self.thumb_path(request, thumb_id, response=response, info=info)
thumb_image, thumb_buf = self.convert_image(image, size)
yield thumb_path, thumb_image, thumb_buf
def convert_image(self, image, size=None):
# 圖片轉(zhuǎn)換格式
# ...
if size:
image = image.copy()
image.thumbnail(size, Image.ANTIALIAS)
buf = BytesIO()
image.save(buf, 'JPEG')
return image, buf
至于上面的代碼細(xì)節(jié)限于篇幅就不再深究了,有興趣的可以課后去深入學(xué)習(xí),這里主要是使用了 Python 的一個(gè)專門用來(lái)處理圖片的第三方模塊:PIL
。掌握了該模塊的基本用法后,再看這些代碼就一目了然了,都是非常常規(guī)和基礎(chǔ)的代碼。
好了,本小節(jié)的內(nèi)容就到這里了。如果你能堅(jiān)持看到這里,在回過(guò)頭看看前面提出的問(wèn)題,是否在心里都已經(jīng)有了準(zhǔn)確的答案?所有的疑問(wèn)其實(shí)在看一遍源碼之后便會(huì)豁然開(kāi)朗,我們也能理解 Scrapy 中設(shè)置的參數(shù)的含義以及其作用,這些是我們后續(xù)深入定制化 Scrapy 框架的基礎(chǔ),一定要掌握。
3. 小結(jié)
本小節(jié)中我們先提出了 3 個(gè)問(wèn)題,然后帶著問(wèn)題進(jìn)入 Scrapy 的源碼尋找答案,在最后完整看完 Pipeline 的工作代碼后,在回過(guò)頭來(lái)看原來(lái)的問(wèn)題時(shí),答案已經(jīng)一目了然了。這種學(xué)習(xí)源碼的方式也是非常有效的,帶著問(wèn)題去看代碼。此外,我們沒(méi)有深究代碼細(xì)節(jié),主要是根據(jù)架構(gòu)圖的數(shù)據(jù)導(dǎo)向來(lái)學(xué)習(xí)源碼,課后也希望讀者能繼續(xù)深入這塊的代碼研究,提出問(wèn)題,然后解答問(wèn)題,最后完全掌握該模塊。