第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何確保在關(guān)閉 MessageReciever 期間完成 MessageHandler 委托?

如何確保在關(guān)閉 MessageReciever 期間完成 MessageHandler 委托?

C#
繁星點點滴滴 2021-08-22 17:29:03
我有一個服務,它使用MessageReceiverfromMicrosoft.Azure.ServiceBus連續(xù)收聽 ServiceBus 中的訂閱。當服務停止時,我想在進程被終止之前給所有操作完成的機會。這是我根據(jù)庫提供的示例使用的代碼:private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken){    var doneReceiving = new TaskCompletionSource<bool>();    cancellationToken.Register(() =>    {        receiver.CloseAsync();        doneReceiving.SetResult(true);    });    receiver.RegisterMessageHandler(        async (message, ct) => await HandleMessage(receiver, message),        new MessageHandlerOptions(HandleException));    await doneReceiving.Task;}在服務停止時,我取消了任務,即使服務HandleMessage仍在運行,它也會立即被終止。有什么方法可以通過庫本身檢查操作是否仍在運行以延遲任務取消?我可以想到一種通過鎖定所有正在運行的任務來進行自己計數(shù)的方法,但我希望有一種更好的方法可以讓我知道正在運行的處理程序的數(shù)量。理想情況下,我想取消注冊處理程序,以便消息泵停止,而接收器本身不會關(guān)閉以允許例如 CompleteAsync 調(diào)用。
查看完整描述

2 回答

?
一只萌萌小番薯

TA貢獻1795條經(jīng)驗 獲得超7個贊

由于MessageReceiver.CloseAsync()提到如下:

關(guān)閉客戶端。關(guān)閉由它打開的連接。

按我的測試,稱為后MessageReceiver.CloseAsync(),后續(xù)調(diào)用CompleteAsync,DeadLetterAsync將自實例失敗IMessageReceiver已被釋放。如果你仍然想完成你的隊列消息,你需要創(chuàng)建一個新的MessageReceiver.

有什么方法可以通過庫本身檢查操作是否仍在運行以延遲任務取消?

AFAIK,SDK 目前不提供上述功能。此外,這里還有一個類似的關(guān)于正常關(guān)閉 Azure 服務總線的消息泵的反饋

當服務停止時,我想在進程被終止之前給所有操作完成的機會。

對于您的要求,我假設您需要自己實現(xiàn)它以確保即使在 MessageReceiver 關(guān)閉后也可以成功處理接收到的隊列消息?;蛘吣梢詫?code>CancellationToken參數(shù)傳遞到您的HandleMessage方法中以顯式取消而不是完成檢索到的消息。


查看完整回答
反對 回復 2021-08-22
?
開滿天機

TA貢獻1786條經(jīng)驗 獲得超13個贊

另一個答案表明,不幸的是,現(xiàn)在無法實現(xiàn)該功能,即使該功能在庫本身中不可用,也很受歡迎。一種替代方法是創(chuàng)建您自己的接收消息泵,但隨后您就可以自己進行斷開連接、管理等操作,盡管正常關(guān)閉本身并不難。使用當前的方法,我設法編寫了一種解決方法,即使它很笨拙,它似乎也能正常工作。


private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)

{

    int activeMessageHandlersCount = 0;

    var doneReceiving = new TaskCompletionSource<bool>();


    cancellationToken.Register(() =>

    {

        lock (receiver)

        {

            int attemptCount = 0;

            while (attemptCount++ < 10 && activeMessageHandlersCount > 0)

            {

                Thread.Sleep(1000);

            }


            receiver.CloseAsync();

        }


        doneReceiving.SetResult(true);

    });


    receiver.RegisterMessageHandler(

        async (message, ct) =>

        {

            bool canBeProcessed;

            lock (receiver)

            {

                canBeProcessed = !cancellationToken.IsCancellationRequested;

                if (canBeProcessed)

                {

                    Interlocked.Increment(ref activeMessageHandlersCount);

                }

            }


            if (canBeProcessed)

            {

                try

                {

                    await HandleMessage(receiver, message);

                }

                finally

                {

                    Interlocked.Decrement(ref activeMessageHandlersCount);

                }

            }

            else

            {

                await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown

            }

        }, new MessageHandlerOptions(HandleException));


    await doneReceiving.Task;

}

另一個缺點是 Receiver 將接收大量消息而不進行處理,并將它們保留到鎖定到期。


查看完整回答
反對 回復 2021-08-22
  • 2 回答
  • 0 關(guān)注
  • 219 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號