深入分析 crawl 命令的執(zhí)行過程
今天我們來跟蹤學(xué)習(xí) scrapy crawl spider_name
命令的執(zhí)行過程,從這個(gè)過程中我們將看到 Scrapy 的引擎模塊的作用。它是整個(gè) Scrapy 其他模塊共同的溝通主體,在 Scrapy 中處于核心模塊的地位,可以說 Scrapy 的引擎模塊就是它的大腦。本小節(jié)就讓我們一起來探索 Scrapy “大腦” 的奧秘吧 !
1. Crawler 類及其相關(guān)類
我們先簡單介紹下 Scrapy 中幾個(gè)常用的基礎(chǔ)類:Crawler、CrawlerRunner、CrawlerProcess
。這些是我們分析 Scrapy 源碼的基礎(chǔ)知識點(diǎn)。
1.1 Crawler 類
# 源碼位置:scrapy/crawler.py
# ...
class Crawler:
def __init__(self, spidercls, settings=None):
if isinstance(spidercls, Spider):
raise ValueError('The spidercls argument must be a class, not an object')
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.spidercls = spidercls
self.settings = settings.copy()
self.spidercls.update_settings(self.settings)
# 初始化一些屬性
# ...
# 日志格式化類
lf_cls = load_object(self.settings['LOG_FORMATTER'])
self.logformatter = lf_cls.from_crawler(self)
# 擴(kuò)展管理類
self.extensions = ExtensionManager.from_crawler(self)
self.settings.freeze()
self.crawling = False
# 爬蟲模塊
self.spider = None
# 引擎模塊
self.engine = None
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
if self.crawling:
raise RuntimeError("Crawling already taking place")
self.crawling = True
try:
self.spider = self._create_spider(*args, **kwargs)
self.engine = self._create_engine()
# 獲取初始下載請求
start_requests = iter(self.spider.start_requests())
# 調(diào)用引擎的open_spider()方法
yield self.engine.open_spider(self.spider, start_requests)
# 生成一個(gè)Deferred對象
yield defer.maybeDeferred(self.engine.start)
except Exception:
self.crawling = False
if self.engine is not None:
yield self.engine.close()
raise
def _create_spider(self, *args, **kwargs):
# 實(shí)例化Spider類
return self.spidercls.from_crawler(self, *args, **kwargs)
def _create_engine(self):
# 生成引擎實(shí)例
return ExecutionEngine(self, lambda _: self.stop())
@defer.inlineCallbacks
def stop(self):
"""Starts a graceful stop of the crawler and returns a deferred that is
fired when the crawler is stopped."""
if self.crawling:
self.crawling = False
yield defer.maybeDeferred(self.engine.stop)
Crawler 類的屬性比較豐富,方法較少,最核心的就是這個(gè) crawl()
方法了??梢钥吹剿鋵?shí)就完成了以下幾個(gè)動作:
- 獲取初始的 Request 請求;
- 調(diào)用引擎的 open_spider() 方法;
- 將引擎的 start() 方法添加到 Deferred 對象中;
1.2 CrawlerRunner
該類有比較詳細(xì)的注釋,源碼里對它的介紹如下:
這是一個(gè)簡便的輔助類,用于跟蹤、管理和運(yùn)行已設(shè)置中的爬蟲程序。
我們先來看該類的實(shí)例化方法:
# 源碼位置:scrapy/crawler.py
# ...
class CrawlerRunner:
# ...
@staticmethod
def _get_spider_loader(settings):
""" Get SpiderLoader instance from settings """
cls_path = settings.get('SPIDER_LOADER_CLASS')
# 導(dǎo)入spider loader類
loader_cls = load_object(cls_path)
excs = (DoesNotImplement, MultipleInvalid) if MultipleInvalid else DoesNotImplement
try:
verifyClass(ISpiderLoader, loader_cls)
except excs:
# 打印異常信息
# ...
return loader_cls.from_settings(settings.frozencopy())
def __init__(self, settings=None):
# scrapy全局配置
if isinstance(settings, dict) or settings is None:
settings = Settings(settings)
self.settings = settings
# 根據(jù)爬蟲名,加載相應(yīng)的Spdier類
self.spider_loader = self._get_spider_loader(settings)
# 用于保存所有的Crawler對象
self._crawlers = set()
# self._active用于保存調(diào)用Crawler.crawl()方法,返回的defer對象
self._active = set()
self.bootstrap_failed = False
self._handle_twisted_reactor()
注意比較重要的屬性值有:
self.settings
:保存了全局的配置信息;self.spider_loader
:用于根據(jù)相應(yīng)的爬蟲名加載對應(yīng)的 Spider 類;self._crawlers
:用于保存所有的 Crawler 對象,使用的是 python 中的集合類型;
接下來我們看看該類中幾個(gè)重要方法:
# 源碼位置:scrapy/crawler.py
# ...
class CrawlerRunner:
# ...
@property
def spiders(self):
# 打印告警信息
# ...
return self.spider_loader
def crawl(self, crawler_or_spidercls, *args, **kwargs):
if isinstance(crawler_or_spidercls, Spider):
# 拋出異常,不能是Spider對象,只能是Spider類或者Crawler對象
# ...
crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, *args, **kwargs)
def _crawl(self, crawler, *args, **kwargs):
self.crawlers.add(crawler)
# 調(diào)用crawl()方法生成一個(gè)Deferred對象
d = crawler.crawl(*args, **kwargs)
# 加入活躍爬蟲的集合
self._active.add(d)
def _done(result):
# 執(zhí)行完后的收尾工作
self.crawlers.discard(crawler)
self._active.discard(d)
self.bootstrap_failed |= not getattr(crawler, 'spider', None)
return result
# 將_done()方法加入成功和失敗的回調(diào)鏈中
return d.addBoth(_done)
def create_crawler(self, crawler_or_spidercls):
if isinstance(crawler_or_spidercls, Spider):
# 拋出異常,不能是Spider對象,只能是Spider類或者Crawler對象
# ...
if isinstance(crawler_or_spidercls, Crawler):
# 如果是Crawler實(shí)例
return crawler_or_spidercls
# 否則創(chuàng)建Crawler實(shí)例
return self._create_crawler(crawler_or_spidercls)
def _create_crawler(self, spidercls):
if isinstance(spidercls, str):
spidercls = self.spider_loader.load(spidercls)
# 返回Crawler實(shí)例
return Crawler(spidercls, self.settings)
上面的代碼量都比較少,比較容易理解。不用糾結(jié)代碼的細(xì)節(jié),把握代碼的作用即可,對于 Scrapy 框架有一個(gè)全貌即可。來簡單描述下上面的方法:
crawl()
:爬蟲開始執(zhí)行爬取動作,crawl
命令調(diào)用的入口方法;create_crawler()
:創(chuàng)建一個(gè) Crawler 實(shí)例;
在看看這個(gè)類剩余的幾個(gè)方法:
# 源碼位置:scrapy/crawler.py
# ...
class CrawlerRunner:
# ...
def stop(self):
return defer.DeferredList([c.stop() for c in list(self.crawlers)])
@defer.inlineCallbacks
def join(self):
while self._active:
yield defer.DeferredList(self._active)
def _handle_twisted_reactor(self):
if self.settings.get("TWISTED_REACTOR"):
verify_installed_reactor(self.settings["TWISTED_REACTOR"])
來簡單說明下上面幾個(gè)方法:
-
stop()
和join()
方法都是停止或者同步管理的爬蟲,返回一個(gè) Deferred 對象; -
_handle_twisted_reactor
:默認(rèn)配置中TWISTED_REACTOR
的值為 None。其校驗(yàn)安裝的 reactor 代碼也比較簡單,就是導(dǎo)入TWISTED_REACTOR
路徑對應(yīng)的類,然后判斷 twisted 中的 reactor 是否為該類的一個(gè)實(shí)例,不是就拋出異常:# 源碼位置:scrapy/utils/reactor.py # ... def verify_installed_reactor(reactor_path): from twisted.internet import reactor # 導(dǎo)入reactor_path位置的類 reactor_class = load_object(reactor_path) # 判斷twisted中的reactor是否為其實(shí)例 if not isinstance(reactor, reactor_class): # 拋出異常 # ...
到此,這個(gè)類的相關(guān)屬性及方法就介紹完畢了,接下來我們學(xué)習(xí)它的一個(gè)子類:CrawlerProcess
。
1.3 CrawlerProcess
該類用于在一個(gè)進(jìn)程中同時(shí)運(yùn)行多個(gè) Scrapy 爬蟲,它繼承自上面的 CrawlerRunner
類。先學(xué)習(xí)其 __init__()
方法:
# 源碼位置:scrapy/crawler.py
# ...
class CrawlerProcess(CrawlerRunner):
def __init__(self, settings=None, install_root_handler=True):
# 調(diào)用父類的__init__()方法
super(CrawlerProcess, self).__init__(settings)
# 給一些信號設(shè)置相應(yīng)的回調(diào)方法
install_shutdown_handlers(self._signal_shutdown)
# 初始化scrapy中默認(rèn)的日志配置
configure_logging(self.settings, install_root_handler)
# 打印一些基本信息
log_scrapy_info(self.settings)
def _signal_shutdown(self, signum, _):
from twisted.internet import reactor
# 注冊信號回調(diào)方法
install_shutdown_handlers(self._signal_kill)
signame = signal_names[signum]
# ...
reactor.callFromThread(self._graceful_stop_reactor)
def _signal_kill(self, signum, _):
from twisted.internet import reactor
install_shutdown_handlers(signal.SIG_IGN)
signame = signal_names[signum]
logger.info('Received %(signame)s twice, forcing unclean shutdown',
{'signame': signame})
reactor.callFromThread(self._stop_reactor)
# ...
可以看到 CrawlerProcess
類對象的初始化方法中先是調(diào)用父類的 __init__()
初始化相關(guān)屬性,接著注冊信號的回調(diào)方法,主要是終止信號。我們來看看這個(gè) install_shutdown_handlers()
方法的代碼:
# 源碼位置:scrapy/utils/ossignal.py
# ...
def install_shutdown_handlers(function, override_sigint=True):
"""Install the given function as a signal handler for all common shutdown
signals (such as SIGINT, SIGTERM, etc). If override_sigint is ``False`` the
SIGINT handler won't be install if there is already a handler in place
(e.g. Pdb)
"""
from twisted.internet import reactor
reactor._handleSignals()
# 注冊SIGTERM信號的回調(diào)
signal.signal(signal.SIGTERM, function)
if signal.getsignal(signal.SIGINT) == signal.default_int_handler or \
override_sigint:
signal.signal(signal.SIGINT, function)
# Catch Ctrl-Break in windows
if hasattr(signal, 'SIGBREAK'):
# 注冊SIGBREAK信號的回調(diào)
signal.signal(signal.SIGBREAK, function)
可以看到,上面的代碼就是一些注冊信號的回調(diào)方法。接下來我們來看 CrawlerRunner
類中比較重要的幾個(gè)方法:
# 源碼位置:scrapy/crawler.py
# ...
class CrawlerProcess(CrawlerRunner):
# ...
def start(self, stop_after_crawl=True):
from twisted.internet import reactor
if stop_after_crawl:
d = self.join()
# Don't start the reactor if the deferreds are already fired
if d.called:
return
d.addBoth(self._stop_reactor)
resolver_class = load_object(self.settings["DNS_RESOLVER"])
resolver = create_instance(resolver_class, self.settings, self, reactor=reactor)
resolver.install_on_reactor()
tp = reactor.getThreadPool()
tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE'))
reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
reactor.run(installSignalHandlers=False) # blocking call
def _graceful_stop_reactor(self):
# 先調(diào)用stop()方法停止爬蟲進(jìn)程
d = self.stop()
# 最后回調(diào)停止reactor的方法
d.addBoth(self._stop_reactor)
return d
def _stop_reactor(self, _=None):
from twisted.internet import reactor
try:
reactor.stop()
except RuntimeError: # raised if already stopped or in shutdown stage
pass
# ...
來看這個(gè) start()
方法,其中比較重要的就是調(diào)用了 reactor.run()
方法。_graceful_stop_reactor()
和 _stop_reactor()
從名字上就可以看出它們是關(guān)于停止 reactor 的方法。
2. crawl 命令追蹤
現(xiàn)在我們要追蹤前面多次使用的 scrapy crawl xxxx
命令的執(zhí)行過程。根據(jù)前面的學(xué)習(xí)成果,我們知道 crawl
命令會調(diào)用 scrapy/commands/crawl.py
文件中 Command
類的 run()
方法:
# 源碼位置:scrapy/commands/crawl.py
# ...
class Command(BaseRunSpiderCommand):
# ...
def run(self, args, opts):
# 沒帶參數(shù)或者帶多了參數(shù),拋出異常
# ...
# 獲取爬蟲名
spname = args[0]
# 核心是調(diào)用self.crawler_process執(zhí)行爬取動作
crawl_defer = self.crawler_process.crawl(spname, **opts.spargs)
if getattr(crawl_defer, 'result', None) is not None and issubclass(crawl_defer.result.type, Exception):
# 如果結(jié)果異常或者沒有結(jié)果,設(shè)置退出碼為1
self.exitcode = 1
else:
# 調(diào)用self.crawler_process的start()方法
self.crawler_process.start()
if self.crawler_process.bootstrap_failed or \
(hasattr(self.crawler_process, 'has_exception') and self.crawler_process.has_exception):
self.exitcode = 1
首先我們從早先的調(diào)用代碼中可以知道 self.crawler_process
其實(shí)是 CrawlerProcess
類的一個(gè)實(shí)例:
# 源碼位置:scrapy/cmdline.py
# ...
def execute(argv=None, settings=None):
# ...
# 核心,設(shè)置command類的核心處理類
cmd.crawler_process = CrawlerProcess(settings)
# ...
這樣命令的核心執(zhí)行方法其實(shí)就是調(diào)用 CrawlerProcess
對象的 crawl()
方法。我們從一小節(jié)的學(xué)習(xí)中可知:CrawlerProcess
對象的 crawl()
方法其實(shí)是父類 (CrawlerRunner
) 中的 crawl()
方法,且其調(diào)用的核心就兩行語句:
crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, *args, **kwargs)
這個(gè) crawler 是 Crawler
類的一個(gè)實(shí)例,最后在調(diào)用 self._crawl()
方法時(shí),通過代碼可知該方法其實(shí)是調(diào)用 Crawler
對象的 crawl()
方法開始執(zhí)行 Scrapy 爬蟲程序:
# 源碼位置:scrapy/crawler.py
# ...
class CrawlerRunner:
# ...
def _crawl(self, crawler, *args, **kwargs):
# ...
d = crawler.crawl(*args, **kwargs)
# ...
到此,我們知道 crawl
命令執(zhí)行的核心方法其實(shí)是 Crawler
對象中的 crawl()
方法。 接下來,我們就是要從這個(gè) crawl()
方法入手。
前面我們已經(jīng)介紹過 crawl()
方法,該方法會先根據(jù)傳入的爬蟲名來創(chuàng)建 Spider 實(shí)例以及 Engine 實(shí)例:
self.spider = self._create_spider(*args, **kwargs)
self.engine = self._create_engine()
接著是得到起始的爬取 URL:
start_requests = iter(self.spider.start_requests())
這部分調(diào)用的正是 Spider 對象的 start_requests()
方法獲取初始的 URLs,我們也可以在自定義的爬蟲中通過重寫該方法來獲取自定義的初始 URLs。
在接下來就是兩個(gè) yield 語句,這兩句也是核心:
yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)
這樣子,我們的跟蹤目標(biāo)就要轉(zhuǎn)入到 Scrapy 源碼的核心目錄下了,其調(diào)用的是引擎模塊中的兩個(gè)方法,位于我們之前接觸的 engine.py
文件中。來分別看著兩個(gè)方法的代碼:
# 源碼位置: scrapy/core/engine.py
# ...
class ExecutionEngine:
# ...
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
if not self.has_capacity():
raise RuntimeError("No free spider slot when opening %r" % spider.name)
logger.info("Spider opened", extra={'spider': spider})
# 獲取CallLaterOnce的實(shí)例,延遲調(diào)用一次
nextcall = CallLaterOnce(self._next_request, spider)
# 獲取調(diào)度器
scheduler = self.scheduler_cls.from_crawler(self.crawler)
# 通過Spider中間件后,獲取初始的urls,對應(yīng)著數(shù)據(jù)流圖的1
start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
self.slot = slot
self.spider = spider
# 打開調(diào)度器
yield scheduler.open(spider)
yield self.scraper.open_spider(spider)
# 統(tǒng)計(jì)用的
self.crawler.stats.open_spider(spider)
yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
# 延遲調(diào)用self._next_request()方法,內(nèi)部使用reactor.callLater()方法實(shí)現(xiàn)
slot.nextcall.schedule()
slot.heartbeat.start(5)
看上面的代碼,在 open_spider()
方法中我們可以看到,在我們調(diào)用的 Spider 對象中產(chǎn)生的起始請求在經(jīng)過 Spider 中間件處理后再給回引擎模塊;接著我們還會打開調(diào)度器以及爬取開關(guān) (self.scraper.open_spider()
) 和統(tǒng)計(jì)開關(guān) (self.crawler.stats.open_spider()
)。
緊接著就是內(nèi)部使用 reactor.callLater() 方法實(shí)現(xiàn)對 self._next_request()
方法的延遲調(diào)用:
# 源碼位置:scrapy/utils/reactor.py
# ...
class CallLaterOnce:
"""Schedule a function to be called in the next reactor loop, but only if
it hasn't been already scheduled since the last time it ran.
"""
def __init__(self, func, *a, **kw):
self._func = func
self._a = a
self._kw = kw
self._call = None
def schedule(self, delay=0):
'''設(shè)置延遲調(diào)用'''
from twisted.internet import reactor
if self._call is None:
self._call = reactor.callLater(delay, self)
def cancel(self):
if self._call:
self._call.cancel()
def __call__(self):
# 在對象調(diào)用后,self._call設(shè)置為空,并調(diào)用方法
self._call = None
return self._func(*self._a, **self._kw)
# ...
看完上面的代碼后,我們可知接下來就要進(jìn)入 self._next_request()
方法中執(zhí)行。繼續(xù)來看該方法的代碼:
# 源碼位置:scrapy/core/engine.py
# ...
class ExecutionEngine:
# ...
def _next_request(self, spider):
slot = self.slot
# 不正確的情況,直接return
# ...
while not self._needs_backout(spider):
"""爬蟲不需要返回,然后會從調(diào)度器獲取下一個(gè)請求"""
if not self._next_request_from_scheduler(spider):
break
if slot.start_requests and not self._needs_backout(spider):
try:
request = next(slot.start_requests)
except StopIteration:
slot.start_requests = None
except Exception:
slot.start_requests = None
logger.error('Error while obtaining start requests',
exc_info=True, extra={'spider': spider})
else:
self.crawl(request, spider)
if self.spider_is_idle(spider) and slot.close_if_idle:
self._spider_idle(spider)
注意:一開始的調(diào)度器中并沒有請求的 URLs,因此 _next_request()
方法每次調(diào)用時(shí)會從 start_requests
中獲取請求,然后調(diào)用 self.crawl()
方法。我們來看看具體的 self.crawl()
方法的代碼:
# 源碼位置:scrapy/core/engine.py
# ...
class ExecutionEngine:
# ...
def crawl(self, request, spider):
if spider not in self.open_spiders:
raise RuntimeError("Spider %r not opened when crawling: %s" % (spider.name, request))
# 將請求加入調(diào)度器隊(duì)列
self.schedule(request, spider)
# 繼續(xù)調(diào)用self._next_request()方法
self.slot.nextcall.schedule()
def schedule(self, request, spider):
self.signals.send_catch_log(signals.request_scheduled, request=request, spider=spider)
# 這里就是簡單將請求壓入調(diào)度
if not self.slot.scheduler.enqueue_request(request):
self.signals.send_catch_log(signals.request_dropped, request=request, spider=spider)
這個(gè)代碼就非常清楚了,self.crawl()
方法并不直接下載對應(yīng) url 的網(wǎng)頁,而是將其推入調(diào)度器的下載隊(duì)列,然后再調(diào)回 self._next_request()
方法 (對應(yīng)著就是 self.slot.nextcall.schedule()
這一句)。而真正執(zhí)行下載任務(wù)的代碼為:
while not self._needs_backout(spider):
"""爬蟲不需要返回,然后會從調(diào)度器獲取下一個(gè)請求"""
if not self._next_request_from_scheduler(spider):
break
這段代碼會循環(huán)從調(diào)度器中獲取下一個(gè)請求,如果沒有則跳出循環(huán)往下執(zhí)行。如果有請求,則會執(zhí)行 self._download()
方法去下載對應(yīng)的網(wǎng)頁,而這個(gè)方法正是在第24節(jié)中介紹過的下載網(wǎng)頁的入口。
另外,我們還剩下一個(gè) yield 沒介紹,就是引擎的啟動:yield defer.maybeDeferred(self.engine.start)
。該方法比較簡單,就是記錄下起始時(shí)間以及設(shè)置 self.running
標(biāo)志:
# 源碼位置:scrapy/core/engine.py
# ...
class ExecutionEngine:
# ...
@defer.inlineCallbacks
def start(self):
"""Start the execution engine"""
if self.running:
raise RuntimeError("Engine already running")
# 記錄引擎開始執(zhí)行時(shí)間
self.start_time = time()
# 發(fā)送引擎執(zhí)行信號
yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
# 設(shè)置running標(biāo)識
self.running = True
self._closewait = defer.Deferred()
yield self._closewait
來總結(jié)下 crawl
命令的執(zhí)行流程:
對于 crawl
命令的追蹤到這里就結(jié)束了。當(dāng)然,這里我們還有一些內(nèi)容沒有介紹到,比如調(diào)度器如何實(shí)現(xiàn)請求的入隊(duì)、出隊(duì)等,再比如 Spider 模塊和 Engine 模塊的交互過程等,這些就留給讀者課后繼續(xù)探索了。
3. 小結(jié)
本小節(jié)中我們主要介紹了 scrapy/crawler.py
文件中的代碼以及追蹤了 scrapy crawl xxx
命令的執(zhí)行過程。通過追蹤這個(gè)命令,我們可以了解 Scrapy 爬蟲的整個(gè)運(yùn)行流程,回過頭來在看 Scrapy 的架構(gòu)圖以及數(shù)據(jù)流圖,是不是又多了一份理解與認(rèn)識?當(dāng)然,對于 Scrapy 框架源碼我們還有許多代碼沒有介紹到,比如 scrapy 的擴(kuò)展模塊 (scrapy/extensions
)、鏈接抽取模塊 (scrapy/linkextractors
)。這些部分并不屬于 Scrapy 的核心部分,也不是學(xué)習(xí) Scrapy 的重點(diǎn),讀者有興趣可以課后認(rèn)真研究這些模塊的代碼,以增強(qiáng)對 Scrapy 框架的認(rèn)識。好了,整個(gè) Scrapy 框架的的介紹到此就結(jié)束了,我們青山不改,江湖再會!