Redis應(yīng)用-異步消息隊列與延時隊列
異步消息隊列
說道消息隊列,你肯定會想到Kafka、Rabbitmq等消息中間件,這些專業(yè)的消息中間件提供了很多功能特性,當(dāng)然他的部署使用維護(hù)都是比較麻煩的。如果你對消息隊列沒那么高要求,想要輕量級的,使用Redis就沒錯啦。
Redis通過list數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)消息隊列.主要使用到如下命令:
lpush和rpush入隊列
lpop和rpop出隊列
blpop和brpop阻塞式出隊列

廢話補(bǔ)不多說上代碼:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//發(fā)送消息
$redis->lPush($list, $value);
//消費(fèi)消息
while (true) {
try {
$msg = $redis->rPop($list);
if (!$msg) {
sleep(1);
}
//業(yè)務(wù)處理
} catch (Exception $e) {
echo $e->getMessage();
}
}
上面代碼會有個問題如果隊列長時間是空的,那pop就不會不斷的循環(huán),這樣會導(dǎo)致redis的QPS升高,影響性能。所以我們使用sleep來解決,當(dāng)沒有消息的時候阻塞一段時間。但其實這樣還會帶來另一個問題,就是sleep會導(dǎo)致消息的處理延遲增加。這個問題我們可以通過blpop/brpop?來阻塞讀取隊列。
blpop/brpop在隊列沒有數(shù)據(jù)的時候,會立即進(jìn)入休眠狀態(tài),一旦數(shù)據(jù)到來,則立刻醒過來。消息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。
還有一個需要注意的點(diǎn)是我們需要是用try/catch來進(jìn)行異常捕獲,如果一直阻塞在那里,Redis服務(wù)器一般會主動斷開掉空鏈接,來減少閑置資源的占用。

延遲隊列
你是否在做電商項目的時候會遇到如下場景:
訂單下單后超過一小時用戶未支付,需要關(guān)閉訂單
訂單的評論如果7天未評價,系統(tǒng)需要自動產(chǎn)生一條評論
這個時候我們就需要用到延時隊列了,顧名思義就是需要延遲一段時間后執(zhí)行。Redis可通過zset來實現(xiàn)。我們可以將有序集合的value設(shè)置為我們的消息任務(wù),把value的score設(shè)置為消息的到期時間,然后輪詢獲取有序集合的中的到期消息進(jìn)行處理。
實現(xiàn)代碼如下:
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->zAdd($delayQueue,$tts, $value);
while(true) {
try{
$msg = $redis->zRangeByScore($delayQueue,0,time(),0,1);
if($msg){
continue;
}
//刪除消息
$ok = $redis.zrem($delayQueue,$msg);
if($ok){
//業(yè)務(wù)處理
}
} catch(\Exception $e) {
}
}
這里又產(chǎn)生了一個問題,同一個任務(wù)可能會被多個進(jìn)程取到之后再使用 zrem 進(jìn)行爭搶,那些沒搶到的進(jìn)程都是白取了一次任務(wù),這是浪費(fèi)。解決辦法:將?zrangebyscore和zrem使用lua腳本進(jìn)行原子化操作,這樣多個進(jìn)程之間爭搶任務(wù)時就不會出現(xiàn)這種浪費(fèi)了。
