第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

反應式服務。分組和緩存流

反應式服務。分組和緩存流

C#
紅糖糍粑 2023-09-16 16:05:33
新增內容:帶有測試的完整源代碼現在位于https://github.com/bboyle1234/ReactiveTest假設我們有一個視圖狀態(tài)對象,可以通過小的部分視圖更改事件進行更新。以下是總視圖、增量視圖更新事件和Update構建總視圖的累加器函數的一些示例模型:interface IDeviceView : ICloneable {    Guid DeviceId { get; }}class DeviceTotalView : IDeviceView {    public Guid DeviceId { get; set; }    public int Voltage { get; set; }    public int Currents { get; set; }    public object Clone() => this.MemberwiseClone();}class DeviceVoltagesUpdateView : IDeviceView {    public Guid DeviceId { get; set; }    public int Voltage { get; set; }    public object Clone() => this.MemberwiseClone();}class DeviceCurrentsUpdateView : IDeviceView {    public Guid DeviceId { get; set; }    public int Current { get; set; }    public object Clone() => this.MemberwiseClone();}class DeviceUpdateEvent {    public DeviceTotalView View;    public IDeviceView LastUpdate;}static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {    if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");    var view = (DeviceTotalView)previousUpdate.View.Clone();    switch (update) {        case DeviceVoltagesUpdateView x: {            view.Voltage = x.Voltage;            break;        }        case DeviceCurrentsUpdateView x: {            view.Currents = x.Current;            break;        }    }    return new DeviceUpdateEvent { View = view, LastUpdate = update };}接下來,假設我們已經有一個可注入服務,能夠為所有設備生成小更新事件的可觀察流,并且我們希望創(chuàng)建一個可以為各個設備生成聚合視圖流的服務。這是我們要創(chuàng)建的服務的接口:interface IDeviceService {    /// <summary>    /// Gets an observable that produces aggregated update events for the device with the given deviceId.    /// On subscription, the most recent event is immediately pushed to the subscriber.    /// There can be multiple subscribers.    /// </summary>    IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);}如何使用System.Reactive v4庫中的反應式擴展(目標)來實現此接口及其要求.netstandard2.0?這是我的帶有注釋的鍋爐代碼,這是我所能得到的。
查看完整描述

2 回答

?
楊__羊羊

TA貢獻1943條經驗 獲得超7個贊

接受的答案中給出的實現雖然對我來說是一次神奇的教育,但也有一些問題需要依次解決。第一個是線程競爭問題,第二個是系統中存在大量設備時的性能問題。我最終解決了線程競爭并通過這個修改后的實現顯著提高了性能:


在構造函數中,分組和掃描的設備流直接訂閱到 a BehaviorSubject,它實現了Replay(1).RefCount()立即通知新訂閱者流中最新值所需的功能。


在該方法中,我們繼續(xù)使用字典查找來查找設備流,如果字典中尚不存在則GetDeviceStream創(chuàng)建預加載。BehaviorSubject我們已經刪除了Where上述問題中先前實現中存在的搜索。使用 where 搜索會導致線程競爭問題,該問題通過使分組流可重播得以解決。這導致了指數性能問題。替換它,將FirstOrDefault花費的時間減少一半,然后完全刪除它以支持GetCreate字典技術,從而獲得完美的性能 O(1) 而不是 O(n2)。


GetCreateSubject使用Lazy代理對象作為字典值,因為有時可以針對單個鍵多次ConcurrentDictionary調用該方法。向字典Create提供 a可確保僅在其中一個懶惰者上調用該屬性,因此每個設備只創(chuàng)建一個。LazyValueBehaviorSubject


class DeviceService : IDeviceService, IDisposable {


    readonly CompositeDisposable _disposable = new CompositeDisposable();

    readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();

    BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {

        return _streams.GetOrAdd(deviceId, Create).Value;

        Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {

            return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {

                var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));

                _disposable.Add(subject);

                return subject;

            });

        }

    }


    public DeviceService(IConnectableObservable<IDeviceView> source) {

        _disposable.Add(source

            .GroupBy(x => x.DeviceId)

            .Subscribe(deviceStream => {

                _disposable.Add(deviceStream

                    .Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)

                    .Subscribe(GetCreateSubject(deviceStream.Key)));

            }));

        _disposable.Add(source.Connect());

    }


    public void Dispose() {

        _disposable.Dispose();

    }


    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {

        return GetCreateSubject(deviceId).AsObservable();

    }

}


[TestMethod]

public async Task Test2() {

    var input = new AsyncProducerConsumerQueue<IDeviceView>();

    var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);

    var service = new DeviceService(source);


    var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();

    var idsRemaining = ids.ToHashSet();

    var t1 = Task.Run(async () => {

        foreach (var id in ids) {

            await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });

        }

    });

    var t2 = Task.Run(() => {

        foreach (var id in ids) {

            service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));

        }

    });

    await Task.WhenAll(t1, t2);

    var sw = Stopwatch.StartNew();

    while (idsRemaining.Count > 0) {

        if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");

        await Task.Delay(100);

    }

}

在這里查看整個問題源代碼和測試代碼: https: //github.com/bboyle1234/ReactiveTest


查看完整回答
反對 回復 2023-09-16
?
九州編程

TA貢獻1785條經驗 獲得超4個贊

你的要點中有一些奇怪的代碼。這是我的工作內容:


public class DeviceService : IDeviceService, IDisposable

{


    readonly IObservable<IDeviceView> Source;

    private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();

    private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;

    private readonly CompositeDisposable _disposable = new CompositeDisposable();


    public DeviceService(IObservable<IDeviceView> source)

    {

        Source = source;


        _groupedStream = source

            .GroupBy(v => v.DeviceId)

            .Select(o => (o.Key, o

                .Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))

                .Replay(1)

                .RefCount()

            ));


        var groupSubscription = _groupedStream.Subscribe(t =>

        {

            _updateStreams[t.Item1] = t.Item2;

            _disposable.Add(t.Item2.Subscribe());

        });

        _disposable.Add(groupSubscription);

    }


    public void Dispose()

    {

        _disposable.Dispose();

    }


    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)

    {

        /// How do we implement this? The observable that we return must be pre-loaded with the latest update

        if(this._updateStreams.ContainsKey(deviceId))

            return this._updateStreams[deviceId];

        return _groupedStream

            .Where(t => t.Item1 == deviceId)

            .Select(t => t.Item2)

            .Switch();



    }

}

這里的肉就是_groupedStream一塊。正如您所說,您按 DeviceId 進行分組,然后用于Scan更新狀態(tài)。我還轉移Update到靜態(tài)類并使其成為擴展方法。你需要一個初始狀態(tài),所以我修改了你的DeviceTotalView類來獲得它。相應修改:


public class DeviceTotalView : IDeviceView

{

    public Guid DeviceId { get; set; }

    public int Voltage { get; set; }

    public int Currents { get; set; }

    public object Clone() => this.MemberwiseClone();

    public static DeviceTotalView GetInitialView(Guid deviceId)

    {

        return new DeviceTotalView

        {

            DeviceId = deviceId,

            Voltage = 0,

            Currents = 0

        };

    }

}

接下來,它.Replay(1).Refcount()會記住最新的更新,然后在訂閱時提供該更新。然后,我們將所有這些子可觀察量填充到字典中,以便在方法調用時輕松檢索。虛擬訂閱 ( _disposable.Add(t.Item2.Subscribe())) 是Replay正常工作所必需的。


如果早期請求尚未更新的 DeviceId,我們會訂閱該設備,它將_groupedStream等待第一次更新,生成該 Id 的可觀察值,然后.Switch訂閱該子可觀察值。


然而,所有這些都在你的測試代碼中失敗了,我猜是因為這個ConnectableObservableForAsyncProducerConsumerQueue類。我不想調試它,因為我不建議這樣做。一般來說,不建議混合 TPL 和 Rx 代碼。他們解決的問題大部分是重疊的,而且互相妨礙。因此,我修改了您的測試代碼,用重播主題替換了可連接的可觀察隊列。


我還添加了早期請求的測試用例(在該設備的更新到達之前):


DeviceUpdateEvent deviceView1 = null;

DeviceUpdateEvent deviceView2 = null;

DeviceUpdateEvent deviceView3 = null;


var subject = new ReplaySubject<IDeviceView>();


var id1 = Guid.NewGuid();

var id2 = Guid.NewGuid();

var id3 = Guid.NewGuid();


subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });


var service = new DeviceService(subject);


service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);

service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);

service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);


/// I believe there is no need to pause here because the Subscribe method calls above 

/// block until the events have all been pushed into the subscribers above.


Assert.AreEqual(deviceView1.View.DeviceId, id1);

Assert.AreEqual(deviceView2.View.DeviceId, id2);

Assert.AreEqual(deviceView1.View.Voltage, 2);

Assert.AreEqual(deviceView2.View.Voltage, 100);

Assert.IsNull(deviceView3);


subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });

Assert.AreEqual(deviceView2.View.Voltage, 101);


subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });

Assert.AreEqual(deviceView3.View.DeviceId, id3);

Assert.AreEqual(deviceView3.View.Voltage, 101);

這一切都很好,并且可以在沒有異步的情況下運行。


另外,作為一般提示,我建議使用Microsoft.Reactive.Testing包對 Rx 代碼進行單元測試,而不是進行時間間隔測試。


查看完整回答
反對 回復 2023-09-16
  • 2 回答
  • 0 關注
  • 138 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號