第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

隊列上有多路復用

隊列上有多路復用

Go
莫回無 2021-04-27 09:49:38
如何queue.Queue同時在多個對象上進行“選擇” ?Golang的頻道具有所需的功能:select {case i1 = <-c1:    print("received ", i1, " from c1\n")case c2 <- i2:    print("sent ", i2, " to c2\n")case i3, ok := (<-c3):  // same as: i3, ok := <-c3    if ok {        print("received ", i3, " from c3\n")    } else {        print("c3 is closed\n")    }default:    print("no communication\n")}其中第一個要解除阻塞的通道執(zhí)行相應的塊。我將如何在Python中實現(xiàn)這一目標?更新0根據(jù)tux21b的答案中給出的鏈接,所需的隊列類型具有以下屬性:多生產(chǎn)者/多消費者隊列(MPMC)提供按生產(chǎn)者的FIFO / LIFO當隊列為空/完整的消費者/生產(chǎn)者被阻止時此外,渠道可能會被阻塞,生產(chǎn)者將阻塞,直到消費者取回該物品為止。我不確定Python的Queue是否可以做到這一點。
查看完整描述

3 回答

?
PIPIONE

TA貢獻1829條經(jīng)驗 獲得超9個贊

生產(chǎn)者-消費者隊列有許多不同的實現(xiàn)方式,例如queue.Queue可用。它們通常具有許多不同的屬性,例如Dmitry Vyukov在此出色的文章中列出的屬性。如您所見,可能有超過1萬種不同的組合。根據(jù)要求,用于此類隊列的算法也相差很大。僅擴展現(xiàn)有隊列算法以保證其他屬性是不可能的,因為這通常需要不同的內部數(shù)據(jù)結構和不同的算法。


Go的頻道提供了相對較高的保證屬性,因此這些頻道可能適用于許多程序。最困難的要求之一是支持一次讀取/阻塞多個通道(select語句),并且如果select語句中可以有多個分支可以繼續(xù)進行,則要公平地選擇一個通道,這樣就不會留下任何消息。 。Python的queue.Queue不提供此功能,因此根本無法使用它來存檔相同的行為。


因此,如果要繼續(xù)使用queue.Queue,則需要找到解決該問題的方法。但是,變通辦法有其自身的缺點列表,并且較難維護。尋找另一個提供所需功能的生產(chǎn)者-消費者隊列可能是一個更好的主意!無論如何,這是兩個可能的解決方法:


輪詢


while True:

  try:

    i1 = c1.get_nowait()

    print "received %s from c1" % i1

  except queue.Empty:

    pass

  try:

    i2 = c2.get_nowait()

    print "received %s from c2" % i2

  except queue.Empty:

    pass

  time.sleep(0.1)

在輪詢通道時,這可能會占用大量CPU周期,并且在有很多消息時可能會變慢。使用具有指數(shù)退避時間(而不是此處顯示的恒定0.1秒)的time.sleep()可能會大大改善此版本。


單個通知隊列


queue_id = notify.get()

if queue_id == 1:

  i1 = c1.get()

  print "received %s from c1" % i1

elif queue_id == 2:

  i2 = c2.get()

  print "received %s from c2" % i2

使用此設置,您必須在發(fā)送到c1或c2之后將某些內容發(fā)送到通知隊列。只要您只有一個這樣的通知隊列就足夠了(即您沒有多個“選擇”,每個“選擇”阻塞在您通道的不同子集上),這可能對您有用。


另外,您也可以考慮使用Go。無論如何,Go的goroutines和并發(fā)支持比Python的有限線程功能強大得多。


查看完整回答
反對 回復 2021-05-10
?
紅顏莎娜

TA貢獻1842條經(jīng)驗 獲得超13個贊

如果使用queue.PriorityQueue,則可以使用通道對象作為優(yōu)先級獲得類似的行為:


import threading, logging

import random, string, time

from queue import PriorityQueue, Empty

from contextlib import contextmanager


logging.basicConfig(level=logging.NOTSET,

                    format="%(threadName)s - %(message)s")


class ChannelManager(object):

    next_priority = 0


    def __init__(self):

        self.queue = PriorityQueue()

        self.channels = []


    def put(self, channel, item, *args, **kwargs):

        self.queue.put((channel, item), *args, **kwargs)


    def get(self, *args, **kwargs):

        return self.queue.get(*args, **kwargs)


    @contextmanager

    def select(self, ordering=None, default=False):

        if default:

            try:

                channel, item = self.get(block=False)

            except Empty:

                channel = 'default'

                item = None

        else:

            channel, item = self.get()

        yield channel, item



    def new_channel(self, name):

        channel = Channel(name, self.next_priority, self)

        self.channels.append(channel)

        self.next_priority += 1

        return channel



class Channel(object):

    def __init__(self, name, priority, manager):

        self.name = name

        self.priority = priority

        self.manager = manager


    def __str__(self):

        return self.name


    def __lt__(self, other):

        return self.priority < other.priority


    def put(self, item):

        self.manager.put(self, item)



if __name__ == '__main__':

    num_channels = 3

    num_producers = 4

    num_items_per_producer = 2

    num_consumers = 3

    num_items_per_consumer = 3


    manager = ChannelManager()

    channels = [manager.new_channel('Channel#{0}'.format(i))

                for i in range(num_channels)]


    def producer_target():

        for i in range(num_items_per_producer):

            time.sleep(random.random())

            channel = random.choice(channels)

            message = random.choice(string.ascii_letters)

            logging.info('Putting {0} in {1}'.format(message, channel))

            channel.put(message)


    producers = [threading.Thread(target=producer_target,

                                  name='Producer#{0}'.format(i))

                 for i in range(num_producers)]

    for producer in producers:

        producer.start()

    for producer in producers:

        producer.join()

    logging.info('Producers finished')


    def consumer_target():

        for i in range(num_items_per_consumer):

            time.sleep(random.random())

            with manager.select(default=True) as (channel, item):

                if channel:

                    logging.info('Received {0} from {1}'.format(item, channel))

                else:

                    logging.info('No data received')


    consumers = [threading.Thread(target=consumer_target,

                                  name='Consumer#{0}'.format(i))

                 for i in range(num_consumers)]

    for consumer in consumers:

        consumer.start()

    for consumer in consumers:

        consumer.join()

    logging.info('Consumers finished')

輸出示例:


Producer#0 - Putting x in Channel#2

Producer#2 - Putting l in Channel#0

Producer#2 - Putting A in Channel#2

Producer#3 - Putting c in Channel#0

Producer#3 - Putting z in Channel#1

Producer#1 - Putting I in Channel#1

Producer#1 - Putting L in Channel#1

Producer#0 - Putting g in Channel#1

MainThread - Producers finished

Consumer#1 - Received c from Channel#0

Consumer#2 - Received l from Channel#0

Consumer#0 - Received I from Channel#1

Consumer#0 - Received L from Channel#1

Consumer#2 - Received g from Channel#1

Consumer#1 - Received z from Channel#1

Consumer#0 - Received A from Channel#2

Consumer#1 - Received x from Channel#2

Consumer#2 - Received None from default

MainThread - Consumers finished

在這個例子中,ChannelManager只是一個包裝器queue.PriorityQueue,將select方法實現(xiàn)contextmanager為使其看起來類似于selectGo中的語句。

注意事項:

  • 定購

    • 在Go示例中,select如果有多個通道可用的數(shù)據(jù),則在語句中寫入通道的順序確定將執(zhí)行哪個通道的代碼。

    • 在python示例中,順序由分配給每個通道的優(yōu)先級確定。但是,可以將優(yōu)先級動態(tài)分配給每個通道(如示例中所示),因此可以使用更復雜的select方法來更改順序,該方法將根據(jù)該方法的參數(shù)來分配新的優(yōu)先級。同樣,一旦上下文管理器完成,可以重新建立舊的順序。

  • 封鎖

    • 在Go示例中,select如果default存在案例,則該語句將阻塞。

    • 在python示例中,必須將boolean參數(shù)傳遞給該select方法,以在需要阻止/非阻止時使其清晰可見。在非阻塞情況下,上下文管理器返回的通道只是字符串,'default'因此在內部代碼中很容易在with語句內部的代碼中檢測到此情況。

  • 線程:queue如示例中所示,模塊中的對象已經(jīng)為多生產(chǎn)者,多消費者的場景做好了準備。


查看完整回答
反對 回復 2021-05-10
?
SMILET

TA貢獻1796條經(jīng)驗 獲得超4個贊

pychan項目復制在Python圍棋頻道,包括復用。它實現(xiàn)了與Go相同的算法,因此符合您所有需要的屬性:

  • 多個生產(chǎn)者和消費者可以通過Chan進行交流。當生產(chǎn)者和消費者都準備就緒時,他們對

  • 生產(chǎn)者和消費者按到達順序得到服務(FIFO)

  • 空(滿)隊列將阻止使用者(生產(chǎn)者)。

您的示例如下所示:

c1 = Chan(); c2 = Chan(); c3 = Chan()


try:

    chan, value = chanselect([c1, c3], [(c2, i2)])

    if chan == c1:

        print("Received %r from c1" % value)

    elif chan == c2:

        print("Sent %r to c2" % i2)

    else:  # c3

        print("Received %r from c3" % value)

except ChanClosed as ex:

    if ex.which == c3:

        print("c3 is closed")

    else:

        raise

(完全公開:我寫了這個庫)


查看完整回答
反對 回復 2021-05-10
  • 3 回答
  • 0 關注
  • 266 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號