2 回答

TA貢獻1943條經驗 獲得超7個贊
接受的答案中給出的實現雖然對我來說是一次神奇的教育,但也有一些問題需要依次解決。第一個是線程競爭問題,第二個是系統中存在大量設備時的性能問題。我最終解決了線程競爭并通過這個修改后的實現顯著提高了性能:
在構造函數中,分組和掃描的設備流直接訂閱到 a BehaviorSubject,它實現了Replay(1).RefCount()立即通知新訂閱者流中最新值所需的功能。
在該方法中,我們繼續(xù)使用字典查找來查找設備流,如果字典中尚不存在則GetDeviceStream創(chuàng)建預加載。BehaviorSubject我們已經刪除了Where上述問題中先前實現中存在的搜索。使用 where 搜索會導致線程競爭問題,該問題通過使分組流可重播得以解決。這導致了指數性能問題。替換它,將FirstOrDefault花費的時間減少一半,然后完全刪除它以支持GetCreate字典技術,從而獲得完美的性能 O(1) 而不是 O(n2)。
GetCreateSubject使用Lazy代理對象作為字典值,因為有時可以針對單個鍵多次ConcurrentDictionary調用該方法。向字典Create提供 a可確保僅在其中一個懶惰者上調用該屬性,因此每個設備只創(chuàng)建一個。LazyValueBehaviorSubject
class DeviceService : IDeviceService, IDisposable {
readonly CompositeDisposable _disposable = new CompositeDisposable();
readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();
BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {
return _streams.GetOrAdd(deviceId, Create).Value;
Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {
return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {
var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));
_disposable.Add(subject);
return subject;
});
}
}
public DeviceService(IConnectableObservable<IDeviceView> source) {
_disposable.Add(source
.GroupBy(x => x.DeviceId)
.Subscribe(deviceStream => {
_disposable.Add(deviceStream
.Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)
.Subscribe(GetCreateSubject(deviceStream.Key)));
}));
_disposable.Add(source.Connect());
}
public void Dispose() {
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
return GetCreateSubject(deviceId).AsObservable();
}
}
[TestMethod]
public async Task Test2() {
var input = new AsyncProducerConsumerQueue<IDeviceView>();
var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
var service = new DeviceService(source);
var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();
var idsRemaining = ids.ToHashSet();
var t1 = Task.Run(async () => {
foreach (var id in ids) {
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });
}
});
var t2 = Task.Run(() => {
foreach (var id in ids) {
service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));
}
});
await Task.WhenAll(t1, t2);
var sw = Stopwatch.StartNew();
while (idsRemaining.Count > 0) {
if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");
await Task.Delay(100);
}
}
在這里查看整個問題源代碼和測試代碼: https: //github.com/bboyle1234/ReactiveTest

TA貢獻1785條經驗 獲得超4個贊
你的要點中有一些奇怪的代碼。這是我的工作內容:
public class DeviceService : IDeviceService, IDisposable
{
readonly IObservable<IDeviceView> Source;
private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();
private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
public DeviceService(IObservable<IDeviceView> source)
{
Source = source;
_groupedStream = source
.GroupBy(v => v.DeviceId)
.Select(o => (o.Key, o
.Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))
.Replay(1)
.RefCount()
));
var groupSubscription = _groupedStream.Subscribe(t =>
{
_updateStreams[t.Item1] = t.Item2;
_disposable.Add(t.Item2.Subscribe());
});
_disposable.Add(groupSubscription);
}
public void Dispose()
{
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)
{
/// How do we implement this? The observable that we return must be pre-loaded with the latest update
if(this._updateStreams.ContainsKey(deviceId))
return this._updateStreams[deviceId];
return _groupedStream
.Where(t => t.Item1 == deviceId)
.Select(t => t.Item2)
.Switch();
}
}
這里的肉就是_groupedStream一塊。正如您所說,您按 DeviceId 進行分組,然后用于Scan更新狀態(tài)。我還轉移Update到靜態(tài)類并使其成為擴展方法。你需要一個初始狀態(tài),所以我修改了你的DeviceTotalView類來獲得它。相應修改:
public class DeviceTotalView : IDeviceView
{
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
public static DeviceTotalView GetInitialView(Guid deviceId)
{
return new DeviceTotalView
{
DeviceId = deviceId,
Voltage = 0,
Currents = 0
};
}
}
接下來,它.Replay(1).Refcount()會記住最新的更新,然后在訂閱時提供該更新。然后,我們將所有這些子可觀察量填充到字典中,以便在方法調用時輕松檢索。虛擬訂閱 ( _disposable.Add(t.Item2.Subscribe())) 是Replay正常工作所必需的。
如果早期請求尚未更新的 DeviceId,我們會訂閱該設備,它將_groupedStream等待第一次更新,生成該 Id 的可觀察值,然后.Switch訂閱該子可觀察值。
然而,所有這些都在你的測試代碼中失敗了,我猜是因為這個ConnectableObservableForAsyncProducerConsumerQueue類。我不想調試它,因為我不建議這樣做。一般來說,不建議混合 TPL 和 Rx 代碼。他們解決的問題大部分是重疊的,而且互相妨礙。因此,我修改了您的測試代碼,用重播主題替換了可連接的可觀察隊列。
我還添加了早期請求的測試用例(在該設備的更新到達之前):
DeviceUpdateEvent deviceView1 = null;
DeviceUpdateEvent deviceView2 = null;
DeviceUpdateEvent deviceView3 = null;
var subject = new ReplaySubject<IDeviceView>();
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });
var service = new DeviceService(subject);
service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);
/// I believe there is no need to pause here because the Subscribe method calls above
/// block until the events have all been pushed into the subscribers above.
Assert.AreEqual(deviceView1.View.DeviceId, id1);
Assert.AreEqual(deviceView2.View.DeviceId, id2);
Assert.AreEqual(deviceView1.View.Voltage, 2);
Assert.AreEqual(deviceView2.View.Voltage, 100);
Assert.IsNull(deviceView3);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
Assert.AreEqual(deviceView2.View.Voltage, 101);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });
Assert.AreEqual(deviceView3.View.DeviceId, id3);
Assert.AreEqual(deviceView3.View.Voltage, 101);
這一切都很好,并且可以在沒有異步的情況下運行。
另外,作為一般提示,我建議使用Microsoft.Reactive.Testing包對 Rx 代碼進行單元測試,而不是進行時間間隔測試。
- 2 回答
- 0 關注
- 138 瀏覽
添加回答
舉報