第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

Twisted 框架基礎(chǔ)

今天我們會(huì)先簡(jiǎn)單過一遍 Twisted 框架中的一些核心知識(shí)點(diǎn),但是 Twisted 框架龐大而又復(fù)雜,不適合在一節(jié)內(nèi)容中全部囊括。我們只需要掌握在 Scrapy 框架中經(jīng)常用到的那部分模塊和方法即可。此外,我們將會(huì)重點(diǎn)分析 Scrapy 中對(duì) Twisted 模塊的進(jìn)一步封裝,幫助我們更好地理解接下來的源碼分析過程。

1. Twisted 中的核心類和方法

Twisted 是用 Python 實(shí)現(xiàn)的基于事件驅(qū)動(dòng)的網(wǎng)絡(luò)引擎框架,是 Python 中一個(gè)強(qiáng)大的異步 IO 庫(kù),類似于 Java 中的 NIO。為了能更好的掌握 Twisted 模塊,我們需要先弄清楚 Twisted 中幾個(gè)核心的概念: reactor、Protocol、ProtocolFactory、Transport 以及 Deffered 等,我們接下來逐一說明。

1.1 Reactor

Twisted 實(shí)現(xiàn)了設(shè)計(jì)模式中的反應(yīng)堆(reactor)模式,這種模式在單線程環(huán)境中調(diào)度多個(gè)事件源產(chǎn)生的事件到它們各自的事件處理例程中去。Twisted 的核心就是 reactor 事件循環(huán)。Reactor 可以感知網(wǎng)絡(luò)、文件系統(tǒng)以及定時(shí)器事件。它等待然后處理這些事件,從特定于平臺(tái)的行為中抽象出來,并提供統(tǒng)一的接口,使得在網(wǎng)絡(luò)協(xié)議棧的任何位置對(duì)事件做出響應(yīng)都變得簡(jiǎn)單?;旧蟫eactor完成的任務(wù)就是:循環(huán)等待事件,然后處理事件。

1.2 Deferred 和 DeferredList

Deferred 對(duì)象以抽象化的方式表達(dá)了一種思想,即結(jié)果還尚不存在。它同樣能夠幫助管理產(chǎn)生這個(gè)結(jié)果所需要的回調(diào)鏈。當(dāng)從函數(shù)中返回時(shí),Deferred 對(duì)象承諾在某個(gè)時(shí)刻函數(shù)將產(chǎn)生一個(gè)結(jié)果。返回的 Deferred 對(duì)象中包含所有注冊(cè)到事件上的回調(diào)引用,因此在函數(shù)間只需要傳遞這一個(gè)對(duì)象即可,跟蹤這個(gè)對(duì)象比單獨(dú)管理所有的回調(diào)要簡(jiǎn)單的多。

Deferred 對(duì)象包含一對(duì)回調(diào)鏈,一個(gè)是針對(duì)操作成功的回調(diào),一個(gè)是針對(duì)操作失敗的回調(diào)。初始狀態(tài)下 Deferred 對(duì)象的兩條鏈都為空。在事件處理的過程中,每個(gè)階段都為其添加處理成功的回調(diào)和處理失敗的回調(diào)。當(dāng)一個(gè)異步結(jié)果到來時(shí),Deferred 對(duì)象就被“激活”,那么處理成功的回調(diào)和處理失敗的回調(diào)就可以以合適的方式按照它們添加進(jìn)來的順序依次得到調(diào)用。

注意:Deferred 對(duì)象只能被激活一次,如果試圖重復(fù)激活將引發(fā)一個(gè)異常。

案例1:Deferred 的使用案例。

from twisted.internet import reactor, defer
from twisted.python import failure


def callback_func1(r):
    print('回調(diào)方法1: 結(jié)果=%s' % r)
    return r * r


def callback_func2(r):
    print('回調(diào)方法2: 傳入結(jié)果=%s' % r)
    raise ValueError('value error')


def errback_func(f):
    print('錯(cuò)誤回調(diào):{}'.format(f))

d = defer.Deferred()
d.addCallback(callback_func1)
d.addCallback(callback_func2)

d.addErrback(errback_func)

d.callback(10)

上面我們創(chuàng)建了一個(gè) Deferred 對(duì)象,然后在其中加入兩個(gè)正常回調(diào)方法以及一個(gè)錯(cuò)誤回調(diào)。接下來我們執(zhí)行回調(diào)并帶上一個(gè)參數(shù),直接的結(jié)果如下:

PS D:\learning-notes\慕課網(wǎng)教程\scrapy-lessons\code>python chap23/deferred_test2.py
回調(diào)方法1: 結(jié)果=10
回調(diào)方法2: 傳入結(jié)果=100
錯(cuò)誤回調(diào):[Failure instance: Traceback: <class 'ValueError'>: value error
d:/learning-notes/慕課網(wǎng)教程/scrapy-lessons/code/chap23/deferred_test2.py:24:<module>
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:460:callback
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:568:_startRunCallbacks
--- <exception caught here> ---
D:\Program Files (x86)\python3\lib\site-packages\twisted\internet\defer.py:654:_runCallbacks
d:/learning-notes/慕課網(wǎng)教程/scrapy-lessons/code/chap23/deferred_test2.py:12:callback_func2
]

可以看到,這里回調(diào)函數(shù)會(huì)依次執(zhí)行,同時(shí)將回調(diào)返回結(jié)果作為下一個(gè)回調(diào)的輸入。此外,在回調(diào)函數(shù)中手工拋出一個(gè)異常后,程序?qū)⑦M(jìn)入錯(cuò)誤回調(diào)鏈執(zhí)行。

上面的 Deferred 對(duì)象都是針對(duì)單個(gè)事件的異步回調(diào),而 DeferredList 對(duì)象使我們可以將一個(gè) Deferred 對(duì)象列表視為一個(gè) Deferred 對(duì)象,然后我們啟動(dòng)多個(gè)異步操作并且在它們?nèi)客瓿珊笤賵?zhí)行回調(diào),而無論它們成功或者失敗。

案例2: DeferredList 的使用案例。

"""
DeferredList 對(duì)象使用實(shí)例
"""
from twisted.internet import defer

def print_result(r):
    print(r)


def add_num_10(r):
    return r + 10


def add_num_20(r):
    return r + 20


d1 = defer.Deferred()
d2 = defer.Deferred()
# 一定要在DeferredList之前才能回調(diào)生效
d1.addCallback(add_num_10)
d2.addCallback(add_num_20)
d = defer.DeferredList([d1, d2])
d.addCallback(print_result)
# 回調(diào)的順序無關(guān),只和加入到DeferredList中的元素順序相關(guān)
d1.callback(1) 
d2.callback(2)


d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
# 回調(diào)無效
d1.addCallback(add_num_10)
d2.addCallback(add_num_20)
d.addCallback(print_result)
d1.callback(1) 
d2.callback(2)

運(yùn)行結(jié)果如下:

PS D:\learning-notes\慕課網(wǎng)教程\scrapy-lessons\code>python chap23/deferredlist_example.py
[(True, 11), (True, 22)]
[(True, 1), (True, 2)]

我們可以看到,上面的代碼有一個(gè) DeferredList 列表,其回調(diào)結(jié)果返回的是列表中 Deferred 對(duì)象的回調(diào)結(jié)果,這個(gè)結(jié)果的順序是 DeferredList 中元素的順序。例如我們改動(dòng)下回調(diào)的順序:

d2.callback(2) 
d1.callback(1)

輸出的結(jié)果和原來還是一樣:

PS D:\learning-notes\慕課網(wǎng)教程\scrapy-lessons\code>python chap23/deferredlist_example.py
[(True, 11), (True, 22)]
[(True, 1), (True, 2)]

1.3 Transports

Transports 代表網(wǎng)絡(luò)中兩個(gè)通信結(jié)點(diǎn)之間的連接。Transports 負(fù)責(zé)描述連接的細(xì)節(jié),比如連接是 TCP 的還是 UDP 等。它們被設(shè)計(jì)為“滿足最小功能單元,同時(shí)具有最大程度的可復(fù)用性,而且從協(xié)議實(shí)現(xiàn)中分離出來,這讓許多協(xié)議可以采用相同類型的傳輸。Transports 實(shí)現(xiàn)了 ITransports 接口,它包含如下的方法:

  • write():以非阻塞的方式按順序依次將數(shù)據(jù)寫到物理連接上;
  • writeSequence():將一個(gè)字符串列表寫到物理連接上;
  • loseConnection():將所有掛起的數(shù)據(jù)寫入,然后關(guān)閉連接;
  • getPeer():取得連接中對(duì)端的地址信息;
  • getHost():取得連接中本端的地址信息;

1.4 Factory 和 Protocol

Protocols 描述了如何以異步的方式處理網(wǎng)絡(luò)中的事件。HTTP、DNS 以及 IMAP 是應(yīng)用層協(xié)議中的例子。Protocols實(shí)現(xiàn)了 IProtocol 接口,它包含如下的方法:

  • makeConnection(): 在 transport 對(duì)象和服務(wù)器之間建立一個(gè)連接;
  • connectionMade(): 連接建立起來后調(diào)用;
  • dataReceived(): 接收數(shù)據(jù)時(shí)調(diào)用;
  • connectionLost(): 關(guān)閉連接時(shí)調(diào)用;

Factory 和 Protocol 有嚴(yán)格的不同。Factory 的工作是管理連接事件,并且創(chuàng)建 Protocol 對(duì)象處理每一個(gè)成功的連接。一旦連接建立,Protocol 對(duì)象就接管下面的工作了,包括收發(fā)數(shù)據(jù)和決定是否關(guān)閉連接。

1.5 inlineCallbacks

在【文獻(xiàn)4】中對(duì)該裝飾器有詳細(xì)的介紹,我們先來看下該裝飾器的作用:

twisted.internet.defer.inlineCallbacks 裝飾器是用于同步【異步操作】 的。它用于裝飾生成器函數(shù)。調(diào)用該裝飾器裝飾的生成器函數(shù)會(huì)返回一個(gè)Deferred對(duì)象。

其大致執(zhí)行流程如下 (取自文獻(xiàn)4):
圖片描述

inlineCallbacks 裝飾器執(zhí)行流程

2. Scrapy 中對(duì) Twisted 模塊的進(jìn)一步封裝

qi我們來看看 Scrapy 框架源碼中對(duì) Twisted 模塊的一些封裝代碼,主要有兩個(gè)文件:scrapy/utils/defer.pyscrapy/utils/reactor.py。

圖片描述

Scrapy 源碼中對(duì) Twisted 模塊的簡(jiǎn)單封裝文件

其中和后面源碼分析中緊密相關(guān)的主要是 defer.py 文件,我們也先重點(diǎn)先學(xué)習(xí)這里的代碼。

# 源碼位置:scrapy/utils/defer.py
# ...


def defer_fail(_failure):
    from twisted.internet import reactor
    d = defer.Deferred()
    reactor.callLater(0.1, d.errback, _failure)
    return d


def defer_succeed(result):
    from twisted.internet import reactor
    d = defer.Deferred()
    reactor.callLater(0.1, d.callback, result)
    return d


def defer_result(result):
    if isinstance(result, defer.Deferred):
        return result
    elif isinstance(result, failure.Failure):
        return defer_fail(result)
    else:
        return defer_succeed(result)


def mustbe_deferred(f, *args, **kw):
    try:
        result = f(*args, **kw)
    except IgnoreRequest as e:
        return defer_fail(failure.Failure(e))
    except Exception:
        return defer_fail(failure.Failure())
    else:
        return defer_result(result)
    
# ...

上面代碼中定義的 mustbe_deferred() 方法在后面會(huì)經(jīng)常使用,它的調(diào)用會(huì)牽扯到最前面的三個(gè)方法:

  • defer_fail():等同于 twisted.internet.defer.fail,會(huì)延遲到下一個(gè) reactor 循環(huán)才會(huì)調(diào)用錯(cuò)誤回調(diào)方法;
  • defer_succeed():等同于 twisted.internet.defer.succeed,會(huì)延遲到下一個(gè) reactor 循環(huán)才會(huì)調(diào)用成功的回調(diào)方法;
  • defer_result(): 根據(jù)結(jié)果分別進(jìn)行延遲的成功回調(diào)或者錯(cuò)誤回調(diào);
# 源碼位置:scrapy/utils/defer.py
# ...

def parallel(iterable, count, callable, *args, **named):
    coop = task.Cooperator()
    # 封裝調(diào)用的方法
    work = (callable(elem, *args, **named) for elem in iterable)
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])


def process_chain(callbacks, input, *a, **kw):
    """Return a Deferred built by chaining the given callbacks"""
    d = defer.Deferred()
    # 添加回調(diào)鏈
    for x in callbacks:
        d.addCallback(x, *a, **kw)
    d.callback(input)
    return d


def process_chain_both(callbacks, errbacks, input, *a, **kw):
    """Return a Deferred built by chaining the given callbacks and errbacks"""
    d = defer.Deferred()
    for cb, eb in zip(callbacks, errbacks):
        d.addCallbacks(
            callback=cb, errback=eb,
            callbackArgs=a, callbackKeywords=kw,
            errbackArgs=a, errbackKeywords=kw,
        )
    if isinstance(input, failure.Failure):
        d.errback(input)
    else:
        d.callback(input)
    return d


def process_parallel(callbacks, input, *a, **kw):
    dfds = [defer.succeed(input).addCallback(x, *a, **kw) for x in callbacks]
    d = defer.DeferredList(dfds, fireOnOneErrback=1, consumeErrors=1)
    d.addCallbacks(lambda r: [x[1] for x in r], lambda f: f.value.subFailure)
    return d

我們來分別對(duì)這四個(gè)方法進(jìn)行說明:

  • parallel():對(duì)給定可迭代的對(duì)象并行調(diào)用,使用不超過 count 個(gè)并發(fā)調(diào)用。該方法通過封裝 Twisted 中的 task.Cooperator 對(duì)象,實(shí)現(xiàn)對(duì)任務(wù)的并發(fā)調(diào)用;
  • process_chain():代碼非常簡(jiǎn)單,就是單純的將輸入的 callbacks 依次加入到回調(diào)鏈中并返回對(duì)應(yīng)的 Deferred 對(duì)象;
  • process_chain_both():對(duì)上面方法的擴(kuò)展,構(gòu)建正?;卣{(diào)鏈以及錯(cuò)誤回調(diào)鏈,并返回對(duì)應(yīng)的 Deferred 對(duì)象;
  • process_parallel():返回一個(gè)包含對(duì)給定回調(diào)的所有成功調(diào)用輸出的 Deferred 對(duì)象;

上述這些方法可以通過閱讀其實(shí)現(xiàn)理解其功能,最后剩余的幾個(gè)方法在后續(xù)的源碼介紹中沒有用到,故暫時(shí)不進(jìn)行介紹,大家可以自行參考相關(guān)資料理解。

3. 小結(jié)

本小節(jié)中我們介紹了 Twisted 框架中的幾個(gè)核心概念,主要是 reactor、FactoryProtocol 以及 Deferred 等,這些是 Twisted 的核心組成部分,也會(huì)在 Scrapy 中大量應(yīng)用和進(jìn)一步封裝。此外,我們完成了一些基于 Twisted 模塊的案例,這些也幫助我們很好的理解了 Twisted 模塊的使用。最后我們進(jìn)一步分析了 Scrapy 框架中對(duì) Twisted 模塊核心方法所做的一些封裝,這些也為了后續(xù)更好地理解 Scrapy 框架源碼打好堅(jiān)實(shí)的基礎(chǔ)。