Scrapy 的分布式實(shí)現(xiàn)
今天我們簡(jiǎn)單介紹下 Scrapy 的分布式實(shí)現(xiàn)框架:Scrapy-Redis 并基于該插件完成一個(gè)簡(jiǎn)單的分布式爬蟲案例。
1. 一個(gè)簡(jiǎn)單的分布式爬蟲案例
我們以前面的第16講的頭條熱點(diǎn)新聞爬蟲基礎(chǔ),使用 scrapy-redis 插件進(jìn)行改造,使之支持分布式爬取?,F(xiàn)在我們按照如下的步驟進(jìn)行。
環(huán)境準(zhǔn)備。由于條件限制,我們只有2臺(tái)云主機(jī),分別命名為 server 和 server2。兩臺(tái)主機(jī)的用途如下:
| 主機(jī) | 服務(wù) | 公網(wǎng) ip |
|---|---|---|
| server | scrapy爬蟲 | 180.76.152.113 |
| server2 | scrapy爬蟲、redis服務(wù) | 47.115.61.209 |
先準(zhǔn)備好 redis 服務(wù),redis 服務(wù)的搭建以及設(shè)置密碼等步驟在第一部分中已經(jīng)介紹過(guò)了,這里就不再重復(fù)介紹了;
[root@server ~]# redis-cli -h 47.115.61.209 -p 6777
47.115.61.209:6777> auth spyinx
OK
47.115.61.209:6777> get hello
"new world"
我們?cè)?server 和 server2 上都進(jìn)行測(cè)試,確保都能連上 server2 上的 redis 服務(wù)。
安裝 scrapy 和 scrapy-redis;
[root@server2 ~]# pip3 install scrapy scrapy-redis
# ...
改造 spider 代碼,將原先繼承的 Spider 類改為繼承 scrapy-redis 插件中的 RedisSpider,同時(shí)去掉 start_requests() 方法:
# from scrapy import Request, Spider
from scrapy_redis.spiders import RedisSpider
# ...
class HotnewsSpider(RedisSpider):
# ...
# 注釋start_requests()方法
# def start_requests(self):
# request_url = self._get_url(max_behot_time)
# self.logger.info(f"we get the request url : {request_url}")
# yield Request(request_url, headers=headers, cookies=cookies, callback=self.parse)
# ...
改造下原先的 pipelines.py 代碼,為了能實(shí)時(shí)將數(shù)據(jù)保存到數(shù)據(jù)庫(kù)中,我們挪動(dòng)下 SQL 語(yǔ)句 commit 的位置,同時(shí)去掉原先的郵件發(fā)送功能:
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
import logging
from string import Template
from itemadapter import ItemAdapter
import pymysql
from toutiao_hotnews.mail import HtmlMailSender
from toutiao_hotnews.items import ToutiaoHotnewsItem
from toutiao_hotnews.html_template import hotnews_template_html
from toutiao_hotnews import settings
class ToutiaoHotnewsPipeline:
logger = logging.getLogger('pipelines_log')
def open_spider(self, spider):
# 初始化連接數(shù)據(jù)庫(kù)
self.db = pymysql.connect(
host=spider.settings.get('MYSQL_HOST', 'localhost'),
user=spider.settings.get('MYSQL_USER', 'root'),
password=spider.settings.get('MYSQL_PASS', '123456'),
port=spider.settings.get('MYSQL_PORT', 3306),
db=spider.settings.get('MYSQL_DB_NAME', 'mysql'),
charset='utf8'
)
self.cursor = self.db.cursor()
def process_item(self, item, spider):
# 插入sql語(yǔ)句
sql = "insert into toutiao_hotnews(title, abstract, source, source_url, comments_count, behot_time) values (%s, %s, %s, %s, %s, %s)"
if item and isinstance(item, ToutiaoHotnewsItem):
self.cursor.execute(sql, (item['title'], item['abstract'], item['source'], item['source_url'], item['comments_count'], item['behot_time']))
# 將commit語(yǔ)句移動(dòng)到這里
self.db.commit()
return item
def query_data(self, sql):
data = {}
try:
self.cursor.execute(sql)
data = self.cursor.fetchall()
except Exception as e:
logging.error('database operate error:{}'.format(str(e)))
self.db.rollback()
return data
def close_spider(self, spider):
self.cursor.close()
self.db.close()
接下來(lái)就是配置 settings.py 了,我們首先要設(shè)置好 UserAgent,這一步是所有爬蟲必須的。另外,針對(duì) scrapy-redis 插件,我們只需要設(shè)置 scrapy-redis 的調(diào)度器和去重過(guò)濾器以及 Redis 的連接配置即可。如果想要將抓取的結(jié)果保存到 Redis 中,需要在 ITEM_PIPELINES 值中添加 scrapy-redis 的 item pipeline 即可。這里我們相應(yīng)的配置如下:
# ...
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36'
# ...
ITEM_PIPELINES = {
'toutiao_hotnews.pipelines.ToutiaoHotnewsPipeline': 300,
# 指定scrapy-redis的pipeline,將結(jié)果保存到redis中
'scrapy_redis.pipelines.RedisPipeline': 400,
}
# ...
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# 設(shè)置連接 Redis 的 URL
REDIS_URL = 'redis://:spyinx@47.115.61.209:6777'
就這樣簡(jiǎn)單改造后,一個(gè)支持分布式的爬蟲就完成了。我們?cè)诿颗_(tái)云主機(jī)上上傳該爬蟲代碼,然后在爬蟲項(xiàng)目目錄下執(zhí)行 scrapy crawl hotnews 運(yùn)行爬蟲。此時(shí),所有的爬蟲都會(huì)處于等待狀態(tài),需要手動(dòng)將起始的請(qǐng)求 URL 設(shè)置到 redis 的請(qǐng)求列表中,相應(yīng)的 key 默認(rèn)為 hotnews:start_urls。添加的 redis 命令為:
> lpush hotnews:start_urls url
為此我準(zhǔn)備了一段 python 代碼幫助我們完成 url 的生成以及推送到 redis 中:
# 位置: 在 toutiao_hotnews 目錄下,和 scrapy.cfg 文件同一級(jí)
import redis
from toutiao_hotnews.spiders.hotnews import HotnewsSpider
spider = HotnewsSpider()
r = redis.Redis(host='47.115.61.209', port=6777, password='spyinx', db=0)
request_url = spider._get_url(0)
r.lpush("hotnews:start_urls", request_url)
接下來(lái),我們看看這個(gè)分布式爬蟲的運(yùn)行效果:
上面的視頻中,我們啟動(dòng)了兩個(gè) scrapy 爬蟲,他們分別監(jiān)聽(tīng) redis 中的 hotnews:start_urls 列表,當(dāng)里面有數(shù)據(jù)時(shí)其中一個(gè)爬蟲便會(huì)讀取該 url 然后開始爬取動(dòng)作;后面我們停止其中一個(gè)爬蟲時(shí),繼續(xù)向 redis 中添加一個(gè)請(qǐng)求的 url 則另一個(gè) scrapy 爬蟲也會(huì)繼續(xù)正常工作。 由于我們添加了兩個(gè) item pipeline,其中第一個(gè)會(huì)將 item 數(shù)據(jù)保存到數(shù)據(jù)庫(kù)中,scrapy_redis 的 item pipeline 則會(huì)將抓取的 item 結(jié)果保存到 redis 中,對(duì)應(yīng)的 key 名為 hotnews:items。
在對(duì) scrapy-redis 插件有了一定的了解后,我們來(lái)分析一下 scrapy-redis 的源碼并解釋下上述工程的執(zhí)行流程。
2. Scrapy-Redis 源碼分析
我們從 github 上可以找到 scrapy-redis 插件的源碼。它的代碼少而精,但是簡(jiǎn)單的擴(kuò)展就能使得 scrapy 框架具備分布式功能,因此它在 github 上也收獲了不少贊。我們下載其源碼來(lái)一窺其內(nèi)部原理:

我們會(huì)從一開始繼承的 RedisSpider 類開始學(xué)起,并逐步深入源碼學(xué)習(xí)。
2.1 scrapy-redis 中的 RedisSpider 類分析
來(lái)看 RedisSpider 類的定義,它位于源碼的 spider.py 文件中:
# 源碼位置:scrapy_redis/spider.py
# ...
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: False)
Use SET operations to retrieve messages from the redis queue. If False,
the messages are retrieve using the LPOP command.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
# 設(shè)置redis相關(guān)信息
obj.setup_redis(crawler)
return obj
# ...
源碼中關(guān)于該類的說(shuō)明已經(jīng)非常清楚了,我們簡(jiǎn)單翻一下就是:空閑時(shí)從 redis 隊(duì)列中讀取 urls 的 spider。來(lái)關(guān)注 from_crawler() 方法的第二個(gè)語(yǔ)句:setup_redis(),其實(shí)現(xiàn)代碼如下:
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None
# Redis client placeholder.
server = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, 'crawler', None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
# 設(shè)置redis_key屬性值,如果settings.py中沒(méi)有設(shè)置,就會(huì)使用默認(rèn)的key值
if self.redis_key is None:
self.redis_key = settings.get(
'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
)
self.redis_key = self.redis_key % {'name': self.name}
# 確保redis_key值不為空
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
# 獲取redis_batch_size的值,并轉(zhuǎn)成int類型
if self.redis_batch_size is None:
# TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
self.redis_batch_size = settings.getint(
'REDIS_START_URLS_BATCH_SIZE',
settings.getint('CONCURRENT_REQUESTS'),
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
# 獲取redis_batch_size值
if self.redis_encoding is None:
self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
self.__dict__)
# 獲取server屬性,即redis的連接
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
"""Returns a Request instance from data coming from Redis.
By default, ``data`` is an encoded URL. You can override this method to
provide your own message decoding.
Parameters
----------
data : bytes
Message from redis.
"""
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
上面的代碼比較簡(jiǎn)單,也十分容易看懂。關(guān)于 setup_redis() 方法:該方法主要是設(shè)置 redis 相關(guān)信息,同時(shí)連接 redis 并得到連接屬性 self.server 值;從該方法中我們可以看到在 settings.py 中我們可以設(shè)置如下幾個(gè)參數(shù):
-
REDIS_START_URLS_KEY:設(shè)置起始 urls 的 key,前面我們知道沒(méi)有設(shè)置是,默認(rèn)的 key 是
爬蟲名:start_urls,這在代碼中也有所體現(xiàn);if self.redis_key is None: self.redis_key = settings.get( 'REDIS_START_URLS_KEY', defaults.START_URLS_KEY, ) # default.py中有START_URLS_KEY = '%(name)s:start_urls' self.redis_key = self.redis_key % {'name': self.name} -
REDIS_START_URLS_BATCH_SIZE:已經(jīng)移除了;
-
REDIS_ENCODING: 設(shè)置 redis 中的編碼類型;
-
REDIS 的相關(guān)配置,主要在如下的語(yǔ)句中讀?。?/p>
self.server = connection.from_settings(crawler.settings)我們來(lái)跟蹤下這個(gè)
connection.from_settings()的代碼,位于 connection.py 中:# 源碼位置:scrapy_redis/connection.py import six from scrapy.utils.misc import load_object from . import defaults # 重要的映射關(guān)系,對(duì)應(yīng)著settings.py的 SETTINGS_PARAMS_MAP = { 'REDIS_URL': 'url', 'REDIS_HOST': 'host', 'REDIS_PORT': 'port', 'REDIS_ENCODING': 'encoding', } def get_redis_from_settings(settings): params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params) # Backwards compatible alias. from_settings = get_redis_from_settings def get_redis(**kwargs): # 默認(rèn)使用defaults.REDIS_CLS,也就是redis.StrictRedis redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
上面的代碼非常簡(jiǎn)單,get_redis_from_settings() 方法會(huì)從 settings.py 中讀取 REDIS_URL 、REDIS_HOST 、REDIS_PORT 等參數(shù),另外還會(huì)額外讀取 REDIS_PARAMS 。優(yōu)先使用 REDIS_URL 配置信息,來(lái)看看最核心的建立客戶端連接實(shí)例的代碼:
if url:
return redis_cls.from_url(url, **kwargs)
else:
return redis_cls(**kwargs)
這里的 redis_cls 正是第三方模塊類 redis.StrictRedis,當(dāng)然我們也可以通過(guò)覆蓋 default.py 中的 REDIS_PARAMS 參數(shù)中的 redis_cls 來(lái)選擇新的操作 redis 的第三方模塊。
緊接著,我們注意到前面在改造 Scrapy 爬蟲時(shí)去掉了 start_requests() 這個(gè)方法。在啟動(dòng) scrapy 爬蟲后,爬蟲會(huì)等到 redis 的 urls 隊(duì)列中出現(xiàn)相應(yīng)的起始 url 值,然后獲取該 url 開始數(shù)據(jù)爬取。我們來(lái)看看這個(gè)過(guò)程是如何實(shí)現(xiàn)的?
RedisSpider 類繼承了 RedisMixin 這個(gè) mixin,它是 scrapy-redis 插件爬蟲需要單獨(dú)實(shí)現(xiàn)的功能類。該 Mixin 中正好實(shí)現(xiàn)了 start_requests() 方法,具體代碼如下:
# 源碼位置:scrapy_redis/spiders.py
# ...
class RedisMixin(object):
# ...
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
# ...
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
# ...
我們沒(méi)有設(shè)置 REDIS_START_URLS_AS_SET 值,所以默認(rèn)使用 Redis 的列表類型。因此 fetch_one 方法就是 self.server.lpop,對(duì)應(yīng)就是 Redis 中的 lpop (左彈出) 方法。我們的 URL 是使用 lpush 進(jìn)去了,獲取該結(jié)果默認(rèn)使用的是 lpop,因此我們知道先 push 進(jìn)去的 url 元素就會(huì)后 pop 彈出并執(zhí)行。另外,我們看到,如果 redis 中對(duì)應(yīng)的 urls 隊(duì)列中存在一個(gè) url 元素后,執(zhí)行如下操作:
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
來(lái)繼續(xù)看 self.make_request_from_data() 這個(gè)方法:
def make_request_from_data(self, data):
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
這個(gè) self.make_requests_from_url() 方法其實(shí)調(diào)用的是 scrapy 模塊中的 spider 類中的方法:
# 源碼位置:scrapy/spiders/__init__.py
# ...
class Spider(object_ref):
# ...
def make_requests_from_url(self, url):
""" This method is deprecated. """
warnings.warn(
"Spider.make_requests_from_url method is deprecated: "
"it will be removed and not be called by the default "
"Spider.start_requests method in future Scrapy releases. "
"Please override Spider.start_requests method instead."
)
return Request(url, dont_filter=True)
最終我們發(fā)現(xiàn)在 scrapy-redis 插件中,它的 RedisSpider 類也有默認(rèn)的 start_requests() 方法,該方法從 redis 中指定的隊(duì)列中取出 urls 并封裝成 Scrapy 中的 Request 請(qǐng)求并 yield 給 Scrapy 的調(diào)度器去調(diào)度處理。
2.2 scrapy-redis 中的請(qǐng)求隊(duì)列
接下來(lái)我們來(lái)看看 scrapy-redis 在對(duì) scrapy 框架對(duì)請(qǐng)求隊(duì)列做的一些改動(dòng)。Scrapy 原生的請(qǐng)求隊(duì)列是存儲(chǔ)在內(nèi)存中的,這樣必然無(wú)法實(shí)現(xiàn)分布式功能。scrapy-redis 插件將這個(gè)請(qǐng)求隊(duì)列改造成基于 redis 數(shù)據(jù)庫(kù)的,通過(guò)這個(gè) redis 數(shù)據(jù)庫(kù),實(shí)現(xiàn)了三種請(qǐng)求隊(duì)列:
- 優(yōu)先級(jí)隊(duì)列 (默認(rèn)):PriorityQueue;
- 先進(jìn)先出隊(duì)列:FifoQueue;
- 后進(jìn)先出隊(duì)列:LifoQueue;
有了這個(gè)基于 Redis 數(shù)據(jù)庫(kù)的隊(duì)列后,所有的 Scrapy 爬蟲就能共享這一請(qǐng)求隊(duì)列,實(shí)現(xiàn)分布式協(xié)作的功能。對(duì)于請(qǐng)求隊(duì)列,主要的功能是入隊(duì) (push) 和出隊(duì) (pop)。這部分的代碼位于 scrapy_redis/queue.py 文件中,有興趣可以仔細(xì)閱讀三個(gè)隊(duì)列的入隊(duì)和出隊(duì)的實(shí)現(xiàn)。
2.3 scrapy-redis 中的去重過(guò)濾器
scrapy-redis 插件內(nèi)部實(shí)現(xiàn)了一個(gè)去重過(guò)濾器,同樣基于 Redis 數(shù)據(jù)庫(kù)。原生的 Scrapy 的去重功能是基于內(nèi)存的集合實(shí)現(xiàn),并不適合分布式的。scrapy-redis 通過(guò) Redis 來(lái)實(shí)現(xiàn)數(shù)據(jù)共享,利用 Redis 的集合類型來(lái)實(shí)現(xiàn) 元素的去重,其代碼位于 scrapy_redis/dupefilter.py 文件中。我們可以看看其去重的最核心方法:
# 源碼位置:scrapy_redis/dupefilter.py
# ...
class RFPDupeFilter(BaseDupeFilter):
# ...
def request_seen(self, request):
"""Returns True if request was already seen.
Parameters
----------
request : scrapy.http.Request
Returns
-------
bool
"""
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp)
return added == 0
def request_fingerprint(self, request):
"""Returns a fingerprint for a given request.
Parameters
----------
request : scrapy.http.Request
Returns
-------
str
"""
return request_fingerprint(request)
# ...
從代碼可知,請(qǐng)求會(huì)存入到 Redis 的集合中,從而實(shí)現(xiàn)去重功能,是不是非常簡(jiǎn)單?
2.4 scrapy-redis 中的調(diào)度器
回憶我們的 scrapy 框架中調(diào)度器的功能:接收 scrapy 引擎?zhèn)鬟^(guò)來(lái)的請(qǐng)求 (入隊(duì)),然后從隊(duì)列中選出一個(gè)請(qǐng)求 (出隊(duì)) 發(fā)送給引擎去執(zhí)行下載。此時(shí)我們的請(qǐng)求隊(duì)列是在 Redis 中的,不能想象,這里 scrapy-redis 插件也是需要對(duì) scrapy 的調(diào)度器模塊進(jìn)行略微的調(diào)整,主要改造調(diào)度請(qǐng)求的入隊(duì)和出隊(duì)過(guò)程。此外,調(diào)度器還具備去重功能,因此這里也會(huì)使用前面改造的去重過(guò)濾器來(lái)實(shí)現(xiàn)對(duì)請(qǐng)求的去重。具體代碼如下:
# 源碼位置:scrapy_redis/scheduler.py
# ...
class Scheduler(object):
# ...
# 請(qǐng)求入隊(duì)
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
# 請(qǐng)求出隊(duì)
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
# ...
我們看到這里 scrapy-redis 實(shí)現(xiàn)的調(diào)度器模塊,其請(qǐng)求入隊(duì)操作過(guò)程為:
- 先判斷是否需要去重,如果需要?jiǎng)t使用 scrapy-redis 實(shí)現(xiàn)的去重過(guò)濾器進(jìn)行判斷;
- 判斷是否需要統(tǒng)計(jì);
- 使用 scrapy-redis 中基于 redis 實(shí)現(xiàn)的請(qǐng)求隊(duì)列進(jìn)行入隊(duì)操作;
出隊(duì)操作則是調(diào)用 scrapy-redis 中實(shí)現(xiàn)的請(qǐng)求隊(duì)列進(jìn)行出隊(duì) (pop() 方法)。scrapy-redis 中調(diào)度器的核心步驟就是這兩步,大家看懂了嗎?
2.5 scrapy-redis 中的 Item Pipeline
最后我們來(lái)看 scrapy-redis 中定義的 item pipeline。前面我們?cè)陬^條新聞爬蟲的改造中只是在配置中添加了 scrapy-redis 中的 item pipeline,這樣爬蟲抓取的結(jié)果會(huì)保存到 redis 中,那么該 pipeline 是如何實(shí)現(xiàn)的呢?其代碼位于 scrapy_redis/pipelines.py 文件中,我們來(lái)一覽究竟:
# 源碼位置:scrapy_redis/pipelines.py
# ...
class RedisPipeline(object):
def __init__(self, server,
key=defaults.PIPELINE_KEY,
serialize_func=default_serialize):
"""Initialize pipeline.
Parameters
----------
server : StrictRedis
Redis client instance.
key : str
Redis key where to store items.
serialize_func : callable
Items serializer function.
"""
self.server = server
self.key = key
self.serialize = serialize_func
@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
"""Returns redis key based on given spider.
Override this function to use a different key depending on the item
and/or spider.
"""
return self.key % {'spider': spider.name}
這段代碼也是簡(jiǎn)潔明了,首先是初始化三個(gè)屬性值:
server:redis 客戶端實(shí)例,用于對(duì) redis 進(jìn)行操作;key:結(jié)果保存到 redis 中的 key 名;serialize:指定結(jié)果序列化類;
作為 scrapy 中的 pipeline,最核心的處理函數(shù)就是 process_item() 方法。在該 pipeline 中,該方法只有一條語(yǔ)句:
deferToThread(self._process_item, item, spider)
deferToThread() 方法是 Twisted 框架提供的一個(gè)方法,其含義如下:
Run a function in a thread and return the result as a Deferred
其實(shí)就是開啟一個(gè)線程執(zhí)行相應(yīng)的方法,并將結(jié)果作為一個(gè) Deferred 返回。我們并不關(guān)心這個(gè) Deferred 是啥,在最后一部分源碼篇中會(huì)介紹到,這里我們只關(guān)心處理 item 的操作是 self._process_item() 這個(gè)方法。該方法的邏輯非常簡(jiǎn)單明了:
- 生成保存到 Redis 中的 key;
- 將 item 值序列化以便能保存到 Redis 中;
- 調(diào)用 redis 的
rpush()方法將序列化結(jié)果保存到相應(yīng)列表中;
看完了 scrapy-redis 中 RedisPipeline 的代碼,是不是知道為什么結(jié)果會(huì)保存到 redis 中了吧?就這樣,我們幾乎學(xué)完了 scrapy-redis 插件的全部源碼,下來(lái)我們來(lái)看一看 scrapy-redis 插件的架構(gòu)圖,進(jìn)一步理解該插件:

3. 小結(jié)
本小節(jié)中我們改造了前面的爬取頭條熱點(diǎn)新聞的代碼,將其改造成一個(gè)分布式的爬蟲。接下來(lái)我們分析了 Scrapy-Redis 插件的源碼,對(duì)該插件的內(nèi)部原理有了深刻的了解。
沈無(wú)奇 ·
2025 imooc.com All Rights Reserved |