1 回答

TA貢獻1826條經驗 獲得超6個贊
這很有趣。必須稍微調整一下算法。還可以進一步改進。
假設:
有兩個通用類型 的
streamA
流。streamB
T
兩個流分別排序,使得
streamA[i] < streamA[i+1]
和streamB[i] < stream[i+1]
。您不能假設
streamA[i]
和之間有任何關系streamB[i]
。流 A 和 B 是謹慎的:相同的元素不會從兩者中發(fā)出。如果發(fā)生這種情況,我會扔掉
NotImplementedException
。這個案子很容易處理,但我想避免歧義。有一個
min
類型的函數T
。沒有對兩條流的相對速度做出任何假設,但如果其中一條始終比另一條快,則背壓將成為問題。
這是我使用的算法:
設兩個隊列,
qA
并且qB
。當您從 獲取一個項目時
streamA
,將其排隊到qA
。當您從 獲取一個項目時
streamB
,將其排隊到qB
。當和
qA
中都有一個項目時qB
,比較兩個隊列的頂部項目。刪除并發(fā)出這兩者的最小值。如果兩個隊列仍然非空,則重復。如果 或
streamA
完成streamB
,則轉儲隊列的內容并終止。注意:這無疑是懶惰的,可能應該更改為轉儲,然后繼續(xù)返回未完成的 observable。
這是代碼:
public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)
{
return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b}));
}
public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)
{
return source
.Select(i => (key: 1, value: i)).Materialize()
.Merge(other.Select(i => (key: 2, value: i)).Materialize())
.Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()),
(state, message) =>
{
if (message.Kind == NotificationKind.OnNext)
{
var key = message.Value.key;
var value = message.Value.value;
var qA = state.qA;
var qB = state.qB;
if (key == 1)
qA = qA.Enqueue(value);
else
qB = qB.Enqueue(value);
var output = new List<T>();
while(!qA.IsEmpty && !qB.IsEmpty)
{
var aVal = qA.Peek();
var bVal = qB.Peek();
var minVal = min(aVal, bVal);
if(aVal.Equals(minVal) && bVal.Equals(minVal))
throw new NotImplementedException();
if(aVal.Equals(minVal))
{
output.Add(aVal);
qA = qA.Dequeue();
}
else
{
output.Add(bVal);
qB = qB.Dequeue();
}
}
return (qA, qB, null, output);
}
else if (message.Kind == NotificationKind.OnError)
{
return (state.qA, state.qB, message.Exception, new List<T>());
}
else //message.Kind == NotificationKind.OnCompleted
{
var output = state.qA.Concat(state.qB).ToList();
return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);
}
})
.Publish(tuples => Observable.Merge(
tuples
.Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))
.SelectMany(t => t.outputMessages
.Select(v => Notification.CreateOnNext<T>(v))
.ToObservable()
),
tuples
.Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)
.SelectMany(t => t.outputMessages
.Select(v => Notification.CreateOnNext<T>(v))
.ToObservable()
.Concat(Observable.Return(Notification.CreateOnCompleted<T>()))
),
tuples
.Where(t => t.exception != null)
.Select(t => Notification.CreateOnError<T>(t.exception))
))
.Dematerialize();
ImmutableQueue來自System.Collections.Immutable. Scan需要跟蹤狀態(tài)。由于OnCompleted處理需要具體化。誠然,這是一個復雜的解決方案,但我不確定是否有更干凈的以 Rx 為中心的方法。
如果您需要進一步說明,請告訴我。
- 1 回答
- 0 關注
- 130 瀏覽
添加回答
舉報