1 回答

TA貢獻(xiàn)1805條經(jīng)驗(yàn) 獲得超10個(gè)贊
考慮以下兩個(gè)演示。當(dāng)您有多個(gè)觀察者時(shí),這些行為會(huì)有所不同。在第一個(gè)演示中,觀察者將競(jìng)爭(zhēng)隊(duì)列中的項(xiàng)目,在第二個(gè)演示中,他們每個(gè)人都會(huì)收到一個(gè)副本。
演示 #1 - 冷可觀察
var queue = new AsyncProducerConsumerQueue<int>();
// This is a cold observable, so each observer is fed by its own individual dequeue loop
// and therefore will be 'competing' with other observers for queued items.
var coldObservable = Observable
// Create an observable that asynchronously waits for items to become available on the
// queue and then emits them to the observer. This will be cancelled when the observer
// is unsubscribed.
.Create<int>(async (observer, cancellationToken) =>
{
while (true)
{
var item = await queue.DequeueAsync(cancellationToken).ConfigureAwait(false);
Console.WriteLine($"Dequeued {item}");
observer.OnNext(item);
}
})
// If an InvalidOperationException is thrown by the above, continue with
// an empty observable instead of the error. This effectively catches an
// `OnError(InvalidOperationException)` and turns it into an `OnCompleted()`.
.Catch<int, InvalidOperationException>(exn =>
{
Console.WriteLine("Caught InvalidOperation");
return Observable.Empty<int>();
});
Console.WriteLine("TEST COLD");
await queue.EnqueueAsync(1);
Console.WriteLine("Enqueued 1");
Console.WriteLine("Subscribing A");
coldObservable.Subscribe(
item => Console.WriteLine($"A received {item}"),
() => Console.WriteLine("A completed"));
Console.WriteLine("Subscribing B");
coldObservable.Subscribe(
item => Console.WriteLine($"B received {item}"),
() => Console.WriteLine("B completed"));
await queue.EnqueueAsync(2);
Console.WriteLine("Enqueued 2");
await queue.EnqueueAsync(3);
Console.WriteLine("Enqueued 3");
queue.CompleteAdding();
Console.WriteLine("Completed adding");
Console.WriteLine("Waiting...");
await Task.Delay(2000);
Console.WriteLine("DONE");
// TEST COLD
// Enqueued 1
// Subscribing A
// Dequeued 1
// A received 1
// Subscribing B
// Enqueued 2
// Enqueued 3
// Completed adding
// Waiting...
// Dequeued 2
// Dequeued 3
// A received 2
// B received 3
// Caught InvalidOperation
// Caught InvalidOperation
// A completed
// B completed
// DONE
演示 #2 - 熱可觀察
var queue = new AsyncProducerConsumerQueue<int>();
var coldObservable = // defined same as above
// This is a hot observable, so each observer receives the same items from the queue.
var hotObservable = coldObservable
// Publish the cold observable to create an `IConnectableObservable` that will subscribe
// to the dequeue loop when connected and emit the same items to all observers.
.Publish()
// Automatically connect to the published observable when the first observer subscribes
// and automatically disconnect when the last observer unsubscribes. This means that the
// first observer will receive any items queued before it subscribes, but additional
// observers will only receive items queued after they subscribed.
.RefCount();
Console.WriteLine("TEST HOT");
await queue.EnqueueAsync(1);
Console.WriteLine("Enqueued 1");
Console.WriteLine("Subscribing A");
hotObservable.Subscribe(
item => Console.WriteLine($"A received {item}"),
() => Console.WriteLine("A completed"));
Console.WriteLine("Subscribing B");
hotObservable.Subscribe(
item => Console.WriteLine($"B received {item}"),
() => Console.WriteLine("B completed"));
await queue.EnqueueAsync(2);
Console.WriteLine("Enqueued 2");
await queue.EnqueueAsync(3);
Console.WriteLine("Enqueued 3");
queue.CompleteAdding();
Console.WriteLine("Completed adding");
Console.WriteLine("Waiting...");
await Task.Delay(2000);
Console.WriteLine("DONE");
// TEST HOT
// Enqueued 1
// Subscribing A
// Dequeued 1
// A received 1
// Subscribing B
// Enqueued 2
// Enqueued 3
// Dequeued 2
// Completed adding
// Waiting...
// A received 2
// B received 2
// Dequeued 3
// A received 3
// B received 3
// Caught InvalidOperation
// A completed
// B completed
// DONE
回答你原來(lái)的問(wèn)題:
是否可以在沒有包裝類的情況下編寫它?
是的,請(qǐng)參閱上面的演示。
是否可以防止將多個(gè)包裝器應(yīng)用于一個(gè)隊(duì)列時(shí)出現(xiàn)錯(cuò)誤?
上面演示的方法不會(huì)阻止其他方將項(xiàng)目出隊(duì)(或在隊(duì)列上執(zhí)行任何其他操作)。如果您想確保只公開給定隊(duì)列的單個(gè)隊(duì)列IObservable<T>,請(qǐng)考慮通過(guò)創(chuàng)建一個(gè)ObservableProducerConsumerQueue<T>在內(nèi)部創(chuàng)建和管理自己的AsyncProducerConsumerQueue. 您可以公開一個(gè)EnqueueAsync僅委托給內(nèi)部隊(duì)列的方法,并使用上面演示的可觀察量之一將可觀察量公開為屬性或?qū)崿F(xiàn)接口IObservable<T>。
我可以讓它在第一個(gè)訂閱上連接,而不是通過(guò)直接調(diào)用 Connect 嗎?如果是這樣,這意味著什么?
演示 #2 顯示了此行為并描述了其含義。如果您希望能夠在連接之前訂閱觀察者,請(qǐng)?zhí)^(guò)調(diào)用RefCount并像以前一樣使用IConnectableObservable返回的值Publish。
最后,你會(huì)怎么做呢?
如上所述,我將封裝隊(duì)列并使用上面演示的方法之一公開IObservable或公開。IConnectableObservable
- 1 回答
- 0 關(guān)注
- 114 瀏覽
添加回答
舉報(bào)