RabbitMQ 中消息確認與返回機制概述
1. 前言
Hello,大家好。本小節(jié)會為大家介紹 RabbitMQ 中的消息確認機制與消息返回機制,這兩個機制是針對消息來說的不同場景而誕生的,其目的就是為了保證 RabbitMQ 中消息的可靠性投遞,以及消息在發(fā)生特定異常時的補償策略。
這兩種機制是 RabbitMQ 自帶的補償機制,在我們開發(fā)應(yīng)用程序時,如果遇到了對應(yīng)的業(yè)務(wù)場景,我們直接來用就可以了,就不需要去使用額外的工具來彌補了。
本節(jié)主要內(nèi)容:
-
消息確認機制概述;
-
消息返回機制概述;
2. 消息確認機制概述
基礎(chǔ)概念:
消息確認機制,是描述消息與 RabbitMQ Server 之間的關(guān)系的一種保障機制,其主要內(nèi)容就是用來監(jiān)聽,當我們應(yīng)用程序中的數(shù)據(jù),即消息,被發(fā)送到 RabbitMQ Server 中之后返回給生產(chǎn)端的一種消息監(jiān)聽機制。消息確認機制描述了一種消息是否已經(jīng)被發(fā)送到 RabbitMQ Server 中以及 RabbitMQ Server 與生產(chǎn)端之間的關(guān)系。
從上述消息確認機制的基本概念可以得出,消息確認機制的作用就是:監(jiān)聽生產(chǎn)端的消息是否已經(jīng)發(fā)送到了 RabbitMQ Server 中,如果消息沒有被發(fā)送到 RabbitMQ Server 中,則消息確認機制不會給生產(chǎn)端返回任何確認應(yīng)答,相反,如果消息被成功發(fā)送到了 RabbitMQ Server 中,則消息確認機制會給生產(chǎn)端返回一個確認應(yīng)答,以通知生產(chǎn)端,消息已經(jīng)發(fā)送到了 RabbitMQ Server 中,概念圖如下所示:

根據(jù)上圖,消息在被成功發(fā)送到 RabbitMQ Server 中之后,RabbitMQ Server 就會給生產(chǎn)端返回一個確認應(yīng)答,這個確認應(yīng)答會包含兩種結(jié)果,一種就是消息發(fā)送到了 RabbitMQ Server ,RabbitMQ Server 收到了該消息,這時會給生產(chǎn)者返回 ack 的確認應(yīng)答, 表示消息已經(jīng)被接收。
另一種就是消息沒有發(fā)送到 RabbitMQ Server ,RabbitMQ Server 沒有收到該消息,這時會給生產(chǎn)者返回一個 nack 的確認應(yīng)答,即 no ack , 表示沒有接收到該消息。
我們在了解了消息確認機制的基礎(chǔ)概念和作用之后,我們還需要了解在 RabbitMQ 中,如何通過代碼來實現(xiàn) RabbitMQ 的消息確認機制。
代碼實現(xiàn):
實現(xiàn)消息確認機制,只需要在生產(chǎn)端進行配置即可,代碼如下:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
channel.confirmSelect();
channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
channel.addConfirmListener(new ConfirmListener(){
@Override
public void handleAck(long l, boolean b) throws IOException{
// do something...
}
@Override
public void handleNack(long l, boolean b) throws IOException{
// do something...
}
});
代碼解釋:
第 1-5 行,我們使用 ConnectionFactory 創(chuàng)建了一個客戶端連接 RabbitMQ Server 的連接。
第 6 行,我們使用建立好的連接,來創(chuàng)建了一個頻道 channel 。
第 7 行,我們使用 channel 的 basicPublish 方法來將我們的消息發(fā)送到 RabbitMQ Server 中。
第 8 行,我們?yōu)?channel 綁定了一個消息確認機制的監(jiān)聽器,即 addConfirmListener ,且我們通過 new ConfirmListener 匿名內(nèi)部類的方式,來重寫了消息確認監(jiān)聽器中的 handleAck 方法和 handleNack 方法,其中,handleAck 方法表示消息已經(jīng)被 RabbitMQ Server 接收可以返回 ack 的確認應(yīng)答,handleNack 方法則表示方法沒有被 RabbitMQ Server 接收可以返回 nack 的確認應(yīng)答。
Tips: 1. 配置消息確認機制我們需要先配置 confirmSelect 方法來聲明消息確認機制,接著我們需要為 channel 添加 addConfirmListener 消息確認監(jiān)聽器,并重寫其中的 handleAck 和 handleNack 方法,最后需要根據(jù) RabbitMQ Server 返回的確認應(yīng)答在上述兩個方法中完成我們需要處理的業(yè)務(wù)邏輯;
2. 如果需要啟用消息確認機制,那么我們就不能自主的去關(guān)閉頻道 channel 和 連接 connection,因為消息確認機制的返回結(jié)果是異步返回的,如果我們在向 RabbitMQ Server 發(fā)送了消息之后,就關(guān)閉了對應(yīng)的 channel 和 connection ,那么我們就收不到任何消息確認的結(jié)果了。
3. 消息返回機制概述
基礎(chǔ)概念:
消息返回機制,是描述不可達的消息與生產(chǎn)者之間的一種保障策略,其主要內(nèi)容就是用來監(jiān)聽,RabbitMQ Server 中是否存在不可達的消息,并根據(jù)監(jiān)聽結(jié)果返回給生產(chǎn)端的一種監(jiān)聽機制,即消息返回機制描述了一種 RabbitMQ Server 中的不可達消息與生產(chǎn)端的關(guān)系。
從上述消息返回機制的基本概念可以得出,消息返回機制的作用就是:監(jiān)聽生產(chǎn)端發(fā)動到 RabbitMQ Server 中的消息是否可達,如果消息不可達,則返回一個信號通知生產(chǎn)端,相反,如果消息可達,則不會通知生產(chǎn)端任何信號,概念圖如下所示:

根據(jù)上圖,消息在被成功發(fā)送到 RabbitMQ Server 中之后,如果消息在經(jīng)過當前配置的 exchangeName 或 routingKey 沒有找到指定的交換機,或沒有匹配到對應(yīng)的消息隊列,那么這個消息就被稱為不可達的消息,如果此時配置了消息返回機制,那么此時 RabbitMQ Server 會返回給生產(chǎn)端一個信號,信號中包括消息不可達的原因,以及消息本身的內(nèi)容。
我們在了解了消息返回機制的基礎(chǔ)概念和作用之后,我們還需要了解在 RabbitMQ 中,如何通過代碼來實現(xiàn) RabbitMQ 的消息返回機制。
代碼實現(xiàn):
實現(xiàn)消息返回機制,也是只需要在生產(chǎn)端進行配置即可,代碼如下:
// 省略客戶端連接 RabbitMQ Server 的過程
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChanel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// do something...
);
}
});
channel.basicPublish(exchangeName, routingKey + "01", true, null, msg.getBytes());
代碼解釋:
第 1-2 行,我們創(chuàng)建了客戶端連接 RabbitMQ Server 的連接,并且創(chuàng)建了一個 channel 。
第 3 行,我們?yōu)?channel 添加了 addReturnListener 消息返回的監(jiān)聽器,并且通過 new ReturnListener 匿名內(nèi)部類的方式來重寫 handleReturn 方法。
第 12 行,我們通過 channel 的 basicPublish 方法,將我們的消息發(fā)送出去,其中,在 basicPublish 方法中,我們只需要了解前三個參數(shù)即可:第一個參數(shù)表示交換機的名稱,第二個參數(shù)表示路由 Key 的名稱,第三個參數(shù)是 mandatory 屬性,表示是否開啟消息返回機制,如果這個屬性被置為 false ,則消息返回監(jiān)聽器就不會生效。
Tips: 1. 要使用消息返回機制,就一定要配置 basicPublish 方法中的第三個參數(shù)的值為 true ,否則,即使添加了 addReturnListener 監(jiān)聽器,不可達的消息也不會被監(jiān)聽到;
2. 如果我們沒有配置第三個參數(shù)的屬性為 true ,那么,當 RabbitMQ Server 中存在不可達消息時,RabbitMQ 就會自動將該消息刪除。
4. 小結(jié)

本小節(jié)為各位同學介紹了 RabbitMQ 中的消息確認機制,以及消息返回機制。從消息確認機制和消息返回機制的基礎(chǔ)概念開始,到不同機制的代碼實現(xiàn)結(jié)束,詳細介紹了什么是消息確認機制,以及什么是消息返回機制,且通過不同機制的代碼實現(xiàn),分別闡述了如何通過代碼來對兩種機制進行配置。希望同學們可以完全理解兩種機制的基礎(chǔ)概念和實現(xiàn)方式。
消息確認機制和消息返回機會的基本內(nèi)容就全部介紹完畢了,但是有一種現(xiàn)象同學們可以課下嘗試一下,那就是當兩種機制都進行配置之后,如果我們的消息不可達了,那么消息確認的監(jiān)聽器還會監(jiān)聽到嗎?