3 回答

TA貢獻1829條經(jīng)驗 獲得超9個贊
生產(chǎn)者-消費者隊列有許多不同的實現(xiàn)方式,例如queue.Queue可用。它們通常具有許多不同的屬性,例如Dmitry Vyukov在此出色的文章中列出的屬性。如您所見,可能有超過1萬種不同的組合。根據(jù)要求,用于此類隊列的算法也相差很大。僅擴展現(xiàn)有隊列算法以保證其他屬性是不可能的,因為這通常需要不同的內部數(shù)據(jù)結構和不同的算法。
Go的頻道提供了相對較高的保證屬性,因此這些頻道可能適用于許多程序。最困難的要求之一是支持一次讀取/阻塞多個通道(select語句),并且如果select語句中可以有多個分支可以繼續(xù)進行,則要公平地選擇一個通道,這樣就不會留下任何消息。 。Python的queue.Queue不提供此功能,因此根本無法使用它來存檔相同的行為。
因此,如果要繼續(xù)使用queue.Queue,則需要找到解決該問題的方法。但是,變通辦法有其自身的缺點列表,并且較難維護。尋找另一個提供所需功能的生產(chǎn)者-消費者隊列可能是一個更好的主意!無論如何,這是兩個可能的解決方法:
輪詢
while True:
try:
i1 = c1.get_nowait()
print "received %s from c1" % i1
except queue.Empty:
pass
try:
i2 = c2.get_nowait()
print "received %s from c2" % i2
except queue.Empty:
pass
time.sleep(0.1)
在輪詢通道時,這可能會占用大量CPU周期,并且在有很多消息時可能會變慢。使用具有指數(shù)退避時間(而不是此處顯示的恒定0.1秒)的time.sleep()可能會大大改善此版本。
單個通知隊列
queue_id = notify.get()
if queue_id == 1:
i1 = c1.get()
print "received %s from c1" % i1
elif queue_id == 2:
i2 = c2.get()
print "received %s from c2" % i2
使用此設置,您必須在發(fā)送到c1或c2之后將某些內容發(fā)送到通知隊列。只要您只有一個這樣的通知隊列就足夠了(即您沒有多個“選擇”,每個“選擇”阻塞在您通道的不同子集上),這可能對您有用。
另外,您也可以考慮使用Go。無論如何,Go的goroutines和并發(fā)支持比Python的有限線程功能強大得多。

TA貢獻1842條經(jīng)驗 獲得超13個贊
如果使用queue.PriorityQueue,則可以使用通道對象作為優(yōu)先級獲得類似的行為:
import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager
logging.basicConfig(level=logging.NOTSET,
format="%(threadName)s - %(message)s")
class ChannelManager(object):
next_priority = 0
def __init__(self):
self.queue = PriorityQueue()
self.channels = []
def put(self, channel, item, *args, **kwargs):
self.queue.put((channel, item), *args, **kwargs)
def get(self, *args, **kwargs):
return self.queue.get(*args, **kwargs)
@contextmanager
def select(self, ordering=None, default=False):
if default:
try:
channel, item = self.get(block=False)
except Empty:
channel = 'default'
item = None
else:
channel, item = self.get()
yield channel, item
def new_channel(self, name):
channel = Channel(name, self.next_priority, self)
self.channels.append(channel)
self.next_priority += 1
return channel
class Channel(object):
def __init__(self, name, priority, manager):
self.name = name
self.priority = priority
self.manager = manager
def __str__(self):
return self.name
def __lt__(self, other):
return self.priority < other.priority
def put(self, item):
self.manager.put(self, item)
if __name__ == '__main__':
num_channels = 3
num_producers = 4
num_items_per_producer = 2
num_consumers = 3
num_items_per_consumer = 3
manager = ChannelManager()
channels = [manager.new_channel('Channel#{0}'.format(i))
for i in range(num_channels)]
def producer_target():
for i in range(num_items_per_producer):
time.sleep(random.random())
channel = random.choice(channels)
message = random.choice(string.ascii_letters)
logging.info('Putting {0} in {1}'.format(message, channel))
channel.put(message)
producers = [threading.Thread(target=producer_target,
name='Producer#{0}'.format(i))
for i in range(num_producers)]
for producer in producers:
producer.start()
for producer in producers:
producer.join()
logging.info('Producers finished')
def consumer_target():
for i in range(num_items_per_consumer):
time.sleep(random.random())
with manager.select(default=True) as (channel, item):
if channel:
logging.info('Received {0} from {1}'.format(item, channel))
else:
logging.info('No data received')
consumers = [threading.Thread(target=consumer_target,
name='Consumer#{0}'.format(i))
for i in range(num_consumers)]
for consumer in consumers:
consumer.start()
for consumer in consumers:
consumer.join()
logging.info('Consumers finished')
輸出示例:
Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished
在這個例子中,ChannelManager只是一個包裝器queue.PriorityQueue,將select方法實現(xiàn)contextmanager為使其看起來類似于selectGo中的語句。
注意事項:
定購
在Go示例中,
select
如果有多個通道可用的數(shù)據(jù),則在語句中寫入通道的順序確定將執(zhí)行哪個通道的代碼。在python示例中,順序由分配給每個通道的優(yōu)先級確定。但是,可以將優(yōu)先級動態(tài)分配給每個通道(如示例中所示),因此可以使用更復雜的
select
方法來更改順序,該方法將根據(jù)該方法的參數(shù)來分配新的優(yōu)先級。同樣,一旦上下文管理器完成,可以重新建立舊的順序。封鎖
在Go示例中,
select
如果default
存在案例,則該語句將阻塞。在python示例中,必須將boolean參數(shù)傳遞給該
select
方法,以在需要阻止/非阻止時使其清晰可見。在非阻塞情況下,上下文管理器返回的通道只是字符串,'default'
因此在內部代碼中很容易在with
語句內部的代碼中檢測到此情況。線程:
queue
如示例中所示,模塊中的對象已經(jīng)為多生產(chǎn)者,多消費者的場景做好了準備。

TA貢獻1796條經(jīng)驗 獲得超4個贊
該pychan項目復制在Python圍棋頻道,包括復用。它實現(xiàn)了與Go相同的算法,因此符合您所有需要的屬性:
多個生產(chǎn)者和消費者可以通過Chan進行交流。當生產(chǎn)者和消費者都準備就緒時,他們對
生產(chǎn)者和消費者按到達順序得到服務(FIFO)
空(滿)隊列將阻止使用者(生產(chǎn)者)。
您的示例如下所示:
c1 = Chan(); c2 = Chan(); c3 = Chan()
try:
chan, value = chanselect([c1, c3], [(c2, i2)])
if chan == c1:
print("Received %r from c1" % value)
elif chan == c2:
print("Sent %r to c2" % i2)
else: # c3
print("Received %r from c3" % value)
except ChanClosed as ex:
if ex.which == c3:
print("c3 is closed")
else:
raise
(完全公開:我寫了這個庫)
- 3 回答
- 0 關注
- 266 瀏覽
添加回答
舉報