1 回答

TA貢獻(xiàn)1827條經(jīng)驗(yàn) 獲得超4個(gè)贊
您可以運(yùn)行分布式系統(tǒng)并通過中央排隊(duì)系統(tǒng)傳遞數(shù)據(jù)。采用 Unix 哲學(xué)并創(chuàng)建一個(gè)單一的應(yīng)用程序,它可以完成一些任務(wù)并且做得很好。創(chuàng)建一個(gè)嗅探數(shù)據(jù)包的應(yīng)用程序(您可以scapy在此處使用,因?yàn)槿绻柚谷魏蝺?nèi)容都無關(guān)緊要),然后將它們發(fā)送到隊(duì)列(RabitMQ、Redis、SQS 等)并讓另一個(gè)應(yīng)用程序處理來自隊(duì)列的數(shù)據(jù)包。這種方法應(yīng)該給你帶來最少的頭痛。
如果您需要在單個(gè)應(yīng)用程序中運(yùn)行所有內(nèi)容,那么線程/多處理是唯一的選擇。但是有一些設(shè)計(jì)模式你會想要遵循。您還可以將以下代碼分解為單獨(dú)的函數(shù)并使用專用的排隊(duì)系統(tǒng)。
from threading import Thread
from time import sleep
from twisted.internet import defer, reactor
class Sniffer(Thread):
def __init__(self, _reactor, shared_queue):
super().__init__()
self.reactor = _reactor
self.shared_queue = shared_queue
def run(self):
"""
Sniffer logic here
"""
while True:
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
sleep(5)
@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
item = yield shared_queue.get()
print(str(_id), item)
_reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)
def main():
shared_queue = defer.DeferredQueue()
sniffer = Sniffer(reactor, shared_queue)
sniffer.daemon = True
sniffer.start()
workers = 4
for i in range(workers):
consume_from_queue(i+1, reactor, shared_queue)
reactor.run()
main()
該Sniffer班開始扭曲的控制之外。請注意sniffer.daemon = True,這是為了在主線程停止時(shí)線程將停止。如果它被設(shè)置為False(默認(rèn)),那么只有當(dāng)所有線程都結(jié)束時(shí),應(yīng)用程序才會退出。根據(jù)手頭的任務(wù),這可能總是也可能不總是可能的。如果您可以暫停嗅探以檢查線程事件,那么您也許能夠以更安全的方式停止線程。
self.reactor.callFromThread(self.shared_queue.put, 'hello world')是必要的,以便放入隊(duì)列的項(xiàng)目發(fā)生在主反應(yīng)器線程中,而不是Sniffer執(zhí)行的線程中。這樣做的主要好處是來自線程的消息會有某種同步(假設(shè)您計(jì)劃擴(kuò)展到嗅探多個(gè)接口)。另外,我不確定DeferredQueue對象是否是線程安全的 :) 我像對待它們一樣對待它們。
由于在這種情況下 Twisted 不管理線程,因此開發(fā)人員管理至關(guān)重要。注意worker循環(huán)和consume_from_queue(i+1, reactor, shared_queue)。此循環(huán)確保只有所需數(shù)量的工作人員正在處理任務(wù)。在consume_from_queue()函數(shù)內(nèi)部,shared_queue.get()將等待(非阻塞),直到將一個(gè)項(xiàng)目放入隊(duì)列,打印該項(xiàng)目,然后安排另一個(gè)consume_from_queue().
添加回答
舉報(bào)