第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

AsyncProducerConsumerQueue 的可觀察包裝器

AsyncProducerConsumerQueue 的可觀察包裝器

C#
慕尼黑8549860 2023-09-16 16:12:18
AsyncProducerConsumerQueue<T>因此,我使用以下代碼為 Stephen Cleary 創(chuàng)建了一個(gè)可觀察的包裝器。我想知道這里是否有人知道我如何以更簡(jiǎn)單的方式做到這一點(diǎn)?是否可以在沒有包裝類的情況下編寫它?是否可以防止將多個(gè)包裝器應(yīng)用于一個(gè)隊(duì)列時(shí)出現(xiàn)錯(cuò)誤?我可以讓它在第一個(gè)訂閱上連接,而不是通過(guò)直接調(diào)用嗎Connect?如果是這樣,這意味著什么?最后,你會(huì)怎么做呢?using Nito.AsyncEx;using System.Reactive;static async Task ExampleUsage() {    var queue = new AsyncProducerConsumerQueue<int>();    var observable = queue.AsConnectableObservable();    await queue.EnqueueAsync(1);    observable.Subscribe(Console.WriteLine);    observable.Connect();    await queue.EnqueueAsync(2);}public static class AsyncExExtensions {    public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {        return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);    }}class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {    readonly AsyncProducerConsumerQueue<T> Queue;    long _isConnected = 0;    ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;    public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {        Queue = queue;    }    public IDisposable Connect() {        if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");        var cts = new CancellationTokenSource();        var token = cts.Token;        Task.Run(async () => {            try {                while (true) {                    token.ThrowIfCancellationRequested();                    var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);                    foreach (var observer in Observers)                        observer.OnNext(@event);                }            } catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {                foreach (var observer in Observers)                    observer.OnCompleted();            }        });
查看完整描述

1 回答

?
holdtom

TA貢獻(xiàn)1805條經(jīng)驗(yàn) 獲得超10個(gè)贊

考慮以下兩個(gè)演示。當(dāng)您有多個(gè)觀察者時(shí),這些行為會(huì)有所不同。在第一個(gè)演示中,觀察者將競(jìng)爭(zhēng)隊(duì)列中的項(xiàng)目,在第二個(gè)演示中,他們每個(gè)人都會(huì)收到一個(gè)副本。


演示 #1 - 冷可觀察

var queue = new AsyncProducerConsumerQueue<int>();


// This is a cold observable, so each observer is fed by its own individual dequeue loop

// and therefore will be 'competing' with other observers for queued items.

var coldObservable = Observable

    // Create an observable that asynchronously waits for items to become available on the

    // queue and then emits them to the observer. This will be cancelled when the observer

    // is unsubscribed. 

    .Create<int>(async (observer, cancellationToken) =>

    {

        while (true)

        {

            var item = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false);

            Console.WriteLine($"Dequeued {item}");

            observer.OnNext(item);

        }

    })

    // If an InvalidOperationException is thrown by the above, continue with

    // an empty observable instead of the error. This effectively catches an

    // `OnError(InvalidOperationException)` and turns it into an `OnCompleted()`.

    .Catch<int, InvalidOperationException>(exn =>

    {

        Console.WriteLine("Caught InvalidOperation");

        return Observable.Empty<int>();

    });


Console.WriteLine("TEST COLD");


await queue.EnqueueAsync(1);

Console.WriteLine("Enqueued 1");


Console.WriteLine("Subscribing A");

coldObservable.Subscribe(

    item => Console.WriteLine($"A received {item}"),

    () => Console.WriteLine("A completed"));


Console.WriteLine("Subscribing B");

coldObservable.Subscribe(

    item => Console.WriteLine($"B received {item}"),

    () => Console.WriteLine("B completed"));


await queue.EnqueueAsync(2);

Console.WriteLine("Enqueued 2");


await queue.EnqueueAsync(3);

Console.WriteLine("Enqueued 3");


queue.CompleteAdding();

Console.WriteLine("Completed adding");


Console.WriteLine("Waiting...");

await Task.Delay(2000);


Console.WriteLine("DONE");


// TEST COLD

// Enqueued 1

// Subscribing A

// Dequeued 1

// A received 1

// Subscribing B

// Enqueued 2

// Enqueued 3

// Completed adding

// Waiting...

// Dequeued 2

// Dequeued 3

// A received 2

// B received 3

// Caught InvalidOperation

// Caught InvalidOperation

// A completed

// B completed

// DONE

演示 #2 - 熱可觀察

var queue = new AsyncProducerConsumerQueue<int>();


var coldObservable = // defined same as above


// This is a hot observable, so each observer receives the same items from the queue.

var hotObservable = coldObservable

    // Publish the cold observable to create an `IConnectableObservable` that will subscribe

    // to the dequeue loop when connected and emit the same items to all observers.

    .Publish()

    // Automatically connect to the published observable when the first observer subscribes

    // and automatically disconnect when the last observer unsubscribes. This means that the

    // first observer will receive any items queued before it subscribes, but additional

    // observers will only receive items queued after they subscribed.

    .RefCount();


Console.WriteLine("TEST HOT");


await queue.EnqueueAsync(1);

Console.WriteLine("Enqueued 1");


Console.WriteLine("Subscribing A");

hotObservable.Subscribe(

    item => Console.WriteLine($"A received {item}"),

    () => Console.WriteLine("A completed"));


Console.WriteLine("Subscribing B");

hotObservable.Subscribe(

    item => Console.WriteLine($"B received {item}"),

    () => Console.WriteLine("B completed"));


await queue.EnqueueAsync(2);

Console.WriteLine("Enqueued 2");


await queue.EnqueueAsync(3);

Console.WriteLine("Enqueued 3");


queue.CompleteAdding();

Console.WriteLine("Completed adding");


Console.WriteLine("Waiting...");

await Task.Delay(2000);


Console.WriteLine("DONE");


// TEST HOT

// Enqueued 1

// Subscribing A

// Dequeued 1

// A received 1

// Subscribing B

// Enqueued 2

// Enqueued 3

// Dequeued 2

// Completed adding

// Waiting...

// A received 2

// B received 2

// Dequeued 3

// A received 3

// B received 3

// Caught InvalidOperation

// A completed

// B completed

// DONE

回答你原來(lái)的問(wèn)題:


是否可以在沒有包裝類的情況下編寫它?


是的,請(qǐng)參閱上面的演示。


是否可以防止將多個(gè)包裝器應(yīng)用于一個(gè)隊(duì)列時(shí)出現(xiàn)錯(cuò)誤?


上面演示的方法不會(huì)阻止其他方將項(xiàng)目出隊(duì)(或在隊(duì)列上執(zhí)行任何其他操作)。如果您想確保只公開給定隊(duì)列的單個(gè)隊(duì)列IObservable<T>,請(qǐng)考慮通過(guò)創(chuàng)建一個(gè)ObservableProducerConsumerQueue<T>在內(nèi)部創(chuàng)建和管理自己的AsyncProducerConsumerQueue. 您可以公開一個(gè)EnqueueAsync僅委托給內(nèi)部隊(duì)列的方法,并使用上面演示的可觀察量之一將可觀察量公開為屬性或?qū)崿F(xiàn)接口IObservable<T>。


我可以讓它在第一個(gè)訂閱上連接,而不是通過(guò)直接調(diào)用 Connect 嗎?如果是這樣,這意味著什么?


演示 #2 顯示了此行為并描述了其含義。如果您希望能夠在連接之前訂閱觀察者,請(qǐng)?zhí)^(guò)調(diào)用RefCount并像以前一樣使用IConnectableObservable返回的值Publish。


最后,你會(huì)怎么做呢?


如上所述,我將封裝隊(duì)列并使用上面演示的方法之一公開IObservable或公開。IConnectableObservable


查看完整回答
反對(duì) 回復(fù) 2023-09-16
  • 1 回答
  • 0 關(guān)注
  • 114 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)