Scrapy 的分布式實現(xiàn)
今天我們簡單介紹下 Scrapy 的分布式實現(xiàn)框架:Scrapy-Redis 并基于該插件完成一個簡單的分布式爬蟲案例。
1. 一個簡單的分布式爬蟲案例
我們以前面的第16講的頭條熱點新聞爬蟲基礎,使用 scrapy-redis 插件進行改造,使之支持分布式爬取?,F(xiàn)在我們按照如下的步驟進行。
環(huán)境準備。由于條件限制,我們只有2臺云主機,分別命名為 server 和 server2。兩臺主機的用途如下:
主機 | 服務 | 公網(wǎng) ip |
---|---|---|
server | scrapy爬蟲 | 180.76.152.113 |
server2 | scrapy爬蟲、redis服務 | 47.115.61.209 |
先準備好 redis 服務,redis 服務的搭建以及設置密碼等步驟在第一部分中已經(jīng)介紹過了,這里就不再重復介紹了;
[root@server ~]# redis-cli -h 47.115.61.209 -p 6777
47.115.61.209:6777> auth spyinx
OK
47.115.61.209:6777> get hello
"new world"
我們在 server 和 server2 上都進行測試,確保都能連上 server2 上的 redis 服務。
安裝 scrapy 和 scrapy-redis;
[root@server2 ~]# pip3 install scrapy scrapy-redis
# ...
改造 spider 代碼,將原先繼承的 Spider 類改為繼承 scrapy-redis 插件中的 RedisSpider,同時去掉 start_requests()
方法:
# from scrapy import Request, Spider
from scrapy_redis.spiders import RedisSpider
# ...
class HotnewsSpider(RedisSpider):
# ...
# 注釋start_requests()方法
# def start_requests(self):
# request_url = self._get_url(max_behot_time)
# self.logger.info(f"we get the request url : {request_url}")
# yield Request(request_url, headers=headers, cookies=cookies, callback=self.parse)
# ...
改造下原先的 pipelines.py
代碼,為了能實時將數(shù)據(jù)保存到數(shù)據(jù)庫中,我們挪動下 SQL 語句 commit 的位置,同時去掉原先的郵件發(fā)送功能:
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
import logging
from string import Template
from itemadapter import ItemAdapter
import pymysql
from toutiao_hotnews.mail import HtmlMailSender
from toutiao_hotnews.items import ToutiaoHotnewsItem
from toutiao_hotnews.html_template import hotnews_template_html
from toutiao_hotnews import settings
class ToutiaoHotnewsPipeline:
logger = logging.getLogger('pipelines_log')
def open_spider(self, spider):
# 初始化連接數(shù)據(jù)庫
self.db = pymysql.connect(
host=spider.settings.get('MYSQL_HOST', 'localhost'),
user=spider.settings.get('MYSQL_USER', 'root'),
password=spider.settings.get('MYSQL_PASS', '123456'),
port=spider.settings.get('MYSQL_PORT', 3306),
db=spider.settings.get('MYSQL_DB_NAME', 'mysql'),
charset='utf8'
)
self.cursor = self.db.cursor()
def process_item(self, item, spider):
# 插入sql語句
sql = "insert into toutiao_hotnews(title, abstract, source, source_url, comments_count, behot_time) values (%s, %s, %s, %s, %s, %s)"
if item and isinstance(item, ToutiaoHotnewsItem):
self.cursor.execute(sql, (item['title'], item['abstract'], item['source'], item['source_url'], item['comments_count'], item['behot_time']))
# 將commit語句移動到這里
self.db.commit()
return item
def query_data(self, sql):
data = {}
try:
self.cursor.execute(sql)
data = self.cursor.fetchall()
except Exception as e:
logging.error('database operate error:{}'.format(str(e)))
self.db.rollback()
return data
def close_spider(self, spider):
self.cursor.close()
self.db.close()
接下來就是配置 settings.py
了,我們首先要設置好 UserAgent,這一步是所有爬蟲必須的。另外,針對 scrapy-redis 插件,我們只需要設置 scrapy-redis 的調(diào)度器和去重過濾器以及 Redis 的連接配置即可。如果想要將抓取的結(jié)果保存到 Redis 中,需要在 ITEM_PIPELINES
值中添加 scrapy-redis 的 item pipeline 即可。這里我們相應的配置如下:
# ...
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36'
# ...
ITEM_PIPELINES = {
'toutiao_hotnews.pipelines.ToutiaoHotnewsPipeline': 300,
# 指定scrapy-redis的pipeline,將結(jié)果保存到redis中
'scrapy_redis.pipelines.RedisPipeline': 400,
}
# ...
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# 設置連接 Redis 的 URL
REDIS_URL = 'redis://:spyinx@47.115.61.209:6777'
就這樣簡單改造后,一個支持分布式的爬蟲就完成了。我們在每臺云主機上上傳該爬蟲代碼,然后在爬蟲項目目錄下執(zhí)行 scrapy crawl hotnews
運行爬蟲。此時,所有的爬蟲都會處于等待狀態(tài),需要手動將起始的請求 URL 設置到 redis 的請求列表中,相應的 key 默認為 hotnews:start_urls
。添加的 redis 命令為:
> lpush hotnews:start_urls url
為此我準備了一段 python 代碼幫助我們完成 url 的生成以及推送到 redis 中:
# 位置: 在 toutiao_hotnews 目錄下,和 scrapy.cfg 文件同一級
import redis
from toutiao_hotnews.spiders.hotnews import HotnewsSpider
spider = HotnewsSpider()
r = redis.Redis(host='47.115.61.209', port=6777, password='spyinx', db=0)
request_url = spider._get_url(0)
r.lpush("hotnews:start_urls", request_url)
接下來,我們看看這個分布式爬蟲的運行效果:
上面的視頻中,我們啟動了兩個 scrapy 爬蟲,他們分別監(jiān)聽 redis 中的 hotnews:start_urls
列表,當里面有數(shù)據(jù)時其中一個爬蟲便會讀取該 url 然后開始爬取動作;后面我們停止其中一個爬蟲時,繼續(xù)向 redis 中添加一個請求的 url 則另一個 scrapy 爬蟲也會繼續(xù)正常工作。 由于我們添加了兩個 item pipeline,其中第一個會將 item 數(shù)據(jù)保存到數(shù)據(jù)庫中,scrapy_redis 的 item pipeline 則會將抓取的 item 結(jié)果保存到 redis 中,對應的 key 名為 hotnews:items
。
在對 scrapy-redis 插件有了一定的了解后,我們來分析一下 scrapy-redis 的源碼并解釋下上述工程的執(zhí)行流程。
2. Scrapy-Redis 源碼分析
我們從 github 上可以找到 scrapy-redis 插件的源碼。它的代碼少而精,但是簡單的擴展就能使得 scrapy 框架具備分布式功能,因此它在 github 上也收獲了不少贊。我們下載其源碼來一窺其內(nèi)部原理:
我們會從一開始繼承的 RedisSpider 類開始學起,并逐步深入源碼學習。
2.1 scrapy-redis 中的 RedisSpider 類分析
來看 RedisSpider 類的定義,它位于源碼的 spider.py 文件中:
# 源碼位置:scrapy_redis/spider.py
# ...
class RedisSpider(RedisMixin, Spider):
"""Spider that reads urls from redis queue when idle.
Attributes
----------
redis_key : str (default: REDIS_START_URLS_KEY)
Redis key where to fetch start URLs from..
redis_batch_size : int (default: CONCURRENT_REQUESTS)
Number of messages to fetch from redis on each attempt.
redis_encoding : str (default: REDIS_ENCODING)
Encoding to use when decoding messages from redis queue.
Settings
--------
REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
Default Redis key where to fetch start URLs from..
REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
Default number of messages to fetch from redis on each attempt.
REDIS_START_URLS_AS_SET : bool (default: False)
Use SET operations to retrieve messages from the redis queue. If False,
the messages are retrieve using the LPOP command.
REDIS_ENCODING : str (default: "utf-8")
Default encoding to use when decoding messages from redis queue.
"""
@classmethod
def from_crawler(self, crawler, *args, **kwargs):
obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
# 設置redis相關信息
obj.setup_redis(crawler)
return obj
# ...
源碼中關于該類的說明已經(jīng)非常清楚了,我們簡單翻一下就是:空閑時從 redis 隊列中讀取 urls 的 spider。來關注 from_crawler()
方法的第二個語句:setup_redis()
,其實現(xiàn)代碼如下:
class RedisMixin(object):
"""Mixin class to implement reading urls from a redis queue."""
redis_key = None
redis_batch_size = None
redis_encoding = None
# Redis client placeholder.
server = None
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
def setup_redis(self, crawler=None):
"""Setup redis connection and idle signal.
This should be called after the spider has set its crawler object.
"""
if self.server is not None:
return
if crawler is None:
# We allow optional crawler argument to keep backwards
# compatibility.
# XXX: Raise a deprecation warning.
crawler = getattr(self, 'crawler', None)
if crawler is None:
raise ValueError("crawler is required")
settings = crawler.settings
# 設置redis_key屬性值,如果settings.py中沒有設置,就會使用默認的key值
if self.redis_key is None:
self.redis_key = settings.get(
'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
)
self.redis_key = self.redis_key % {'name': self.name}
# 確保redis_key值不為空
if not self.redis_key.strip():
raise ValueError("redis_key must not be empty")
# 獲取redis_batch_size的值,并轉(zhuǎn)成int類型
if self.redis_batch_size is None:
# TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
self.redis_batch_size = settings.getint(
'REDIS_START_URLS_BATCH_SIZE',
settings.getint('CONCURRENT_REQUESTS'),
)
try:
self.redis_batch_size = int(self.redis_batch_size)
except (TypeError, ValueError):
raise ValueError("redis_batch_size must be an integer")
# 獲取redis_batch_size值
if self.redis_encoding is None:
self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)
self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
"(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
self.__dict__)
# 獲取server屬性,即redis的連接
self.server = connection.from_settings(crawler.settings)
# The idle signal is called when the spider has no requests left,
# that's when we will schedule new requests from redis queue
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
"""Returns a Request instance from data coming from Redis.
By default, ``data`` is an encoded URL. You can override this method to
provide your own message decoding.
Parameters
----------
data : bytes
Message from redis.
"""
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
def schedule_next_requests(self):
"""Schedules a request if available"""
# TODO: While there is capacity, schedule a batch of redis requests.
for req in self.next_requests():
self.crawler.engine.crawl(req, spider=self)
def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider
上面的代碼比較簡單,也十分容易看懂。關于 setup_redis()
方法:該方法主要是設置 redis 相關信息,同時連接 redis 并得到連接屬性 self.server
值;從該方法中我們可以看到在 settings.py 中我們可以設置如下幾個參數(shù):
-
REDIS_START_URLS_KEY:設置起始 urls 的 key,前面我們知道沒有設置是,默認的 key 是
爬蟲名:start_urls
,這在代碼中也有所體現(xiàn);if self.redis_key is None: self.redis_key = settings.get( 'REDIS_START_URLS_KEY', defaults.START_URLS_KEY, ) # default.py中有START_URLS_KEY = '%(name)s:start_urls' self.redis_key = self.redis_key % {'name': self.name}
-
REDIS_START_URLS_BATCH_SIZE:已經(jīng)移除了;
-
REDIS_ENCODING: 設置 redis 中的編碼類型;
-
REDIS 的相關配置,主要在如下的語句中讀?。?/p>
self.server = connection.from_settings(crawler.settings)
我們來跟蹤下這個
connection.from_settings()
的代碼,位于 connection.py 中:# 源碼位置:scrapy_redis/connection.py import six from scrapy.utils.misc import load_object from . import defaults # 重要的映射關系,對應著settings.py的 SETTINGS_PARAMS_MAP = { 'REDIS_URL': 'url', 'REDIS_HOST': 'host', 'REDIS_PORT': 'port', 'REDIS_ENCODING': 'encoding', } def get_redis_from_settings(settings): params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params) # Backwards compatible alias. from_settings = get_redis_from_settings def get_redis(**kwargs): # 默認使用defaults.REDIS_CLS,也就是redis.StrictRedis redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
上面的代碼非常簡單,get_redis_from_settings()
方法會從 settings.py 中讀取 REDIS_URL
、REDIS_HOST
、REDIS_PORT
等參數(shù),另外還會額外讀取 REDIS_PARAMS
。優(yōu)先使用 REDIS_URL
配置信息,來看看最核心的建立客戶端連接實例的代碼:
if url:
return redis_cls.from_url(url, **kwargs)
else:
return redis_cls(**kwargs)
這里的 redis_cls 正是第三方模塊類 redis.StrictRedis
,當然我們也可以通過覆蓋 default.py 中的 REDIS_PARAMS
參數(shù)中的 redis_cls 來選擇新的操作 redis 的第三方模塊。
緊接著,我們注意到前面在改造 Scrapy 爬蟲時去掉了 start_requests()
這個方法。在啟動 scrapy 爬蟲后,爬蟲會等到 redis 的 urls 隊列中出現(xiàn)相應的起始 url 值,然后獲取該 url 開始數(shù)據(jù)爬取。我們來看看這個過程是如何實現(xiàn)的?
RedisSpider
類繼承了 RedisMixin
這個 mixin,它是 scrapy-redis 插件爬蟲需要單獨實現(xiàn)的功能類。該 Mixin 中正好實現(xiàn)了 start_requests()
方法,具體代碼如下:
# 源碼位置:scrapy_redis/spiders.py
# ...
class RedisMixin(object):
# ...
def start_requests(self):
"""Returns a batch of start requests from redis."""
return self.next_requests()
# ...
def next_requests(self):
"""Returns a request to be scheduled or none."""
use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
fetch_one = self.server.spop if use_set else self.server.lpop
# XXX: Do we need to use a timeout here?
found = 0
# TODO: Use redis pipeline execution.
while found < self.redis_batch_size:
data = fetch_one(self.redis_key)
if not data:
# Queue empty.
break
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
if found:
self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
def make_request_from_data(self, data):
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
# ...
我們沒有設置 REDIS_START_URLS_AS_SET
值,所以默認使用 Redis 的列表類型。因此 fetch_one
方法就是 self.server.lpop
,對應就是 Redis 中的 lpop (左彈出) 方法。我們的 URL 是使用 lpush 進去了,獲取該結(jié)果默認使用的是 lpop,因此我們知道先 push 進去的 url 元素就會后 pop 彈出并執(zhí)行。另外,我們看到,如果 redis 中對應的 urls 隊列中存在一個 url 元素后,執(zhí)行如下操作:
req = self.make_request_from_data(data)
if req:
yield req
found += 1
else:
self.logger.debug("Request not made from data: %r", data)
來繼續(xù)看 self.make_request_from_data()
這個方法:
def make_request_from_data(self, data):
url = bytes_to_str(data, self.redis_encoding)
return self.make_requests_from_url(url)
這個 self.make_requests_from_url()
方法其實調(diào)用的是 scrapy 模塊中的 spider 類中的方法:
# 源碼位置:scrapy/spiders/__init__.py
# ...
class Spider(object_ref):
# ...
def make_requests_from_url(self, url):
""" This method is deprecated. """
warnings.warn(
"Spider.make_requests_from_url method is deprecated: "
"it will be removed and not be called by the default "
"Spider.start_requests method in future Scrapy releases. "
"Please override Spider.start_requests method instead."
)
return Request(url, dont_filter=True)
最終我們發(fā)現(xiàn)在 scrapy-redis 插件中,它的 RedisSpider 類也有默認的 start_requests()
方法,該方法從 redis 中指定的隊列中取出 urls 并封裝成 Scrapy 中的 Request 請求并 yield 給 Scrapy 的調(diào)度器去調(diào)度處理。
2.2 scrapy-redis 中的請求隊列
接下來我們來看看 scrapy-redis 在對 scrapy 框架對請求隊列做的一些改動。Scrapy 原生的請求隊列是存儲在內(nèi)存中的,這樣必然無法實現(xiàn)分布式功能。scrapy-redis 插件將這個請求隊列改造成基于 redis 數(shù)據(jù)庫的,通過這個 redis 數(shù)據(jù)庫,實現(xiàn)了三種請求隊列:
- 優(yōu)先級隊列 (默認):PriorityQueue;
- 先進先出隊列:FifoQueue;
- 后進先出隊列:LifoQueue;
有了這個基于 Redis 數(shù)據(jù)庫的隊列后,所有的 Scrapy 爬蟲就能共享這一請求隊列,實現(xiàn)分布式協(xié)作的功能。對于請求隊列,主要的功能是入隊 (push) 和出隊 (pop)。這部分的代碼位于 scrapy_redis/queue.py
文件中,有興趣可以仔細閱讀三個隊列的入隊和出隊的實現(xiàn)。
2.3 scrapy-redis 中的去重過濾器
scrapy-redis 插件內(nèi)部實現(xiàn)了一個去重過濾器,同樣基于 Redis 數(shù)據(jù)庫。原生的 Scrapy 的去重功能是基于內(nèi)存的集合實現(xiàn),并不適合分布式的。scrapy-redis 通過 Redis 來實現(xiàn)數(shù)據(jù)共享,利用 Redis 的集合類型來實現(xiàn) 元素的去重,其代碼位于 scrapy_redis/dupefilter.py
文件中。我們可以看看其去重的最核心方法:
# 源碼位置:scrapy_redis/dupefilter.py
# ...
class RFPDupeFilter(BaseDupeFilter):
# ...
def request_seen(self, request):
"""Returns True if request was already seen.
Parameters
----------
request : scrapy.http.Request
Returns
-------
bool
"""
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
added = self.server.sadd(self.key, fp)
return added == 0
def request_fingerprint(self, request):
"""Returns a fingerprint for a given request.
Parameters
----------
request : scrapy.http.Request
Returns
-------
str
"""
return request_fingerprint(request)
# ...
從代碼可知,請求會存入到 Redis 的集合中,從而實現(xiàn)去重功能,是不是非常簡單?
2.4 scrapy-redis 中的調(diào)度器
回憶我們的 scrapy 框架中調(diào)度器的功能:接收 scrapy 引擎?zhèn)鬟^來的請求 (入隊),然后從隊列中選出一個請求 (出隊) 發(fā)送給引擎去執(zhí)行下載。此時我們的請求隊列是在 Redis 中的,不能想象,這里 scrapy-redis 插件也是需要對 scrapy 的調(diào)度器模塊進行略微的調(diào)整,主要改造調(diào)度請求的入隊和出隊過程。此外,調(diào)度器還具備去重功能,因此這里也會使用前面改造的去重過濾器來實現(xiàn)對請求的去重。具體代碼如下:
# 源碼位置:scrapy_redis/scheduler.py
# ...
class Scheduler(object):
# ...
# 請求入隊
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
if self.stats:
self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
self.queue.push(request)
return True
# 請求出隊
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)
if request and self.stats:
self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
# ...
我們看到這里 scrapy-redis 實現(xiàn)的調(diào)度器模塊,其請求入隊操作過程為:
- 先判斷是否需要去重,如果需要則使用 scrapy-redis 實現(xiàn)的去重過濾器進行判斷;
- 判斷是否需要統(tǒng)計;
- 使用 scrapy-redis 中基于 redis 實現(xiàn)的請求隊列進行入隊操作;
出隊操作則是調(diào)用 scrapy-redis 中實現(xiàn)的請求隊列進行出隊 (pop() 方法)。scrapy-redis 中調(diào)度器的核心步驟就是這兩步,大家看懂了嗎?
2.5 scrapy-redis 中的 Item Pipeline
最后我們來看 scrapy-redis 中定義的 item pipeline。前面我們在頭條新聞爬蟲的改造中只是在配置中添加了 scrapy-redis 中的 item pipeline,這樣爬蟲抓取的結(jié)果會保存到 redis 中,那么該 pipeline 是如何實現(xiàn)的呢?其代碼位于 scrapy_redis/pipelines.py
文件中,我們來一覽究竟:
# 源碼位置:scrapy_redis/pipelines.py
# ...
class RedisPipeline(object):
def __init__(self, server,
key=defaults.PIPELINE_KEY,
serialize_func=default_serialize):
"""Initialize pipeline.
Parameters
----------
server : StrictRedis
Redis client instance.
key : str
Redis key where to store items.
serialize_func : callable
Items serializer function.
"""
self.server = server
self.key = key
self.serialize = serialize_func
@classmethod
def from_settings(cls, settings):
params = {
'server': connection.from_settings(settings),
}
if settings.get('REDIS_ITEMS_KEY'):
params['key'] = settings['REDIS_ITEMS_KEY']
if settings.get('REDIS_ITEMS_SERIALIZER'):
params['serialize_func'] = load_object(
settings['REDIS_ITEMS_SERIALIZER']
)
return cls(**params)
@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)
def process_item(self, item, spider):
return deferToThread(self._process_item, item, spider)
def _process_item(self, item, spider):
key = self.item_key(item, spider)
data = self.serialize(item)
self.server.rpush(key, data)
return item
def item_key(self, item, spider):
"""Returns redis key based on given spider.
Override this function to use a different key depending on the item
and/or spider.
"""
return self.key % {'spider': spider.name}
這段代碼也是簡潔明了,首先是初始化三個屬性值:
server
:redis 客戶端實例,用于對 redis 進行操作;key
:結(jié)果保存到 redis 中的 key 名;serialize
:指定結(jié)果序列化類;
作為 scrapy 中的 pipeline,最核心的處理函數(shù)就是 process_item()
方法。在該 pipeline 中,該方法只有一條語句:
deferToThread(self._process_item, item, spider)
deferToThread()
方法是 Twisted 框架提供的一個方法,其含義如下:
Run a function in a thread and return the result as a Deferred
其實就是開啟一個線程執(zhí)行相應的方法,并將結(jié)果作為一個 Deferred 返回。我們并不關心這個 Deferred 是啥,在最后一部分源碼篇中會介紹到,這里我們只關心處理 item 的操作是 self._process_item()
這個方法。該方法的邏輯非常簡單明了:
- 生成保存到 Redis 中的 key;
- 將 item 值序列化以便能保存到 Redis 中;
- 調(diào)用 redis 的
rpush()
方法將序列化結(jié)果保存到相應列表中;
看完了 scrapy-redis 中 RedisPipeline 的代碼,是不是知道為什么結(jié)果會保存到 redis 中了吧?就這樣,我們幾乎學完了 scrapy-redis 插件的全部源碼,下來我們來看一看 scrapy-redis 插件的架構(gòu)圖,進一步理解該插件:
3. 小結(jié)
本小節(jié)中我們改造了前面的爬取頭條熱點新聞的代碼,將其改造成一個分布式的爬蟲。接下來我們分析了 Scrapy-Redis 插件的源碼,對該插件的內(nèi)部原理有了深刻的了解。