社區(qū)精選 |我有 7種 實(shí)現(xiàn)web實(shí)時(shí)消息推送的方案,7種!
今天小編為大家?guī)淼氖巧鐓^(qū)作者 程序員小富 的文章,在這篇文章中看看他如何實(shí)現(xiàn) 7 種web實(shí)時(shí)消息推送的方案。
大家好,我是小富~
我有一個(gè)朋友~
做了一個(gè)小破站,現(xiàn)在要實(shí)現(xiàn)一個(gè)站內(nèi)信web消息推送的功能,對(duì),就是下圖這個(gè)小紅點(diǎn),一個(gè)很常用的功能。

不過他還沒想好用什么方式做,這里我?guī)退砹艘幌聨追N方案,并簡(jiǎn)單做了實(shí)現(xiàn)。

案例下載:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-realtime-data 記得Star 哦
什么是消息推送(push)
推送的場(chǎng)景比較多,比如有人關(guān)注我的公眾號(hào),這時(shí)我就會(huì)收到一條推送消息,以此來吸引我點(diǎn)擊打開應(yīng)用。
消息推送(push)通常是指網(wǎng)站的運(yùn)營工作等人員,通過某種工具對(duì)用戶當(dāng)前網(wǎng)頁或移動(dòng)設(shè)備APP進(jìn)行的主動(dòng)消息推送。
消息推送一般又分為web端消息推送和移動(dòng)端消息推送。

上邊的這種屬于移動(dòng)端消息推送,web端消息推送常見的諸如站內(nèi)信、未讀郵件數(shù)量、監(jiān)控報(bào)警數(shù)量等,應(yīng)用的也非常廣泛。

在具體實(shí)現(xiàn)之前,咱們?cè)賮矸治鲆幌虑斑叺男枨?,其?shí)功能很簡(jiǎn)單,只要觸發(fā)某個(gè)事件(主動(dòng)分享了資源或者后臺(tái)主動(dòng)推送消息),web頁面的通知小紅點(diǎn)就會(huì)實(shí)時(shí)的+1就可以了。
通常在服務(wù)端會(huì)有若干張消息推送表,用來記錄用戶觸發(fā)不同事件所推送不同類型的消息,前端主動(dòng)查詢(拉)或者被動(dòng)接收(推)用戶所有未讀的消息數(shù)。

消息推送無非是推(push)和拉(pull)兩種形式,下邊我們逐個(gè)了解下。
短輪詢
輪詢(polling)應(yīng)該是實(shí)現(xiàn)消息推送方案中最簡(jiǎn)單的一種,這里我們暫且將輪詢分為短輪詢和長(zhǎng)輪詢。
短輪詢很好理解,指定的時(shí)間間隔,由瀏覽器向服務(wù)器發(fā)出HTTP請(qǐng)求,服務(wù)器實(shí)時(shí)返回未讀消息數(shù)據(jù)給客戶端,瀏覽器再做渲染顯示。
一個(gè)簡(jiǎn)單的JS定時(shí)器就可以搞定,每秒鐘請(qǐng)求一次未讀消息數(shù)接口,返回的數(shù)據(jù)展示即可。
setInterval(() => {
// 方法請(qǐng)求
messageCount().then((res) => {
if (res.code === 200) {
this.messageCount = res.data
}
})
}, 1000);
效果還是可以的,短輪詢實(shí)現(xiàn)固然簡(jiǎn)單,缺點(diǎn)也是顯而易見,由于推送數(shù)據(jù)并不會(huì)頻繁變更,無論后端此時(shí)是否有新的消息產(chǎn)生,客戶端都會(huì)進(jìn)行請(qǐng)求,勢(shì)必會(huì)對(duì)服務(wù)端造成很大壓力,浪費(fèi)帶寬和服務(wù)器資源。

長(zhǎng)輪詢
長(zhǎng)輪詢是對(duì)上邊短輪詢的一種改進(jìn)版本,在盡可能減少對(duì)服務(wù)器資源浪費(fèi)的同時(shí),保證消息的相對(duì)實(shí)時(shí)性。長(zhǎng)輪詢?cè)谥虚g件中應(yīng)用的很廣泛,比如Nacos和apollo配置中心,消息隊(duì)列kafka、RocketMQ中都有用到長(zhǎng)輪詢。
Nacos配置中心交互模型是push還是pull?一文中我詳細(xì)介紹過Nacos長(zhǎng)輪詢的實(shí)現(xiàn)原理,感興趣的小伙伴可以瞅瞅。
文章鏈接:https://mp.weixin.qq.com/s/94ftESkDoZI9gAGflLiGwg
這次我使用apollo配置中心實(shí)現(xiàn)長(zhǎng)輪詢的方式,應(yīng)用了一個(gè)類DeferredResult,它是在servelet3.0后經(jīng)過Spring封裝提供的一種異步請(qǐng)求機(jī)制,直意就是延遲結(jié)果。

DeferredResult可以允許容器線程快速釋放占用的資源,不阻塞請(qǐng)求線程,以此接受更多的請(qǐng)求提升系統(tǒng)的吞吐量,然后啟動(dòng)異步工作線程處理真正的業(yè)務(wù)邏輯,處理完成調(diào)用DeferredResult.setResult(200)提交響應(yīng)結(jié)果。
下邊我們用長(zhǎng)輪詢來實(shí)現(xiàn)消息推送。
因?yàn)橐粋€(gè)ID可能會(huì)被多個(gè)長(zhǎng)輪詢請(qǐng)求監(jiān)聽,所以我采用了guava包提供的Multimap結(jié)構(gòu)存放長(zhǎng)輪詢,一個(gè)key可以對(duì)應(yīng)多個(gè)value。一旦監(jiān)聽到key發(fā)生變化,對(duì)應(yīng)的所有長(zhǎng)輪詢都會(huì)響應(yīng)。前端得到非請(qǐng)求超時(shí)的狀態(tài)碼,知曉數(shù)據(jù)變更,主動(dòng)查詢未讀消息數(shù)接口,更新頁面數(shù)據(jù)。
@Controller
@RequestMapping("/polling")
public class PollingController {
// 存放監(jiān)聽某個(gè)Id的長(zhǎng)輪詢集合
// 線程同步結(jié)構(gòu)
public static Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedMultimap(HashMultimap.create());
/**
* 公眾號(hào):程序員小富
* 設(shè)置監(jiān)聽
*/
@GetMapping(path = "watch/{id}")
@ResponseBody
public DeferredResult<String> watch(@PathVariable String id) {
// 延遲對(duì)象設(shè)置超時(shí)時(shí)間
DeferredResult<String> deferredResult = new DeferredResult<>(TIME_OUT);
// 異步請(qǐng)求完成時(shí)移除 key,防止內(nèi)存溢出
deferredResult.onCompletion(() -> {
watchRequests.remove(id, deferredResult);
});
// 注冊(cè)長(zhǎng)輪詢請(qǐng)求
watchRequests.put(id, deferredResult);
return deferredResult;
}
/**
* 公眾號(hào):程序員小富
* 變更數(shù)據(jù)
*/
@GetMapping(path = "publish/{id}")
@ResponseBody
public String publish(@PathVariable String id) {
// 數(shù)據(jù)變更 取出監(jiān)聽ID的所有長(zhǎng)輪詢請(qǐng)求,并一一響應(yīng)處理
if (watchRequests.containsKey(id)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(id);
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult("我更新了" + new Date());
}
}
return "success";
}
當(dāng)請(qǐng)求超過設(shè)置的超時(shí)時(shí)間,會(huì)拋出AsyncRequestTimeoutException異常,這里直接用@ControllerAdvice全局捕獲統(tǒng)一返回即可,前端獲取約定好的狀態(tài)碼后再次發(fā)起長(zhǎng)輪詢請(qǐng)求,如此往復(fù)調(diào)用。
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println("異步請(qǐng)求超時(shí)");
return "304";
}
}
我們來測(cè)試一下,首先頁面發(fā)起長(zhǎng)輪詢請(qǐng)求/polling/watch/10086監(jiān)聽消息更變,請(qǐng)求被掛起,不變更數(shù)據(jù)直至超時(shí),再次發(fā)起了長(zhǎng)輪詢請(qǐng)求;緊接著手動(dòng)變更數(shù)據(jù)/polling/publish/10086,長(zhǎng)輪詢得到響應(yīng),前端處理業(yè)務(wù)邏輯完成后再次發(fā)起請(qǐng)求,如此循環(huán)往復(fù)。

長(zhǎng)輪詢相比于短輪詢?cè)谛阅苌咸嵘撕芏?,但依然?huì)產(chǎn)生較多的請(qǐng)求,這是它的一點(diǎn)不完美的地方。
iframe流
iframe流就是在頁面中插入一個(gè)隱藏的<iframe>標(biāo)簽,通過在src中請(qǐng)求消息數(shù)量API接口,由此在服務(wù)端和客戶端之間創(chuàng)建一條長(zhǎng)連接,服務(wù)端持續(xù)向iframe傳輸數(shù)據(jù)。
傳輸?shù)臄?shù)據(jù)通常是HTML、或是內(nèi)嵌的javascript腳本,來達(dá)到實(shí)時(shí)更新頁面的效果。

這種方式實(shí)現(xiàn)簡(jiǎn)單,前端只要一個(gè)<iframe>標(biāo)簽搞定了。
<iframe src="/iframe/message" style="display:none"></iframe>
服務(wù)端直接組裝html、js腳本數(shù)據(jù)向response寫入就行了。
@Controller
@RequestMapping("/iframe")
public class IframeController {
@GetMapping(path = "message")
public void message(HttpServletResponse response) throws IOException, InterruptedException {
while (true) {
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().print(" <script type=\"text/javascript\">\n" +
"parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" +
"parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" +
"</script>");
}
}
}
但我個(gè)人不推薦,因?yàn)樗跒g覽器上會(huì)顯示請(qǐng)求未加載完,圖標(biāo)會(huì)不停旋轉(zhuǎn),簡(jiǎn)直是強(qiáng)迫癥殺手。

SSE (我的方式)
很多人可能不知道,服務(wù)端向客戶端推送消息,其實(shí)除了可以用WebSocket這種耳熟能詳?shù)臋C(jī)制外,還有一種服務(wù)器發(fā)送事件(Server-sent events),簡(jiǎn)稱SSE。
SSE它是基于HTTP協(xié)議的,我們知道一般意義上的HTTP協(xié)議是無法做到服務(wù)端主動(dòng)向客戶端推送消息的,但SSE是個(gè)例外,它變換了一種思路。

SSE在服務(wù)器和客戶端之間打開一個(gè)單向通道,服務(wù)端響應(yīng)的不再是一次性的數(shù)據(jù)包而是text/event-stream類型的數(shù)據(jù)流信息,在有數(shù)據(jù)變更時(shí)從服務(wù)器流式傳輸?shù)娇蛻舳恕?/span>
整體的實(shí)現(xiàn)思路有點(diǎn)類似于在線視頻播放,視頻流會(huì)連續(xù)不斷的推送到瀏覽器,你也可以理解成,客戶端在完成一次用時(shí)很長(zhǎng)(網(wǎng)絡(luò)不暢)的下載。

SSE與WebSocket作用相似,都可以建立服務(wù)端與瀏覽器之間的通信,實(shí)現(xiàn)服務(wù)端向客戶端推送消息,但還是有些許不同:
SSE 是基于HTTP協(xié)議的,它們不需要特殊的協(xié)議或服務(wù)器實(shí)現(xiàn)即可工作;WebSocket需單獨(dú)服務(wù)器來處理協(xié)議。 SSE 單向通信,只能由服務(wù)端向客戶端單向通信;webSocket全雙工通信,即通信的雙方可以同時(shí)發(fā)送和接受信息。 SSE 實(shí)現(xiàn)簡(jiǎn)單開發(fā)成本低,無需引入其他組件;WebSocket傳輸數(shù)據(jù)需做二次解析,開發(fā)門檻高一些。 SSE 默認(rèn)支持?jǐn)嗑€重連;WebSocket則需要自己實(shí)現(xiàn)。 SSE 只能傳送文本消息,二進(jìn)制數(shù)據(jù)需要經(jīng)過編碼后傳送;WebSocket默認(rèn)支持傳送二進(jìn)制數(shù)據(jù)。
<script>
let source = null;
let userId = 7777
if (window.EventSource) {
// 建立連接
source = new EventSource('http://localhost:7777/sse/sub/'+userId);
setMessageInnerHTML("連接用戶=" + userId);
/**
* 連接一旦建立,就會(huì)觸發(fā)open事件
* 另一種寫法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立連接。。。");
}, false);
/**
* 客戶端收到服務(wù)器發(fā)來的數(shù)據(jù)
* 另一種寫法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
} else {
setMessageInnerHTML("你的瀏覽器不支持SSE");
}
</script>
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 創(chuàng)建連接
*
* @date: 2022/7/12 14:51
* @auther: 公眾號(hào):程序員小富
*/
public static SseEmitter connect(String userId) {
try {
// 設(shè)置超時(shí)時(shí)間,0表示不過期。默認(rèn)30秒
SseEmitter sseEmitter = new SseEmitter(0L);
// 注冊(cè)回調(diào)
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
count.getAndIncrement();
return sseEmitter;
} catch (Exception e) {
log.info("創(chuàng)建新的sse連接異常,當(dāng)前用戶:{}", userId);
}
return null;
}
/**
* 給指定用戶發(fā)送消息
*
* @date: 2022/7/12 14:51
* @auther: 公眾號(hào):程序員小富
*/
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
sseEmitterMap.get(userId).send(message);
} catch (IOException e) {
log.error("用戶[{}]推送異常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}


MQTT

首先HTTP協(xié)議它是一種同步協(xié)議,客戶端請(qǐng)求后需要等待服務(wù)器的響應(yīng)。而在物聯(lián)網(wǎng)(IOT)環(huán)境中,設(shè)備會(huì)很受制于環(huán)境的影響,比如帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定等,顯然異步消息協(xié)議更為適合IOT應(yīng)用程序。 HTTP是單向的,如果要獲取消息客戶端必須發(fā)起連接,而在物聯(lián)網(wǎng)(IOT)應(yīng)用程序中,設(shè)備或傳感器往往都是客戶端,這意味著它們無法被動(dòng)地接收來自網(wǎng)絡(luò)的命令。 通常需要將一條命令或者消息,發(fā)送到網(wǎng)絡(luò)上的所有設(shè)備上。HTTP要實(shí)現(xiàn)這樣的功能不但很困難,而且成本極高。
Websocket
<!-- 引入websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")
public class WebSocketServer {
//與某個(gè)客戶端的連接會(huì)話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
private Session session;
private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
// 用來存在線連接數(shù)
private static final Map<String, Session> sessionPool = new HashMap<String, Session>();
/**
* 公眾號(hào):程序員小富
* 鏈接成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("websocket消息: 有新的連接,總數(shù)為:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 公眾號(hào):程序員小富
* 收到客戶端消息后調(diào)用的方法
*/
@OnMessage
public void onMessage(String message) {
log.info("websocket消息: 收到客戶端消息:" + message);
}
/**
* 公眾號(hào):程序員小富
* 此為單點(diǎn)消息
*/
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("websocket消: 單點(diǎn)消息:" + message);
session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
<script>
var ws = new WebSocket('ws://localhost:7777/webSocket/10086');
// 獲取連接狀態(tài)
console.log('ws連接狀態(tài):' + ws.readyState);
//監(jiān)聽是否連接成功
ws.onopen = function () {
console.log('ws連接狀態(tài):' + ws.readyState);
//連接成功則發(fā)送一個(gè)數(shù)據(jù)
ws.send('test1');
}
// 接聽服務(wù)器發(fā)回的信息并處理展示
ws.onmessage = function (data) {
console.log('接收到來自服務(wù)器的消息:');
console.log(data);
//完成通信后關(guān)閉WebSocket連接
ws.close();
}
// 監(jiān)聽連接關(guān)閉事件
ws.onclose = function () {
// 監(jiān)聽整個(gè)過程中websocket的狀態(tài)
console.log('ws連接狀態(tài):' + ws.readyState);
}
// 監(jiān)聽并處理error事件
ws.onerror = function (error) {
console.log(error);
}
function sendMessage() {
var content = $("#message").val();
$.ajax({
url: '/socket/publish?userId=10086&message=' + content,
type: 'GET',
data: { "id": "7777", "content": content },
success: function (data) {
console.log(data)
}
})
}
</script>


自定義推送

Github地址
傳送門:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-realtime-data
SegmentFault 思否社區(qū)小編說
自 2022-07-01 起 SegmentFault 思否公眾號(hào)改版啦!之后將陸續(xù)推出新的欄目和大家見面?。ㄕ?qǐng)拭目以待呀~?)
在「社區(qū)精選」欄目中,我們將為廣大開發(fā)者推薦來自 SegmentFault 思否開發(fā)者社區(qū)的優(yōu)質(zhì)技術(shù)文章,這些文章全部出自社區(qū)中充滿智慧的技術(shù)創(chuàng)作者哦!
希望通過這一欄目,大家可以共同學(xué)習(xí)技術(shù)干貨,GET 新技能和各種花式技術(shù)小 Tips。
歡迎越來越多的開發(fā)者加入創(chuàng)作者的行列,我們將持續(xù)甄選出社區(qū)中優(yōu)質(zhì)的內(nèi)容推介給更多人,讓閃閃發(fā)光的技術(shù)創(chuàng)作者們走到聚光燈下,被更多人認(rèn)識(shí)。
「社區(qū)精選」投稿郵箱:[email protected]
投稿請(qǐng)附上社區(qū)文章地址

