1 Answer

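Take() blocks until there is an item to remove from the BlockingCollection<int> or until you call CompleteAdding() on it.
Given your current implementation, the thread on which you subscribe to ProcessedValues() and run the while loop will never finish.
You should consume the BlockingCollection<int> on a separate thread. For example, you can create a consumer Task when ProcessedValues() is called. Consider the following implementation, which also disposes of the BlockingCollection<int>: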
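using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class WorkingClass : IDisposable
{
    // Bounded to one item, so Add() blocks the producer while the collection is full.
    private readonly BlockingCollection<int> _collection = new BlockingCollection<int>(1);
    private readonly List<Task> _consumerTasks = new List<Task>();
    private readonly IDisposable _subscription;

    public WorkingClass(IObservable<int> rawValues)
    {
        _subscription = rawValues.Subscribe(x => _collection.Add(x));
    }

    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            // Consume on a dedicated thread so that subscribing never blocks the caller.
            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));
            return Disposable.Empty;
        });
    }

    private void Consume(IObserver<int> observer)
    {
        try
        {
            // The enumeration ends once CompleteAdding() has been called and the collection is empty.
            foreach (int value in _collection.GetConsumingEnumerable())
            {
                Thread.Sleep(1000); // Simulate long-running work
                observer.OnNext(value * 2);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    }

    public void Dispose()
    {
        _subscription.Dispose();      // Stop producing; Add() throws once adding is completed
        _collection.CompleteAdding(); // Lets the consumer loops run to completion
        Task.WaitAll(_consumerTasks.ToArray());
        _collection.Dispose();
    }
}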
You can test it with the following code:
using System;
using System.Reactive;
using Microsoft.Reactive.Testing;

var sourceValuesScheduler = new TestScheduler();
var source = sourceValuesScheduler.CreateHotObservable(
    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));
var observer = sourceValuesScheduler.CreateObserver<int>();

using (var sut = new WorkingClass(source))
{
    sourceValuesScheduler.AdvanceTo(1000);     // Add the value to the collection
    sut.ProcessedValues().Subscribe(observer); // Start consuming
} // Dispose() waits here until the consumer loop exits

observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
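
To see the blocking behavior on its own, without Rx, here is a minimal sketch. The names BlockingDemo, Drain and TakeThrowsWhenCompleted are hypothetical and only serve the illustration. It assumes a single consumer and the default FIFO ordering of BlockingCollection<int>. The first method shows that GetConsumingEnumerable() ends after CompleteAdding(); the second shows that Take() throws InvalidOperationException instead of blocking once the collection is completed and empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingDemo
{
    // Consumes on a background task and returns the processed values.
    public static List<int> Drain()
    {
        var results = new List<int>();
        using (var collection = new BlockingCollection<int>(boundedCapacity: 1))
        {
            // The loop blocks while the collection is empty and ends once
            // CompleteAdding() has been called and every item has been taken.
            Task consumer = Task.Run(() =>
            {
                foreach (int value in collection.GetConsumingEnumerable())
                {
                    results.Add(value * 2);
                }
            });

            collection.Add(1);
            collection.Add(2);           // May block until the consumer frees the single slot
            collection.CompleteAdding(); // Without this call, consumer.Wait() would never return
            consumer.Wait();
        }
        return results;
    }

    // Take() on a completed, empty collection throws rather than blocking.
    public static bool TakeThrowsWhenCompleted()
    {
        using (var collection = new BlockingCollection<int>())
        {
            collection.CompleteAdding();
            try
            {
                collection.Take();
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Drain())); // prints "2, 4"
        Console.WriteLine(TakeThrowsWhenCompleted());  // prints "True"
    }
}
```

Removing the CompleteAdding() call from Drain() reproduces the hang described above: the consumer stays blocked waiting for items that never arrive.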