第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

合并兩個可觀察量時保留排序

合并兩個可觀察量時保留排序

C#
森林海 2023-07-22 16:19:15
我想合并 2 個可觀察值并保持順序(可能基于選擇器)。我還想對可觀察的來源施加反壓力。因此,選擇器會選擇其中一個項目通過可觀察對象進行推送,而另一個項目也會等待另一個項目進行比較。Src1、Src2 和 Result 都是 類型IObservable<T>。Src1: { 1,3,6,8,9,10 }Src2: { 2,4,5,7,11,12 }Result: 1,2,3,4,5,6,7,8,9,10,11,12Timeline:Src1:    -1---3----6------8----9-10Src2:    --2-----4---5-7----11---------12Result:  --1--2--3-4-5-6--7-8--9-10-11-12在上面的示例中,src1 發(fā)出“1”并被阻塞,直到 src2 發(fā)出它的第一項“2”。應用一個選擇器來選擇最小的項目,該選擇器從 src1 中選擇項目。Src2 現在等待下一個項目(來自 src1)與其當前項目(“2”)進行比較。當 src1 發(fā)出下一個項目“3”時,再次運行選擇,這次從 src2 中選擇該項目。重復此過程,直到其中一個可觀察量完成。然后,剩余的 observable 會推送項目直到完成。使用現有的 .net Rx 方法可以實現這一點嗎?編輯:請注意,保證 2 個源可觀察量是有序的。測試示例:var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();var src1 = source1.ToObservable();var src2 = source2.ToObservable();var res = src1.SortedMerge(src2, (a, b) =>    {       if (a <= b)           return a;       else           return b;    });res.Subscribe((x) => Console.Write($"{x}, "));期望結果:1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
查看完整描述

1 回答

?
躍然一笑

TA貢獻1826條經驗 獲得超6個贊

這很有趣。必須稍微調整一下算法。還可以進一步改進。

假設:

  1. 有兩個通用類型 的streamA流。streamBT

  2. 兩個流分別排序,使得streamA[i] < streamA[i+1]streamB[i] < stream[i+1]

  3. 您不能假設streamA[i]和之間有任何關系streamB[i]。

  4. 流 A 和 B 是謹慎的:相同的元素不會從兩者中發(fā)出。如果發(fā)生這種情況,我會扔掉NotImplementedException。這個案子很容易處理,但我想避免歧義。

  5. 有一個min類型的函數T。

  6. 沒有對兩條流的相對速度做出任何假設,但如果其中一條始終比另一條快,則背壓將成為問題。

這是我使用的算法:

  • 設兩個隊列,qA并且qB。

  • 當您從 獲取一個項目時streamA,將其排隊到qA。

  • 當您從 獲取一個項目時streamB,將其排隊到qB

  • 當和 qA中都有一個項目時qB,比較兩個隊列的頂部項目。刪除并發(fā)出這兩者的最小值。如果兩個隊列仍然非空,則重復。

  • 如果 或streamA完成streamB,則轉儲隊列的內容并終止。注意這無疑是懶惰的,可能應該更改為轉儲,然后繼續(xù)返回未完成的 observable。

這是代碼:

public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)

{

    return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b}));

}


public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)

{

    return source

        .Select(i => (key: 1, value: i)).Materialize()

        .Merge(other.Select(i => (key: 2, value: i)).Materialize())

        .Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()), 

            (state, message) =>

        {

            if (message.Kind == NotificationKind.OnNext)

            {

                var key = message.Value.key;

                var value = message.Value.value;

                var qA = state.qA;

                var qB = state.qB;

                if (key == 1)

                    qA = qA.Enqueue(value);

                else

                    qB = qB.Enqueue(value);

                var output = new List<T>();

                while(!qA.IsEmpty && !qB.IsEmpty)

                {

                    var aVal = qA.Peek();

                    var bVal = qB.Peek();

                    var minVal = min(aVal, bVal);

                    if(aVal.Equals(minVal) && bVal.Equals(minVal))

                        throw new NotImplementedException();


                    if(aVal.Equals(minVal))

                    {

                        output.Add(aVal);

                        qA = qA.Dequeue();

                    }

                    else

                    {

                        output.Add(bVal);

                        qB = qB.Dequeue();

                    }

                }

                return (qA, qB, null, output);

            }

            else if (message.Kind == NotificationKind.OnError)

            {

                return (state.qA, state.qB, message.Exception, new List<T>());

            }

            else //message.Kind == NotificationKind.OnCompleted

            {

                var output = state.qA.Concat(state.qB).ToList();

                return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);

            }

        })

        .Publish(tuples => Observable.Merge(

            tuples

                .Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))

                .SelectMany(t => t.outputMessages

                    .Select(v => Notification.CreateOnNext<T>(v))

                    .ToObservable()

            ),

            tuples

                .Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)

                .SelectMany(t => t.outputMessages

                    .Select(v => Notification.CreateOnNext<T>(v))

                    .ToObservable()

                    .Concat(Observable.Return(Notification.CreateOnCompleted<T>()))

            ),

            tuples

                .Where(t => t.exception != null)

                .Select(t => Notification.CreateOnError<T>(t.exception))

        ))

        .Dematerialize();

ImmutableQueue來自System.Collections.Immutable. Scan需要跟蹤狀態(tài)。由于OnCompleted處理需要具體化。誠然,這是一個復雜的解決方案,但我不確定是否有更干凈的以 Rx 為中心的方法。


如果您需要進一步說明,請告訴我。


查看完整回答
反對 回復 2023-07-22
  • 1 回答
  • 0 關注
  • 130 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號