如何在 Rust 中實(shí)現(xiàn) HTTP 長輪詢
實(shí)時通信顧名思義就是盡可能快地傳播新數(shù)據(jù)。在討論實(shí)時數(shù)據(jù)時,有兩種工作負(fù)載:
低延遲、多向流:Websockets。 中等延遲、單向流:短輪詢、服務(wù)器發(fā)送事件 (SSE) 和長輪詢。
今天,我們將研究后者,因為它是你在開發(fā) Web 應(yīng)用程序時最常遇到的工作負(fù)載。
短輪詢

實(shí)時通信的第一種方法是短輪詢。
在這種情況下,客戶端向服務(wù)器發(fā)送請求,服務(wù)器立即回復(fù)。如果沒有新數(shù)據(jù),則響應(yīng)為空。而大多數(shù)時候,情況就是這樣的。所以大多數(shù)時候,服務(wù)器的響應(yīng)是空的,這本來是可以避免的。
因此,短輪詢在網(wǎng)絡(luò)傳輸和 CPU 方面都是浪費(fèi)的,因為每次都需要解析和編碼請求。
唯一的優(yōu)點(diǎn)就是簡單。
服務(wù)器發(fā)送事件 (SSE)

與 WebSockets 相反,SSE 流是單向的:只有服務(wù)器可以將數(shù)據(jù)發(fā)送回客戶端。此外,自動重新連接的機(jī)制(通常)內(nèi)置于客戶端。
缺點(diǎn)是實(shí)現(xiàn)服務(wù)器端并不容易。
長輪詢

最后是長輪詢:客戶端發(fā)出請求,并指示它擁有的最后一條數(shù)據(jù),服務(wù)器僅在有新數(shù)據(jù)可用或達(dá)到一定時間時才將響應(yīng)發(fā)送回去。
它的優(yōu)點(diǎn)是實(shí)現(xiàn)起來極其簡單,因為它不是一個流,而是一個簡單的請求—響應(yīng)方案,因此非常健壯,不需要復(fù)雜的自動重連算法,并且可以優(yōu)雅地處理網(wǎng)絡(luò)錯誤。此外,與短輪詢相反,長輪詢在資源使用方面的浪費(fèi)較少。
唯一的缺點(diǎn)是,長輪詢的延遲不如 WebSockets 好,但在大多數(shù)情況下并不重要。
長輪詢在 Rust 中非常有效:多虧有 async,使得每個打開的連接使用的資源(一個簡單的任務(wù))很少,而許多語言使用整個操作系統(tǒng)線程。
最后,由于長輪詢是簡單的 HTTP 請求,因此這種技術(shù)更有可能不被某種激進(jìn)的防火墻或網(wǎng)絡(luò)設(shè)備阻止。
Rust 中的長輪詢
我們將使用由tokio 團(tuán)隊[1]開發(fā)的新 Web 框架:axum[2]。它的性能和簡單性在 Rust 界中是無與倫比的。另外,請注意,將此代碼移植到另一個 Web 框架很容易。
我們將實(shí)現(xiàn)一個簡單的聊天服務(wù)器,因為聊天是從長輪詢中獲益最多的教科書應(yīng)用程序。
有 3 個技巧可以使這個實(shí)現(xiàn)起來更高效,可以關(guān)注一下。
聊天服務(wù)
聊天服務(wù)是一個封裝了我們所有業(yè)務(wù)邏輯的對象。為了使示例簡單,我們將只進(jìn)行數(shù)據(jù)庫調(diào)用。
這是我們的第一個技巧:為了啟用消息排序,我們不使用 UUIDv4。相反,我們使用轉(zhuǎn)換為 UUID 的 ULID[3],因此序列化/反序列化它沒有問題:Uuid = Ulid::new().into()
chat.rs
impl?ChatService?{
????pub?fn?new(db:?DB)?->?Self?{
????????ChatService?{?db?}
????}
????pub?async?fn?create_message(&self,?body:?String)?->?Result?{
????????if?body.len()?>?10_000?{
????????????return?Err(Error::InvalidArgument("Message?is?too?large".to_string()));
????????}
????????let?created_at?=?chrono::Utc::now();
????????let?id:?Uuid?=?Ulid::new().into();
????????let?query?=?"INSERT?INTO?messages
????????????(id,?created_at,?body)
????????????VALUES?($1,?$2,?$3)";
????????sqlx::query(query)
????????????.bind(id)
????????????.bind(created_at)
????????????.bind(&body)
????????????.execute(&self.db)
????????????.await?;
????????Ok(Message?{
????????????id,
????????????created_at,
????????????body,
????????})
????}
}
這是我們的第二個技巧:注意 after.unwrap_or(Uuid::nil()) 返回 “0” UUID ( 00000000-0000-0000-0000-000000000000) ?。有了 WHERE id > $1 它,我們就可以返回所有消息(如果 after 是 none)。
例如,恢復(fù)客戶端的整個狀態(tài)是很有用的。
????pub?async?fn?find_messages(&self,?after:?Option)?->?Result<Vec,?Error>?{
????????let?query?=?"SELECT?*
????????????FROM?messages
????????????WHERE?id?>?$1";
????????let?messages:?Vec?=?sqlx::query_as::<_,?Message>(query)
????????????.bind(after.unwrap_or(Uuid::nil()))
????????????.fetch_all(&self.db)
????????????.await?;
????????Ok(messages)
????}
}
網(wǎng)絡(luò)服務(wù)器
接下來,運(yùn)行 Web 服務(wù)器的樣板。
由于 .layer(AddExtensionLayer::new(ctx)),ServerContext 被注入到所有路由中,因此我們可以調(diào)用 ChatService 的方法。
struct?ServerContext?{
????chat_service:?chat::ChatService,
}
#[tokio::main]
async?fn?main()?->?Result<(),?anyhow::Error>?{
????std::env::set_var("RUST_LOG",?"rust_long_polling=info");
????env_logger::init();
????let?database_url?=?std::env::var("DATABASE_URL")
????????.map_err(|_|?Error::BadConfig("DATABASE_URL?env?var?is?missing".to_string()))?;
????let?db?=?db::connect(&database_url).await?;
????db::migrate(&db).await?;
????let?chat_service?=?chat::ChatService::new(db);
????let?ctx?=?Arc::new(ServerContext::new(chat_service));
????let?app?=?Router::new()
????????.route(
????????????"/messages",
????????????get(handler_find_messages).post(handler_create_message),
????????)
????????.or(handler_404.into_service())
????????.layer(AddExtensionLayer::new(ctx));
????log::info!("Starting?server?on?0.0.0.0:8080");
????axum::Server::bind(
????????&"0.0.0.0:8080"
????????????.parse()
????????????.expect("parsing?server's?bind?address"),
????)
????.serve(app.into_make_service())
????.await
????.expect("running?server");
????Ok(())
}
長輪詢
最后,我們的第三個技巧:長輪詢是一個簡單的循環(huán) tokio::time::sleep。
通過使用 tokio::time::sleep,活動連接在等待時幾乎不會使用任何資源。
如果找到新數(shù)據(jù),我們立即返回新數(shù)據(jù)。否則,我們再等一秒鐘。
10 秒后,我們返回空數(shù)據(jù)。
main.rs
async?fn?handler_find_messages(
????Extension(ctx):?Extension>,
????query_params:?Query,
)?->?ResultVec>,?Error>?{
????let?sleep_for?=?Duration::from_secs(1);
????//?long?polling:?10?secs
????for?_?in?0..10u64?{
????????let?messages?=?ctx.chat_service.find_messages(query_params.after).await?;
????????if?messages.len()?!=?0?{
????????????return?Ok(messages.into());
????????}
????????tokio::time::sleep(sleep_for).await;
????}
????//?return?an?empty?response
????Ok(Vec::new().into())
}
代碼在 GitHub 上
像往常一樣,你可以在 GitHub 上找到代碼:github.com/skerkour/kerkour.com[4]。
參考資料
tokio 團(tuán)隊: https://github.com/tokio-rs
[2]axum: https://github.com/tokio-rs/axum
[3]ULID: https://github.com/ulid/spec
[4]github.com/skerkour/kerkour.com: https://github.com/skerkour/kerkour.com/tree/main/2021/rust_long_polling
我是 polarisxu,北大碩士畢業(yè),曾在 360 等知名互聯(lián)網(wǎng)公司工作,10多年技術(shù)研發(fā)與架構(gòu)經(jīng)驗!2012 年接觸 Go 語言并創(chuàng)建了 Go 語言中文網(wǎng)!著有《Go語言編程之旅》、開源圖書《Go語言標(biāo)準(zhǔn)庫》等。
堅持輸出技術(shù)(包括 Go、Rust 等技術(shù))、職場心得和創(chuàng)業(yè)感悟!歡迎關(guān)注「polarisxu」一起成長!也歡迎加我微信好友交流:gopherstudio
