第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

使用 TestSchedulers、Rx 和 BlockingCollection 進(jìn)行死鎖測(cè)試

使用 TestSchedulers、Rx 和 BlockingCollection 進(jìn)行死鎖測(cè)試

C#
莫回?zé)o 2023-09-16 15:01:30
我有以下類,它基本上訂閱int observable 并將值乘以 2。出于現(xiàn)實(shí)目的,我添加了 Thread.Sleep 來(lái)模擬繁重的處理。public class WorkingClass{    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);    public WorkingClass(IObservable<int> rawValues)    {        rawValues.Subscribe(x => _collection.Add(x));    }    public IObservable<int> ProcessedValues()    {        return Observable.Create<int>(observer =>        {            while (true)            {                int value;                try                {                    value = _collection.Take();                }                catch (Exception ex)                {                    observer.OnError(ex);                    break;                }                Thread.Sleep(1000); //Simulate long work                observer.OnNext(value * 2);            }            return Disposable.Empty;        });    }}我在測(cè)試它時(shí)遇到了麻煩,在下面的測(cè)試中我只想斷言如果源流發(fā)出值 1,SUT 將發(fā)出值 2:[Test]public void SimpleTest(){    var sourceValuesScheduler = new TestScheduler();    var newThreadScheduler = new TestScheduler();    var source = sourceValuesScheduler.CreateHotObservable(         new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));    var sut = new WorkingClass(source);    var observer = sourceValuesScheduler.CreateObserver<int>();    sut.ProcessedValues()        .SubscribeOn(newThreadScheduler) //The cold part (i.e, the while loop) of the ProcessedValues Observable should run in a different thread        .Subscribe(observer);    sourceValuesScheduler.AdvanceTo(1000);    observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));}如果我運(yùn)行此測(cè)試,則斷言會(huì)失敗,因?yàn)?newThreadScheduler 從未啟動(dòng),因此從未創(chuàng)建 ProcessedValues observable。如果我這樣做: sourceValuesScheduler.AdvanceTo(1000); newThreadScheduler.AdvanceTo(1000); 它也不起作用,因?yàn)?newThreadScheduler 使用與 sourceValuesScheduler 相同的線程,因此測(cè)試將在處理后的值被發(fā)出后立即掛起,在以下行:value = _collection.Take();有沒(méi)有辦法讓多個(gè) TestScheduler 在不同的線程上運(yùn)行?否則我怎么能測(cè)試這樣的課程呢?
查看完整描述

1 回答

?
料青山看我應(yīng)如是

TA貢獻(xiàn)1772條經(jīng)驗(yàn) 獲得超8個(gè)贊

Take()阻塞,直到有一個(gè)項(xiàng)目可以從 中刪除BlockingCollection<int>或者您調(diào)用CompleteAdding()它。


ProcessedValues()鑒于您當(dāng)前的實(shí)現(xiàn),您訂閱并執(zhí)行循環(huán)的線程while將永遠(yuǎn)不會(huì)完成。


您應(yīng)該BlockingCollection<int>在單獨(dú)的線程上使用它。Task例如,您可以在ProcessedValues()調(diào)用時(shí)創(chuàng)建消耗??紤]以下實(shí)現(xiàn),它也處理BlockingCollection<int>:


public sealed class WorkingClass : IDisposable

{

    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);

    private List<Task> _consumerTasks = new List<Task>();


    public WorkingClass(IObservable<int> rawValues)

    {

        rawValues.Subscribe(x => _collection.Add(x));

    }


    public IObservable<int> ProcessedValues()

    {

        return Observable.Create<int>(observer =>

        {

            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));

            return Disposable.Empty;

        });

    }


    private void Consume(IObserver<int> observer)

    {

        try

        {

            foreach (int value in _collection.GetConsumingEnumerable())

            {

                Thread.Sleep(1000); //Simulate long work

                observer.OnNext(value * 2);

            }

        }

        catch (Exception ex)

        {

            observer.OnError(ex);

        }

    }


    public void Dispose()

    {

        _collection.CompleteAdding();

        Task.WaitAll(_consumerTasks.ToArray());

        _collection.Dispose();

    }

}

可以使用以下代碼進(jìn)行測(cè)試:


var sourceValuesScheduler = new TestScheduler();


var source = sourceValuesScheduler.CreateHotObservable(

    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));


var observer = sourceValuesScheduler.CreateObserver<int>();


using (var sut = new WorkingClass(source))

{

    sourceValuesScheduler.AdvanceTo(1000); //add to collection

    sut.ProcessedValues().Subscribe(observer); //consume

} //...and wait until the loop exists


observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));



查看完整回答
反對(duì) 回復(fù) 2023-09-16
  • 1 回答
  • 0 關(guān)注
  • 111 瀏覽

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)