2 回答

TA貢獻1795條經(jīng)驗 獲得超7個贊
由于MessageReceiver.CloseAsync()提到如下:
關(guān)閉客戶端。關(guān)閉由它打開的連接。
按我的測試,稱為后MessageReceiver.CloseAsync()
,后續(xù)調(diào)用CompleteAsync
,DeadLetterAsync
將自實例失敗IMessageReceiver
已被釋放。如果你仍然想完成你的隊列消息,你需要創(chuàng)建一個新的MessageReceiver
.
有什么方法可以通過庫本身檢查操作是否仍在運行以延遲任務取消?
AFAIK,SDK 目前不提供上述功能。此外,這里還有一個類似的關(guān)于正常關(guān)閉 Azure 服務總線的消息泵的反饋。
當服務停止時,我想在進程被終止之前給所有操作完成的機會。
對于您的要求,我假設您需要自己實現(xiàn)它以確保即使在 MessageReceiver 關(guān)閉后也可以成功處理接收到的隊列消息?;蛘吣梢詫?code>CancellationToken參數(shù)傳遞到您的HandleMessage
方法中以顯式取消而不是完成檢索到的消息。

TA貢獻1786條經(jīng)驗 獲得超13個贊
另一個答案表明,不幸的是,現(xiàn)在無法實現(xiàn)該功能,即使該功能在庫本身中不可用,也很受歡迎。一種替代方法是創(chuàng)建您自己的接收消息泵,但隨后您就可以自己進行斷開連接、管理等操作,盡管正常關(guān)閉本身并不難。使用當前的方法,我設法編寫了一種解決方法,即使它很笨拙,它似乎也能正常工作。
private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
int activeMessageHandlersCount = 0;
var doneReceiving = new TaskCompletionSource<bool>();
cancellationToken.Register(() =>
{
lock (receiver)
{
int attemptCount = 0;
while (attemptCount++ < 10 && activeMessageHandlersCount > 0)
{
Thread.Sleep(1000);
}
receiver.CloseAsync();
}
doneReceiving.SetResult(true);
});
receiver.RegisterMessageHandler(
async (message, ct) =>
{
bool canBeProcessed;
lock (receiver)
{
canBeProcessed = !cancellationToken.IsCancellationRequested;
if (canBeProcessed)
{
Interlocked.Increment(ref activeMessageHandlersCount);
}
}
if (canBeProcessed)
{
try
{
await HandleMessage(receiver, message);
}
finally
{
Interlocked.Decrement(ref activeMessageHandlersCount);
}
}
else
{
await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown
}
}, new MessageHandlerOptions(HandleException));
await doneReceiving.Task;
}
另一個缺點是 Receiver 將接收大量消息而不進行處理,并將它們保留到鎖定到期。
- 2 回答
- 0 關(guān)注
- 219 瀏覽
添加回答
舉報