深入理解 Scrapy 中間件
本小節(jié)我們來深入學習 Scrapy 中間件相關的代碼,這里會涉及兩個部分,分別是下載中間件和 Spider 中間件。我們會詳細介紹到每個中間件模塊的執(zhí)行流程,找出相關代碼進行分析。
1. Scrapy 中的中間件管理類
在源碼的 scrapy/middleware.py
文件中,定義了一個所有中間件管理類的基類:MiddlewareManager。我們首先來仔細分析下該類的代碼。
__init__.py
:類初始化;該類的初始化將傳入相應的中間件類,一般是列表的形式;
# ...
class MiddlewareManager:
"""Base class for implementing middleware managers"""
component_name = 'foo middleware'
def __init__(self, *middlewares):
# 中間件類
self.middlewares = middlewares
# 中間件的方法
self.methods = defaultdict(deque)
# 添加中間件
for mw in middlewares:
self._add_middleware(mw)
_get_mwlist_from_settings()
: 所有繼承該中間件管理類的子類需要實現(xiàn)該方法,用于從 settings.py 中獲取該中間件的列表;
# ...
class MiddlewareManager:
"""Base class for implementing middleware managers"""
# ...
@classmethod
def _get_mwlist_from_settings(cls, settings):
raise NotImplementedError
from_settings()
:從 settings.py 配置中實例化中間件管理類;
# ...
class MiddlewareManager:
"""Base class for implementing middleware managers"""
@classmethod
def from_settings(cls, settings, crawler=None):
# 從配置中獲取相應設置的中間件,比如設置的下載中間件、spider中間件列表等
mwlist = cls._get_mwlist_from_settings(settings)
middlewares = []
enabled = []
for clspath in mwlist:
try:
# 加載完整的模塊路徑,得到具體的模塊
mwcls = load_object(clspath)
# 創(chuàng)建中間件實例
mw = create_instance(mwcls, settings, crawler)
# 添加到middles中
middlewares.append(mw)
# 用于后續(xù)info打印,表明啟用的中間件位置路徑
enabled.append(clspath)
except NotConfigured as e:
if e.args:
clsname = clspath.split('.')[-1]
logger.warning("Disabled %(clsname)s: %(eargs)s",
{'clsname': clsname, 'eargs': e.args[0]},
extra={'crawler': crawler})
# 打印啟用的中間件信息
logger.info("Enabled %(componentname)ss:\n%(enabledlist)s",
{'componentname': cls.component_name,
'enabledlist': pprint.pformat(enabled)},
extra={'crawler': crawler})
# 使用從配置中獲取的中間件列表實例化該中間件管理類
return cls(*middlewares)
from_crawler()
: 根據(jù) Crawler 類對象來實例化中間件管理器類;利用的就是 Crawler 對象的 settings 屬性值,然后調(diào)用 MiddlewareManager
類的 from_settings()
方法進行實例化;
# ...
class MiddlewareManager:
"""Base class for implementing middleware managers"""
# ...
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings, crawler)
_add_middleware()
:添加所管理中間件類中的方法,主要是 open_spider()
和 close_spider()
方法;在這里會更新 self.methods
屬性值;
# ...
class MiddlewareManager:
"""Base class for implementing middleware managers"""
# ...
def _add_middleware(self, mw):
if hasattr(mw, 'open_spider'):
self.methods['open_spider'].append(mw.open_spider)
if hasattr(mw, 'close_spider'):
self.methods['close_spider'].appendleft(mw.close_spider)
其他方法:下面的這些方法都是我們在前面 scrapy/utils/defer.py
中介紹過的,在此就不詳細展開了;
# ...
class MiddlewareManager:
"""Base class for implementing middleware managers"""
# ...
def _process_parallel(self, methodname, obj, *args):
return process_parallel(self.methods[methodname], obj, *args)
def _process_chain(self, methodname, obj, *args):
# 執(zhí)行回調(diào)鏈,依次執(zhí)行,返回一個Deferred對象
return process_chain(self.methods[methodname], obj, *args)
def _process_chain_both(self, cb_methodname, eb_methodname, obj, *args):
return process_chain_both(self.methods[cb_methodname],
self.methods[eb_methodname], obj, *args)
def open_spider(self, spider):
return self._process_parallel('open_spider', spider)
def close_spider(self, spider):
return self._process_parallel('close_spider', spider)
以上就是管理中間件類的全部屬性和方法。下面我們將具體看由該管理類衍生出來的下載中間件管理類,同時會重點分析下載中間件以及 Spider 中間件的執(zhí)行流程。
2. 下載中間件執(zhí)行流程
我們現(xiàn)在來梳理下載中間件這個模塊的執(zhí)行流程。注意在 scrapy/downloadmiddlewares
目錄下的代碼并沒有下載中間件的執(zhí)行相關代碼,該目錄下是一系列定義好的內(nèi)置下載中間件,大部分默認是啟用的。這里的代碼并不是我們想要的,那么和下載中間件執(zhí)行流程相關的代碼究竟在哪呢?
首先回過來頭看下前面描述 Scrapy 的架構圖,可知下載中間件位于引擎和下載器之間。上一節(jié)中我們介紹了下載器類 (Downloader) ,其中有一個屬性 self.middleware
,如下圖所示。
該屬性值是下載中間件管理器類 (DownloaderMiddlewareManager) 的一個實例,某種意義上來說它是連接下載模塊和下載中間件模塊之間的橋梁。我們繼續(xù)研究下這個中間件管理器類,很明顯它應該繼承自上面介紹的中間件管理器類,事實也是如此:
# 源碼位置:scrapy/core/downloader/middleware.py
# ...
class DownloaderMiddlewareManager(MiddlewareManager):
component_name = 'downloader middleware'
@classmethod
def _get_mwlist_from_settings(cls, settings):
return build_component_list(
settings.getwithbase('DOWNLOADER_MIDDLEWARES'))
def _add_middleware(self, mw):
if hasattr(mw, 'process_request'):
self.methods['process_request'].append(mw.process_request)
if hasattr(mw, 'process_response'):
self.methods['process_response'].appendleft(mw.process_response)
if hasattr(mw, 'process_exception'):
self.methods['process_exception'].appendleft(mw.process_exception)
# ...
上面的兩個方法非常容易理解,也透露了一些信息。對于下載中間件對象,中間件管理器主要提取的是中間件對象中的三個方法:process_request()
、process_response()
以及 process_exception()
。注意這里方法進入隊列的順序,這也關系到框架對這些方法的調(diào)用順序。在管理器類中還有一個非常重要的 download()
方法,該方法決定了上面三個方法返回不同值時的處理方案,同時也會將下載中間件中的三個方法按照相應的順序添加到對應的回調(diào)鏈中:
# 源碼位置:scrapy/core/downloader/middleware.py
# ...
class DownloaderMiddlewareManager(MiddlewareManager):
# ...
def download(self, download_func, request, spider):
@defer.inlineCallbacks
def process_request(request):
# 依次遍歷下載中間件的process_request()方法,處理請求
for method in self.methods['process_request']:
response = yield deferred_from_coro(method(request=request, spider=spider))
if response is not None and not isinstance(response, (Response, Request)):
# 返回非Request或Response類型,拋出異常
# ...
if response:
return response
# 最后將請求傳給下載器執(zhí)行下載
return (yield download_func(request=request, spider=spider))
@defer.inlineCallbacks
def process_response(response):
# 處理下載的響應
if response is None:
raise TypeError("Received None in process_response")
elif isinstance(response, Request):
return response
for method in self.methods['process_response']:
response = yield deferred_from_coro(method(request=request, response=response, spider=spider))
if not isinstance(response, (Response, Request)):
# 返回非Request或Response類型,拋出異常
# ...
if isinstance(response, Request):
# 如果返回Request,則直接返回,后續(xù)的中間件的process_response()不處理
return response
# 最后返回響應結(jié)果
return response
@defer.inlineCallbacks
def process_exception(failure):
exception = failure.value
for method in self.methods['process_exception']:
response = yield deferred_from_coro(method(request=request, exception=exception, spider=spider))
if response is not None and not isinstance(response, (Response, Request)):
# 返回非Request或Response類型,拋出異常
# ...
if response:
return response
return failure
# 調(diào)用請求,同時將process_request()依次加入回調(diào)鏈中并返回一個Deferred對象
deferred = mustbe_deferred(process_request, request)
# 異?;卣{(diào)
deferred.addErrback(process_exception)
# 響應回調(diào)
deferred.addCallback(process_response)
return deferred
這個 download()
方法非常重要,也有些難以理解。注意一點:該方法主要是形成一個完整的下載鏈路,包括請求鏈 (process request chain)、下載請求 (在 download() 方法的 download_func 參數(shù))、響應處理鏈 (process response chain),另外還加上一個請求異常的回調(diào)鏈。來看看我們對這個過程的一個總結(jié)圖:
注意,中間件管理器會和引擎模塊以及下載模塊之間有交互,那么它們之間發(fā)生交互的代碼是哪一句呢?這里就不賣關子和追蹤了,我們直接給出答案:
下載中間件管理對象和下載器的交互就在于 download()
方法中傳進來的 download_func 參數(shù)。我們在下載器中找到如下代碼:
# ...
class Downloader:
# ...
def fetch(self, request, spider):
def _deactivate(response):
self.active.remove(request)
return response
self.active.add(request)
# 調(diào)用下載中間件管理對象的download()方法
dfd = self.middleware.download(self._enqueue_request, request, spider)
return dfd.addBoth(_deactivate)
這里調(diào)用的下載中間件管理對象 download()
中的 download_func 參數(shù)為 self._enqueue_request
,而我們在上一節(jié)中正好介紹過該方法正好是下載器中下載網(wǎng)頁的起始方法。
另一方面,引擎模塊和該下載中間件管理器類的交互也正是通過下載器的這個 fetch()
方法。我們直接找出相關的代碼語句:
# 源碼位置:scrapy/core/engine.py
# ...
class ExecutionEngine:
# ...
def _download(self, request, spider):
# ...
# 這里的self.downloader就是下載器對象,調(diào)用fetch()方法下載網(wǎng)頁
dwld = self.downloader.fetch(request, spider)
dwld.addCallbacks(_on_success)
dwld.addBoth(_on_complete)
return dwld
總的來說,我們可以得到如下的一個調(diào)用過程:
到目前為止,我們對下載中間件的執(zhí)行流程進行了一個簡要的概述,但沒深究其中的代碼細節(jié)。如果有興趣的話可以仔細跟蹤下代碼的執(zhí)行過程,這些并不復雜,主要是對 Twisted 模塊的應用。
3. Spider 中間件執(zhí)行流程
上面我們看到了下載中間件的管理類,那么這個 Spider 中間件有沒有相應的管理類呢?答案是肯定的,這個類也在 scrapy 的核心目錄 (core) 下的 spidermw.py
文件:
# 源碼位置:scrapy/core/spidermw.py
# ...
class SpiderMiddlewareManager(MiddlewareManager):
component_name = 'spider middleware'
@classmethod
def _get_mwlist_from_settings(cls, settings):
return build_component_list(settings.getwithbase('SPIDER_MIDDLEWARES'))
def _add_middleware(self, mw):
super(SpiderMiddlewareManager, self)._add_middleware(mw)
if hasattr(mw, 'process_spider_input'):
self.methods['process_spider_input'].append(mw.process_spider_input)
if hasattr(mw, 'process_start_requests'):
self.methods['process_start_requests'].appendleft(mw.process_start_requests)
process_spider_output = getattr(mw, 'process_spider_output', None)
self.methods['process_spider_output'].appendleft(process_spider_output)
process_spider_exception = getattr(mw, 'process_spider_exception', None)
self.methods['process_spider_exception'].appendleft(process_spider_exception)
def scrape_response(self, scrape_func, response, request, spider):
# ...
def process_start_requests(self, start_requests, spider):
return self._process_chain('process_start_requests', start_requests, spider)
有了前面下載中間件管理器類的剖析經(jīng)驗,我們可以很容易知道這些方法的作用:
_get_mwlist_from_settings()
:從全局配置中獲取定義的 Spider 中間件,包括 settings.py 和 default_settings.py 中的配置;_add_middleware()
:將各個 Spider 中間件中的相關處理方法添加到對應的self.methods
中,和前面的下載中間件管理器類一樣;scrape_response()
方法類似于前面的下載中間件管理器類的download()
方法,它是連接 Spdier 中間件和其他模塊的橋梁,我們后面詳細介紹其實現(xiàn)細節(jié);process_start_requests()
是處理初始請求的方法;
我們來回顧下第8節(jié)的 Scrapy 框架的數(shù)據(jù)流圖,如下:
可以看到 Spider 中間件連接的是 Spider 模塊和引擎模塊,引擎將下載器獲得的 Response 結(jié)果交給 Spdier 模塊解析,可以返回 ITEMS 類型或者 Requests 類型。有了這個圖,我們就可以來代碼中去搜索相關代碼:
# 源碼位置:scrapy/core/engine.py
# ...
class ExecutionEngine:
# ...
def _handle_downloader_output(self, response, request, spider):
if not isinstance(response, (Request, Response, Failure)):
raise TypeError(
"Incorrect type: expected Request, Response or Failure, got %s: %r"
% (type(response), response)
)
# downloader middleware can return requests (for example, redirects)
if isinstance(response, Request):
self.crawl(response, spider)
return
# response is a Response or Failure
d = self.scraper.enqueue_scrape(response, request, spider)
d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
exc_info=failure_to_exc_info(f),
extra={'spider': spider}))
return d
從引擎的類定義中我們可以很快定位到上面的方法:_handle_downloader_output()
該方法正如其方法名,用于處理下載器的結(jié)果,也就是上面數(shù)據(jù)流圖的第5部分。對于下載器部分的返回結(jié)果必須是 Request、Response 或者 Failure 三者之一,否則直接拋異常;此外,如果返回結(jié)果是 Request 類型,則調(diào)用 self.crawl()
方法繼續(xù)請求。對于 Response 類型的處理就是調(diào)用 self.scraper.enqueue_scrape()
方法處理。這里就要涉及到 Scrapy 中定義的 Scraper 類了,它的實現(xiàn)位于 scrapy/core/scraper.py
文件中。我們來看相關的代碼段:
# 源碼位置:scrapy/core/scraper.py
# ...
class Scraper:
def __init__(self, crawler):
# ...
# 獲取Spider中間件管理器對象
self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
# ...
# ...
def enqueue_scrape(self, response, request, spider):
slot = self.slot
dfd = slot.add_response_request(response, request)
def finish_scraping(_):
slot.finish_response(response, request)
self._check_if_closing(spider, slot)
self._scrape_next(spider, slot)
return _
dfd.addBoth(finish_scraping)
# ...
self._scrape_next(spider, slot)
return dfd
def _scrape_next(self, spider, slot):
while slot.queue:
response, request, deferred = slot.next_response_request_deferred()
self._scrape(response, request, spider).chainDeferred(deferred)
def _scrape(self, response, request, spider):
"""Handle the downloaded response or failure through the spider
callback/errback"""
# ...
# 這里就是經(jīng)過Spider中間件后,調(diào)用Spider模塊去處理結(jié)果
dfd = self._scrape2(response, request, spider) # returns spider's processed output
# 加入回調(diào),處理spider結(jié)果成功和失敗的回調(diào)
dfd.addErrback(self.handle_spider_error, request, response, spider)
dfd.addCallback(self.handle_spider_output, request, response, spider)
return dfd
def _scrape2(self, request_result, request, spider):
"""Handle the different cases of request's result been a Response or a
Failure"""
if not isinstance(request_result, Failure):
# 下載請求成功,調(diào)用Spider中間件管理對象的scrape_response()方法
return self.spidermw.scrape_response(
self.call_spider, request_result, request, spider)
else:
dfd = self.call_spider(request_result, request, spider)
return dfd.addErrback(
self._log_download_errors, request_result, request, spider)
# ...
def handle_spider_output(self, result, request, response, spider):
if not result:
return defer_succeed(None)
it = iter_errback(result, self.handle_spider_error, request, response, spider)
dfd = parallel(it, self.concurrent_items, self._process_spidermw_output,
request, response, spider)
return dfd
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
# 如歸spider模塊返回的是Request,則繼續(xù)請求
self.crawler.engine.crawl(request=output, spider=spider)
elif is_item(output):
# 如果返回的是Item類型,則調(diào)用self.itemproc.process_item()方法處理
self.slot.itemproc_size += 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
typename = type(output).__name__
logger.error(
'Spider must return request, item, or None, got %(typename)r in %(request)s',
{'request': request, 'typename': typename},
extra={'spider': spider},
)
# ...
上面的代碼看似復雜,其實邏輯關系非常清晰明了,正如前面的數(shù)據(jù)流圖所畫。我們現(xiàn)在也用下面的圖來描述下函數(shù)的調(diào)用過程,方便大家更清楚的看到引擎、Spider 中間件以及 Spider 模塊三者之間的調(diào)用過程:
根據(jù)這個流程圖再回去看相應的代碼,是不是會有一些理解的感覺?現(xiàn)在關于 Spider 中間件還有最后一塊內(nèi)容沒搞定,就是 Spider 中間件如何與 Spider 模塊交互的呢?答案就是 Spider 中間件管理器的 scrape_response()
方法:
# ...
class SpiderMiddlewareManager(MiddlewareManager):
# ...
def scrape_response(self, scrape_func, response, request, spider):
# ...
# ...
重點關注該方法的第二個參數(shù):scrape_func
。通過前面的 _scrapy2()
可知調(diào)用 Spider 中間件管理器對象的 scrape_response()
方法時傳入的參數(shù)為:self.call_spider
。其實現(xiàn)如下:
# ...
class Scraper:
# ...
def call_spider(self, result, request, spider):
result.request = request
dfd = defer_result(result)
callback = request.callback or spider.parse
warn_on_generator_with_return_value(spider, callback)
warn_on_generator_with_return_value(spider, request.errback)
# 將spider對象中設置的解析網(wǎng)頁的函數(shù)加入回調(diào)鏈
dfd.addCallbacks(callback=callback,
errback=request.errback,
callbackKeywords=request.cb_kwargs)
return dfd.addCallback(iterate_spider_output)
# ...
這里的代碼非常關鍵,先是獲取 Request 請求設置的回調(diào)函數(shù),如果沒有則默認為 spider.parse
函數(shù);緊接著就是將 callback 方法添加到回調(diào)鏈中。我們的請求在通過 Spider 中間件處理之后會進入 Spider 模塊中定義的 callback 方法去處理 response。看到了么,這就是我們前面寫了那么多的處理下載結(jié)果的函數(shù),它的調(diào)用就在這里設置的。
到此,我們的下載中間件以及 Spider 中間件的整個代碼過程就算大致介紹完畢了,對于更多的代碼細節(jié),還需要讀者課后仔細研讀,期待日后 Scrapy 的源碼中能留下你們的足跡 !
4. 小結(jié)
本小節(jié)中,我們簡單分析了 Scrapy 框架中的下載中間件和 Spider 中間件的相關代碼,理清楚了之前介紹的實現(xiàn)中間件類所必要的屬性方法,對這些中間件類的編寫,我們是不是有了更清楚的認識呢?