1. 前言
RabbitMQ經(jīng)常被用于服務(wù)模塊之間的解耦以及高并發(fā)削峰場景,之前的章節(jié)討論了不同服務(wù)模式的特點(diǎn),但是在生產(chǎn)環(huán)境中,因?yàn)闄C(jī)器以及網(wǎng)絡(luò)設(shè)備的不可靠,保證消息的可靠是待解決的問題。在特定場景下消息可能存在丟失風(fēng)險(xiǎn),本文將介紹如何預(yù)防這類的風(fēng)險(xiǎn)。
2. RabbitMQ消息丟失的場景
面試官提問:RabbitMQ 消息隊(duì)列,在哪些場景下可能會丟失消息?
題目解析:
我們可以將 RabbitMQ 消息處理的過程分為三個(gè)步驟:
(1)生產(chǎn)階段:生產(chǎn)者生產(chǎn)消息并且發(fā)送到消息隊(duì)列;
(2)儲存階段:消息隊(duì)列存儲和處理消息;
(3)消費(fèi)階段:消息隊(duì)列將消息轉(zhuǎn)發(fā)到消費(fèi)者。
上述每個(gè)步驟都有消息丟失的風(fēng)險(xiǎn),候選人需要按順序分別解釋不同場景可能丟失的原因以及解決方案。
2.1 生產(chǎn)者生產(chǎn)消息并且發(fā)送到消息隊(duì)列
可能發(fā)生消息丟失的場景:網(wǎng)絡(luò)故障。網(wǎng)絡(luò)環(huán)境的不可靠導(dǎo)致消息發(fā)送失敗,例如網(wǎng)絡(luò)丟包、網(wǎng)絡(luò)故障。數(shù)據(jù)在網(wǎng)絡(luò)中傳輸會經(jīng)過諸多網(wǎng)絡(luò)設(shè)備,只要其中一個(gè)網(wǎng)絡(luò)鏈接在數(shù)據(jù)抵達(dá)前已經(jīng)流量滿載,新到的數(shù)據(jù)將會阻塞一段時(shí)間段。另外比較少見的例子是施工挖斷光纖或者其他原因?qū)е掠布用娴拈L時(shí)間不可用。
參考解決方案是使用AMQP協(xié)議的事務(wù)機(jī)制。生產(chǎn)者在發(fā)出消息之后,消息是否到達(dá)RabbitMQ服務(wù)器是默認(rèn)不可知的,所以在生產(chǎn)者發(fā)送消息之前,調(diào)用channel.txSelect
語句開啟事務(wù),如果消息發(fā)送失敗,那么調(diào)用channel.txRollback
回滾事務(wù),重新發(fā)送一條消息;如果消息發(fā)送成功,那么調(diào)用channel.txCommit
提交事務(wù)。
采用事務(wù)的缺點(diǎn)是增加耗時(shí),會降低RabbitMQ的吞吐性能。
所以RabbitMQ還有一種性能改進(jìn)方案,即Confirm機(jī)制,步驟如下:
(1)生產(chǎn)者調(diào)用channel.confirmSelect
將通信方式設(shè)置為confirm
模式;
(2)生產(chǎn)者發(fā)送的所有消息都會被分配一個(gè)唯一 ID;
(3)當(dāng)生產(chǎn)者發(fā)送的消息成功投遞到隊(duì)列之后,RabbitMQ會發(fā)送一個(gè)確認(rèn)給生產(chǎn)者,生產(chǎn)者即得知這條消息已經(jīng)成功發(fā)送。
2.2 消息隊(duì)列存儲和處理消息
可能發(fā)生消息丟失的場景:服務(wù)器宕機(jī)。消息存儲在 RabbitMQ 隊(duì)列中,如果隊(duì)列沒有持久化,RabbitMQ 服務(wù)器重啟會導(dǎo)致消息丟失。
參考解決方案是對消息隊(duì)列持久化,分為三個(gè)步驟:
(1)Exchange 持久化:以 Direct 模式為例,將 durable 參數(shù)設(shè)置為 true。示例:
@Bean
DirectExchange testExchange() {
return new DirectExchange(Constants.EXCHANGE_NAME, true, false);
}
(2)Queue 持久化:將 durable 參數(shù)設(shè)置為 true,但是這樣只能保證持久化 Queue 的元數(shù)據(jù),但是不會持久化 Queue 里存儲的消息。示例:
@Bean
public Queue testQueue() {
return new Queue(Constants.QUEUE_NAME);
}
(3)消息持久化:發(fā)送消息的時(shí)候?qū)eliveryMode設(shè)置為2,SpringBoot中的rabbitTemplate默認(rèn)設(shè)置消息是持久化,所以我們不需要手動(dòng)配置,具體原因可參考源碼,示例:
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
private MessageDeliveryMode() {
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
//非持久化模式
case NON_PERSISTENT:
return 1;
//持久化模式
case PERSISTENT:
return 2;
default:
return -1;
}
}
2.3 消息隊(duì)列將消息轉(zhuǎn)發(fā)到消費(fèi)者
可能發(fā)生消息丟失的場景:消費(fèi)者在收到消息之后,還沒來得及處理消息的消費(fèi)邏輯,所在機(jī)器就宕機(jī)了,導(dǎo)致內(nèi)存中的消息丟失。
參考解決方案是在消費(fèi)端開啟手動(dòng) ACK 模式。RabbitMQ 默認(rèn)采用自動(dòng) ACK 機(jī)制,在沒有處理業(yè)務(wù)邏輯之前,消費(fèi)者就會告知消息隊(duì)列已經(jīng)成功收到消息,這種方式并不符合我們的預(yù)期。
以 SpringBoot 的配置方式為例,有兩種配置手動(dòng) ACK 的方式:
(1)yml文件修改全局確認(rèn)模式,示例:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2)在自動(dòng)注入 RabbitListenerContainerFactory 時(shí)開啟手動(dòng)ACK,示例:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
//1. 創(chuàng)建工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//2. 設(shè)置手動(dòng)ACK模式,即AcknowledgeMode.MANUAL
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
3. 小結(jié)
本章節(jié)介紹了 RabbitMQ 作為消息隊(duì)列,容易產(chǎn)生消息丟失的三種場景,以及針對每種場景的關(guān)鍵解決方案,從性質(zhì)上可以分為持久化和消息確認(rèn)機(jī)制。拋開題目本身來說,建議候選人通過本地環(huán)境實(shí)戰(zhàn)來體驗(yàn)每種解決方案的具體編碼,加強(qiáng)對方案的理解。