第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

Scrapy 的分布式實現(xiàn)

今天我們簡單介紹下 Scrapy 的分布式實現(xiàn)框架:Scrapy-Redis 并基于該插件完成一個簡單的分布式爬蟲案例。

1. 一個簡單的分布式爬蟲案例

我們以前面的第16講的頭條熱點新聞爬蟲基礎,使用 scrapy-redis 插件進行改造,使之支持分布式爬取?,F(xiàn)在我們按照如下的步驟進行。

環(huán)境準備。由于條件限制,我們只有2臺云主機,分別命名為 server 和 server2。兩臺主機的用途如下:

主機 服務 公網(wǎng) ip
server scrapy爬蟲 180.76.152.113
server2 scrapy爬蟲、redis服務 47.115.61.209

先準備好 redis 服務,redis 服務的搭建以及設置密碼等步驟在第一部分中已經(jīng)介紹過了,這里就不再重復介紹了;

[root@server ~]# redis-cli -h 47.115.61.209 -p 6777
47.115.61.209:6777> auth spyinx
OK
47.115.61.209:6777> get hello
"new world"

我們在 server 和 server2 上都進行測試,確保都能連上 server2 上的 redis 服務。

安裝 scrapy 和 scrapy-redis;

[root@server2 ~]# pip3 install scrapy scrapy-redis
# ...

改造 spider 代碼,將原先繼承的 Spider 類改為繼承 scrapy-redis 插件中的 RedisSpider,同時去掉 start_requests() 方法:

# from scrapy import Request, Spider
from scrapy_redis.spiders import RedisSpider

# ...

class HotnewsSpider(RedisSpider):
    # ...
    
    # 注釋start_requests()方法
    # def start_requests(self):
        # request_url = self._get_url(max_behot_time)
        # self.logger.info(f"we get the request url : {request_url}")
        # yield Request(request_url, headers=headers, cookies=cookies, callback=self.parse)
        
    # ...

改造下原先的 pipelines.py 代碼,為了能實時將數(shù)據(jù)保存到數(shù)據(jù)庫中,我們挪動下 SQL 語句 commit 的位置,同時去掉原先的郵件發(fā)送功能:

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
import logging
from string import Template
from itemadapter import ItemAdapter
import pymysql


from toutiao_hotnews.mail import HtmlMailSender
from toutiao_hotnews.items import ToutiaoHotnewsItem
from toutiao_hotnews.html_template import hotnews_template_html
from toutiao_hotnews import settings

class ToutiaoHotnewsPipeline:
    logger = logging.getLogger('pipelines_log')

    def open_spider(self, spider):
        # 初始化連接數(shù)據(jù)庫
        self.db = pymysql.connect(
            host=spider.settings.get('MYSQL_HOST', 'localhost'),                 
            user=spider.settings.get('MYSQL_USER', 'root'),
            password=spider.settings.get('MYSQL_PASS', '123456'),
            port=spider.settings.get('MYSQL_PORT', 3306),
            db=spider.settings.get('MYSQL_DB_NAME', 'mysql'),
            charset='utf8'
        ) 
        self.cursor = self.db.cursor()

    def process_item(self, item, spider):
        # 插入sql語句
        sql = "insert into toutiao_hotnews(title, abstract, source, source_url, comments_count, behot_time) values (%s, %s, %s, %s, %s, %s)"
        if item and isinstance(item, ToutiaoHotnewsItem):
            self.cursor.execute(sql, (item['title'], item['abstract'], item['source'], item['source_url'], item['comments_count'], item['behot_time']))
        # 將commit語句移動到這里
        self.db.commit()
        return item

    def query_data(self, sql):
        data = {}
        try:
            self.cursor.execute(sql)
            data = self.cursor.fetchall()
        except Exception as e:
            logging.error('database operate error:{}'.format(str(e)))
            self.db.rollback()
        return data

    def close_spider(self, spider):
        self.cursor.close()
        self.db.close()

接下來就是配置 settings.py 了,我們首先要設置好 UserAgent,這一步是所有爬蟲必須的。另外,針對 scrapy-redis 插件,我們只需要設置 scrapy-redis 的調(diào)度器和去重過濾器以及 Redis 的連接配置即可。如果想要將抓取的結(jié)果保存到 Redis 中,需要在 ITEM_PIPELINES 值中添加 scrapy-redis 的 item pipeline 即可。這里我們相應的配置如下:

# ...

USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36'

# ...

ITEM_PIPELINES = {
    'toutiao_hotnews.pipelines.ToutiaoHotnewsPipeline': 300,
    # 指定scrapy-redis的pipeline,將結(jié)果保存到redis中
    'scrapy_redis.pipelines.RedisPipeline': 400,
}

# ...
SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'
# 設置連接 Redis 的 URL
REDIS_URL = 'redis://:spyinx@47.115.61.209:6777'

就這樣簡單改造后,一個支持分布式的爬蟲就完成了。我們在每臺云主機上上傳該爬蟲代碼,然后在爬蟲項目目錄下執(zhí)行 scrapy crawl hotnews 運行爬蟲。此時,所有的爬蟲都會處于等待狀態(tài),需要手動將起始的請求 URL 設置到 redis 的請求列表中,相應的 key 默認為 hotnews:start_urls。添加的 redis 命令為:

> lpush hotnews:start_urls url

為此我準備了一段 python 代碼幫助我們完成 url 的生成以及推送到 redis 中:

# 位置: 在 toutiao_hotnews 目錄下,和 scrapy.cfg 文件同一級

import redis
from toutiao_hotnews.spiders.hotnews import HotnewsSpider

spider = HotnewsSpider()
r = redis.Redis(host='47.115.61.209', port=6777, password='spyinx', db=0)
request_url = spider._get_url(0)
r.lpush("hotnews:start_urls", request_url)

接下來,我們看看這個分布式爬蟲的運行效果:

上面的視頻中,我們啟動了兩個 scrapy 爬蟲,他們分別監(jiān)聽 redis 中的 hotnews:start_urls 列表,當里面有數(shù)據(jù)時其中一個爬蟲便會讀取該 url 然后開始爬取動作;后面我們停止其中一個爬蟲時,繼續(xù)向 redis 中添加一個請求的 url 則另一個 scrapy 爬蟲也會繼續(xù)正常工作。 由于我們添加了兩個 item pipeline,其中第一個會將 item 數(shù)據(jù)保存到數(shù)據(jù)庫中,scrapy_redis 的 item pipeline 則會將抓取的 item 結(jié)果保存到 redis 中,對應的 key 名為 hotnews:items。

在對 scrapy-redis 插件有了一定的了解后,我們來分析一下 scrapy-redis 的源碼并解釋下上述工程的執(zhí)行流程。

2. Scrapy-Redis 源碼分析

我們從 github 上可以找到 scrapy-redis 插件的源碼。它的代碼少而精,但是簡單的擴展就能使得 scrapy 框架具備分布式功能,因此它在 github 上也收獲了不少贊。我們下載其源碼來一窺其內(nèi)部原理:

圖片描述

scrapy-redis插件源碼一覽

我們會從一開始繼承的 RedisSpider 類開始學起,并逐步深入源碼學習。

2.1 scrapy-redis 中的 RedisSpider 類分析

來看 RedisSpider 類的定義,它位于源碼的 spider.py 文件中:

# 源碼位置:scrapy_redis/spider.py

# ...

class RedisSpider(RedisMixin, Spider):
    """Spider that reads urls from redis queue when idle.

    Attributes
    ----------
    redis_key : str (default: REDIS_START_URLS_KEY)
        Redis key where to fetch start URLs from..
    redis_batch_size : int (default: CONCURRENT_REQUESTS)
        Number of messages to fetch from redis on each attempt.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.

    Settings
    --------
    REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")
        Default Redis key where to fetch start URLs from..
    REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)
        Default number of messages to fetch from redis on each attempt.
    REDIS_START_URLS_AS_SET : bool (default: False)
        Use SET operations to retrieve messages from the redis queue. If False,
        the messages are retrieve using the LPOP command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.

    """

    @classmethod
    def from_crawler(self, crawler, *args, **kwargs):
        obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)
        # 設置redis相關信息
        obj.setup_redis(crawler)
        return obj
    
    
# ...

源碼中關于該類的說明已經(jīng)非常清楚了,我們簡單翻一下就是:空閑時從 redis 隊列中讀取 urls 的 spider。來關注 from_crawler() 方法的第二個語句:setup_redis(),其實現(xiàn)代碼如下:

class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        # 設置redis_key屬性值,如果settings.py中沒有設置,就會使用默認的key值
        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        # 確保redis_key值不為空
        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        # 獲取redis_batch_size的值,并轉(zhuǎn)成int類型
        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        # 獲取redis_batch_size值
        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        # 獲取server屬性,即redis的連接
        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider

上面的代碼比較簡單,也十分容易看懂。關于 setup_redis() 方法:該方法主要是設置 redis 相關信息,同時連接 redis 并得到連接屬性 self.server 值;從該方法中我們可以看到在 settings.py 中我們可以設置如下幾個參數(shù):

  • REDIS_START_URLS_KEY:設置起始 urls 的 key,前面我們知道沒有設置是,默認的 key 是 爬蟲名:start_urls,這在代碼中也有所體現(xiàn);

    if self.redis_key is None:
        self.redis_key = settings.get(
            'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
        )
    # default.py中有START_URLS_KEY = '%(name)s:start_urls'
    self.redis_key = self.redis_key % {'name': self.name}
    
  • REDIS_START_URLS_BATCH_SIZE:已經(jīng)移除了;

  • REDIS_ENCODING: 設置 redis 中的編碼類型;

  • REDIS 的相關配置,主要在如下的語句中讀?。?/p>

    self.server = connection.from_settings(crawler.settings)
    

    我們來跟蹤下這個 connection.from_settings() 的代碼,位于 connection.py 中:

    # 源碼位置:scrapy_redis/connection.py
    import six
    
    from scrapy.utils.misc import load_object
    
    from . import defaults
    
    # 重要的映射關系,對應著settings.py的
    SETTINGS_PARAMS_MAP = {
        'REDIS_URL': 'url',
        'REDIS_HOST': 'host',
        'REDIS_PORT': 'port',
        'REDIS_ENCODING': 'encoding',
    }
    
    def get_redis_from_settings(settings):
        params = defaults.REDIS_PARAMS.copy()
        params.update(settings.getdict('REDIS_PARAMS'))
        # XXX: Deprecate REDIS_* settings.
        for source, dest in SETTINGS_PARAMS_MAP.items():
            val = settings.get(source)
            if val:
                params[dest] = val
    
        # Allow ``redis_cls`` to be a path to a class.
        if isinstance(params.get('redis_cls'), six.string_types):
            params['redis_cls'] = load_object(params['redis_cls'])
    
        return get_redis(**params)
    
    # Backwards compatible alias.
    from_settings = get_redis_from_settings
    
    
    def get_redis(**kwargs):
        # 默認使用defaults.REDIS_CLS,也就是redis.StrictRedis
        redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS)
        url = kwargs.pop('url', None)
        if url:
            return redis_cls.from_url(url, **kwargs)
        else:
            return redis_cls(**kwargs)
    
    

上面的代碼非常簡單,get_redis_from_settings() 方法會從 settings.py 中讀取 REDIS_URL 、REDIS_HOST 、REDIS_PORT 等參數(shù),另外還會額外讀取 REDIS_PARAMS 。優(yōu)先使用 REDIS_URL 配置信息,來看看最核心的建立客戶端連接實例的代碼:

if url:
    return redis_cls.from_url(url, **kwargs)
else:
    return redis_cls(**kwargs)

這里的 redis_cls 正是第三方模塊類 redis.StrictRedis,當然我們也可以通過覆蓋 default.py 中的 REDIS_PARAMS 參數(shù)中的 redis_cls 來選擇新的操作 redis 的第三方模塊。

緊接著,我們注意到前面在改造 Scrapy 爬蟲時去掉了 start_requests() 這個方法。在啟動 scrapy 爬蟲后,爬蟲會等到 redis 的 urls 隊列中出現(xiàn)相應的起始 url 值,然后獲取該 url 開始數(shù)據(jù)爬取。我們來看看這個過程是如何實現(xiàn)的?

RedisSpider 類繼承了 RedisMixin 這個 mixin,它是 scrapy-redis 插件爬蟲需要單獨實現(xiàn)的功能類。該 Mixin 中正好實現(xiàn)了 start_requests() 方法,具體代碼如下:

# 源碼位置:scrapy_redis/spiders.py
# ...

class RedisMixin(object):
    # ...
    
    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()
    
    # ...
    
    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)
    
    def make_request_from_data(self, data):
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)
    
    # ...
    

我們沒有設置 REDIS_START_URLS_AS_SET 值,所以默認使用 Redis 的列表類型。因此 fetch_one 方法就是 self.server.lpop,對應就是 Redis 中的 lpop (左彈出) 方法。我們的 URL 是使用 lpush 進去了,獲取該結(jié)果默認使用的是 lpop,因此我們知道先 push 進去的 url 元素就會后 pop 彈出并執(zhí)行。另外,我們看到,如果 redis 中對應的 urls 隊列中存在一個 url 元素后,執(zhí)行如下操作:

req = self.make_request_from_data(data)
if req:
    yield req
    found += 1
else:
    self.logger.debug("Request not made from data: %r", data)

來繼續(xù)看 self.make_request_from_data() 這個方法:

def make_request_from_data(self, data):
    url = bytes_to_str(data, self.redis_encoding)
    return self.make_requests_from_url(url)

這個 self.make_requests_from_url() 方法其實調(diào)用的是 scrapy 模塊中的 spider 類中的方法:

# 源碼位置:scrapy/spiders/__init__.py
# ...

class Spider(object_ref):
    # ...
    
    def make_requests_from_url(self, url):
        """ This method is deprecated. """
        warnings.warn(
            "Spider.make_requests_from_url method is deprecated: "
            "it will be removed and not be called by the default "
            "Spider.start_requests method in future Scrapy releases. "
            "Please override Spider.start_requests method instead."
        )
        return Request(url, dont_filter=True)

最終我們發(fā)現(xiàn)在 scrapy-redis 插件中,它的 RedisSpider 類也有默認的 start_requests() 方法,該方法從 redis 中指定的隊列中取出 urls 并封裝成 Scrapy 中的 Request 請求并 yield 給 Scrapy 的調(diào)度器去調(diào)度處理。

2.2 scrapy-redis 中的請求隊列

接下來我們來看看 scrapy-redis 在對 scrapy 框架對請求隊列做的一些改動。Scrapy 原生的請求隊列是存儲在內(nèi)存中的,這樣必然無法實現(xiàn)分布式功能。scrapy-redis 插件將這個請求隊列改造成基于 redis 數(shù)據(jù)庫的,通過這個 redis 數(shù)據(jù)庫,實現(xiàn)了三種請求隊列:

  • 優(yōu)先級隊列 (默認):PriorityQueue;
  • 先進先出隊列:FifoQueue;
  • 后進先出隊列:LifoQueue;

有了這個基于 Redis 數(shù)據(jù)庫的隊列后,所有的 Scrapy 爬蟲就能共享這一請求隊列,實現(xiàn)分布式協(xié)作的功能。對于請求隊列,主要的功能是入隊 (push) 和出隊 (pop)。這部分的代碼位于 scrapy_redis/queue.py 文件中,有興趣可以仔細閱讀三個隊列的入隊和出隊的實現(xiàn)。

2.3 scrapy-redis 中的去重過濾器

scrapy-redis 插件內(nèi)部實現(xiàn)了一個去重過濾器,同樣基于 Redis 數(shù)據(jù)庫。原生的 Scrapy 的去重功能是基于內(nèi)存的集合實現(xiàn),并不適合分布式的。scrapy-redis 通過 Redis 來實現(xiàn)數(shù)據(jù)共享,利用 Redis 的集合類型來實現(xiàn) 元素的去重,其代碼位于 scrapy_redis/dupefilter.py 文件中。我們可以看看其去重的最核心方法:

# 源碼位置:scrapy_redis/dupefilter.py
# ...

class RFPDupeFilter(BaseDupeFilter):
    # ...
    
    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    # ...

從代碼可知,請求會存入到 Redis 的集合中,從而實現(xiàn)去重功能,是不是非常簡單?

2.4 scrapy-redis 中的調(diào)度器

回憶我們的 scrapy 框架中調(diào)度器的功能:接收 scrapy 引擎?zhèn)鬟^來的請求 (入隊),然后從隊列中選出一個請求 (出隊) 發(fā)送給引擎去執(zhí)行下載。此時我們的請求隊列是在 Redis 中的,不能想象,這里 scrapy-redis 插件也是需要對 scrapy 的調(diào)度器模塊進行略微的調(diào)整,主要改造調(diào)度請求的入隊和出隊過程。此外,調(diào)度器還具備去重功能,因此這里也會使用前面改造的去重過濾器來實現(xiàn)對請求的去重。具體代碼如下:

# 源碼位置:scrapy_redis/scheduler.py
# ...

class Scheduler(object):
    # ...
    
    # 請求入隊
    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    # 請求出隊
    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request
   
    # ...

我們看到這里 scrapy-redis 實現(xiàn)的調(diào)度器模塊,其請求入隊操作過程為:

  • 先判斷是否需要去重,如果需要則使用 scrapy-redis 實現(xiàn)的去重過濾器進行判斷;
  • 判斷是否需要統(tǒng)計;
  • 使用 scrapy-redis 中基于 redis 實現(xiàn)的請求隊列進行入隊操作;

出隊操作則是調(diào)用 scrapy-redis 中實現(xiàn)的請求隊列進行出隊 (pop() 方法)。scrapy-redis 中調(diào)度器的核心步驟就是這兩步,大家看懂了嗎?

2.5 scrapy-redis 中的 Item Pipeline

最后我們來看 scrapy-redis 中定義的 item pipeline。前面我們在頭條新聞爬蟲的改造中只是在配置中添加了 scrapy-redis 中的 item pipeline,這樣爬蟲抓取的結(jié)果會保存到 redis 中,那么該 pipeline 是如何實現(xiàn)的呢?其代碼位于 scrapy_redis/pipelines.py 文件中,我們來一覽究竟:

# 源碼位置:scrapy_redis/pipelines.py
# ...

class RedisPipeline(object):

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}

這段代碼也是簡潔明了,首先是初始化三個屬性值:

  • server:redis 客戶端實例,用于對 redis 進行操作;
  • key:結(jié)果保存到 redis 中的 key 名;
  • serialize:指定結(jié)果序列化類;

作為 scrapy 中的 pipeline,最核心的處理函數(shù)就是 process_item() 方法。在該 pipeline 中,該方法只有一條語句:

deferToThread(self._process_item, item, spider)

deferToThread() 方法是 Twisted 框架提供的一個方法,其含義如下:

Run a function in a thread and return the result as a Deferred

其實就是開啟一個線程執(zhí)行相應的方法,并將結(jié)果作為一個 Deferred 返回。我們并不關心這個 Deferred 是啥,在最后一部分源碼篇中會介紹到,這里我們只關心處理 item 的操作是 self._process_item() 這個方法。該方法的邏輯非常簡單明了:

  • 生成保存到 Redis 中的 key;
  • 將 item 值序列化以便能保存到 Redis 中;
  • 調(diào)用 redis 的 rpush() 方法將序列化結(jié)果保存到相應列表中;

看完了 scrapy-redis 中 RedisPipeline 的代碼,是不是知道為什么結(jié)果會保存到 redis 中了吧?就這樣,我們幾乎學完了 scrapy-redis 插件的全部源碼,下來我們來看一看 scrapy-redis 插件的架構(gòu)圖,進一步理解該插件:

圖片描述

scrapy-redis 插件架構(gòu)圖

3. 小結(jié)

本小節(jié)中我們改造了前面的爬取頭條熱點新聞的代碼,將其改造成一個分布式的爬蟲。接下來我們分析了 Scrapy-Redis 插件的源碼,對該插件的內(nèi)部原理有了深刻的了解。