RabbitMQ本身不支持延遲隊(duì)列!
RabbitMQ本身沒有延遲隊(duì)列的支持,但是基于其本身的一些特性,可以做到類似延遲隊(duì)列的效果:基于死信交換器+TTL。
以下介紹下相關(guān)概念及方法
Dead Letter Exchanges
消息在隊(duì)列滿足達(dá)到一定的條件,會(huì)被認(rèn)為是死信消息(dead-lettered),這時(shí)候,RabbitMQ會(huì)重新把這類消息發(fā)到另外一個(gè)的exchange,這個(gè)exchange稱為Dead Letter Exchanges.
以下任一條件滿足,即可認(rèn)為是死信:
消息被拒絕消費(fèi)(basic.reject or basic.nack)并且設(shè)置了requeue=fasle 消息的TTL到了(消息過期) 達(dá)到了隊(duì)列的長(zhǎng)度限制
需要注意的是,Dead letter exchanges (DLXs) 其實(shí)就是普通的exchange,可以和正常的exchange一樣的聲明或者使用。
死信消息路由
隊(duì)列中可以設(shè)置兩個(gè)屬性:
x-dead-letter-exchange x-dead-letter-routing-key
當(dāng)這個(gè)隊(duì)列里面的消息成為死信之后,就會(huì)投遞到x-dead-letter-exchange指定的exchange中,其中帶著的routing key就是中指定的值x-dead-letter-routing-key。
而如果使用默認(rèn)的exchange(routing key就是希望指定的隊(duì)列),則只需要把x-dead-letter-exchange設(shè)置為空(不能不設(shè)置),類似下面

死信消息的路由則會(huì)根據(jù)x-dead-letter-routing-key所指定的進(jìn)行路由,如果這個(gè)值沒有指定,則會(huì)按照消息一開始發(fā)送的時(shí)候指定的routing key進(jìn)行路由
Dead-lettered messages are routed to their dead letter exchange either:
with the routing key specified for the queue they were on; or, if this was not set, with the same routing keys they were originally published with.
例如,如果一開始你對(duì)exchange X發(fā)送消息,帶著routing key “foo”,進(jìn)入了隊(duì)列 Q然后消息變死信后,他會(huì)被重新發(fā)送到 dead letter exchange ,其中發(fā)給dead letter exchange帶著的routing key 還是foo。但如果這個(gè)隊(duì)列Q本身是設(shè)置了x-dead-letter-routing-key bar, 那么他發(fā)送到 dead letter exchange的時(shí)候,帶著的routing key 就是bar。
需要注意的是,當(dāng)死信消息重新路由到新的隊(duì)列的時(shí)候,在死信目標(biāo)隊(duì)列確認(rèn)收到這條死信消息之前,原來隊(duì)列的消息是不會(huì)刪除的,也就是說在某些異常場(chǎng)景下例如broker突然shutdown,是有機(jī)會(huì)存在說一個(gè)消息既存在于原隊(duì)列,又存在于死信目標(biāo)隊(duì)列。具體可參考官方說明:
Dead-lettered messages are re-published with publisher confirms turned on internally so, the “dead-letter queues” (DLX routing targets) the messages eventually land on must confirm the messages before they are removed from the original queue. In other words, the “publishing” (the one in which messages expired) queue will not remove messages before the dead-letter queues acknowledge receiving them (see Confirms for details on the guarantees made). Note that, in the event of an unclean broker shutdown, the same message may be duplicated on both the original queue and on the dead-lettering destination queues.
Time-To-Live(TTL)
開頭我們說過,實(shí)現(xiàn)延遲隊(duì)列除了用死信消息外,還需要利用消息過期的TTL機(jī)制,因?yàn)橹灰⑦^期了,就會(huì)觸發(fā)死信。
RabbitMQ有兩種方法讓設(shè)置消息的TTL:
直接在消息上設(shè)置
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
為隊(duì)列設(shè)置消息過期TTL

注意,隊(duì)列還有一個(gè)隊(duì)列TTL,x-expires,這個(gè)的意思是隊(duì)列空置經(jīng)過一段時(shí)間(沒有消費(fèi)者,沒有被重新聲明,沒有人在上面獲取消息(basic.get))后,整個(gè)隊(duì)列便會(huì)過期刪除,不要混淆
如果同時(shí)設(shè)置了消息的過期和隊(duì)列消息過期屬性,則取兩個(gè)較小值。
設(shè)計(jì)延遲隊(duì)列:
例如,我們需要觸發(fā)一個(gè)推送新聞,30分鐘后統(tǒng)計(jì)這個(gè)新聞的下發(fā)情況,我們就需要一個(gè)延遲隊(duì)列,新聞推送后,往延遲隊(duì)列發(fā)送一個(gè)消息,這個(gè)隊(duì)列的消息在30分鐘后被消費(fèi),這時(shí)候觸發(fā)即可統(tǒng)計(jì)30分鐘的下發(fā)情況。我們可以這樣設(shè)計(jì):
定義一個(gè)正常的隊(duì)列:ARRIVAL_STAT,統(tǒng)計(jì)程序監(jiān)聽此隊(duì)列,進(jìn)行消費(fèi)。
定義一個(gè)“延遲隊(duì)列”(RabbitMQ沒有這樣的隊(duì)列,這里只是人為的制造一個(gè)這樣的隊(duì)列):DELAY_ARRIVAL_STAT,其中設(shè)置好對(duì)應(yīng)的x-dead-letter-exchange,x-dead-letter-routing-key。為了簡(jiǎn)單說明,我使用默認(rèn)的exchange,那么配置如下:
x-dead-letter-exchange=“”
x-dead-letter-routing-key=“ARRIVAL_STAT”
意思是,消息當(dāng)這個(gè)隊(duì)列DELAY_ARRIVAL_STAT的消息變死信之后,就會(huì)帶著routing key “ARRIVAL_STAT”發(fā)送默認(rèn)的空exchange,即隊(duì)列ARRIVAL_STAT。
并且這個(gè)隊(duì)列不能有消費(fèi)者消費(fèi)消息。
這樣我們就實(shí)現(xiàn)了消息的死信轉(zhuǎn)發(fā)。下一步,只需要讓消息在這個(gè)DELAY_ARRIVAL_STAT在30分鐘后過期變死信即可。按照上文所說,有兩種方法,我們可以為隊(duì)列的消息設(shè)置30分鐘TTL,或者發(fā)送消息的時(shí)候指定消息的TTL為30分鐘即可。
示例如下:

“延遲隊(duì)列”的堵塞缺陷
由于設(shè)置了x-dead-letter-exchange的隊(duì)列本身也是普通隊(duì)列,其過期的順序是按照隊(duì)列頭部順序的過期的。也就是說,如果你隊(duì)列頭的消息A過期時(shí)間是5分鐘,后面對(duì)這個(gè)隊(duì)列發(fā)送消息B的帶著過期時(shí)間1分鐘,那么后面的隊(duì)列B要等隊(duì)列A過期了才會(huì)觸發(fā)過期:
Queues that had a per-message TTL applied to them retroactively (when they already had messages) will discard the messages when specific events occur. Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).
所以,對(duì)于此類多延遲時(shí)間的,可以考慮設(shè)置多級(jí)延遲隊(duì)列。例如1分鐘,5分鐘,10分鐘,20分鐘這樣多級(jí)的延遲隊(duì)列,使得延遲相近的盡量放到同一個(gè)隊(duì)列中減少擁堵的最壞情況。

END
順便給大家推薦一個(gè)GitHub項(xiàng)目,這個(gè) GitHub 整理了上千本常用技術(shù)PDF,絕大部分核心的技術(shù)書籍都可以在這里找到,
GitHub地址:https://github.com/javadevbooks/books
Gitee地址:https://gitee.com/javadevbooks/books
電子書已經(jīng)更新好了,你們需要的可以自行下載了,記得點(diǎn)一個(gè)star,持續(xù)更新中..

