如何保證消息隊(duì)列里的數(shù)據(jù)順序執(zhí)行?
使用MQ的時(shí)候,經(jīng)常會(huì)有按順序消費(fèi)的需求,比如大數(shù)據(jù)團(tuán)隊(duì)為了做數(shù)據(jù)分析,會(huì)把數(shù)據(jù)庫(kù)里數(shù)據(jù)同步到其他系統(tǒng)做一些數(shù)據(jù)統(tǒng)計(jì)分析。同步MySQL的時(shí)候,為了保證數(shù)據(jù)同步的實(shí)時(shí)性,會(huì)在中間加一個(gè)MQ,多個(gè)線程來(lái)消費(fèi)MQ里的數(shù)據(jù)。
這種同步一般是讀取binlog數(shù)據(jù),你在MySQL里增改刪了數(shù)據(jù),對(duì)應(yīng)出來(lái)就是3條增改刪binlog日志發(fā)送到MQ里面,消費(fèi)的時(shí)候肯定必須要按照增改刪的順序執(zhí)行。如果你換成刪除、修改、增加,就導(dǎo)致數(shù)據(jù)亂套了。

圖1?binlog同步
我們以kafka舉例,看下哪些環(huán)節(jié)會(huì)出現(xiàn)數(shù)據(jù)順序不一致情況,又怎么解決。
假設(shè)kafka分配了3個(gè)partition,kafka的一個(gè)特性就是,能保證寫(xiě)入一個(gè)partition中的數(shù)據(jù)一定是有順序的。
生產(chǎn)者寫(xiě)的時(shí)候,可以指定一個(gè)key,比如是訂單id作為key,這個(gè)id對(duì)應(yīng)的數(shù)據(jù)一定會(huì)寫(xiě)到同一個(gè)partition中去,而且這個(gè)partition中的數(shù)據(jù)都是有順序的。

圖2?kafka partition
kafka的消費(fèi)者開(kāi)始消費(fèi)partition中的數(shù)據(jù),一個(gè)消費(fèi)消費(fèi)一個(gè)partition,一個(gè)partition只能被一個(gè)消費(fèi)者消費(fèi),不會(huì)出現(xiàn)一個(gè)消費(fèi)者同時(shí)消費(fèi)多個(gè)partition的情況。假如現(xiàn)在有3個(gè)partition,你啟動(dòng)4個(gè)消費(fèi)者,那么就會(huì)有一個(gè)消費(fèi)者消費(fèi)不到數(shù)據(jù)。

圖3 一個(gè)消費(fèi)者消費(fèi)一個(gè)partition
到目前為止,每個(gè)消費(fèi)者消費(fèi)到的數(shù)據(jù)都是有順序性的。但消費(fèi)者內(nèi)部如果是單線程的,效率就會(huì)比較低,如果生產(chǎn)者寫(xiě)入kafka的數(shù)據(jù)量比較大,消費(fèi)不及時(shí),就會(huì)出現(xiàn)消息堆積的情況,所以消費(fèi)者需要多線程的方式運(yùn)行。
假如消費(fèi)者里啟動(dòng)了3個(gè)線程,并發(fā)的來(lái)消費(fèi)數(shù)據(jù),線程之間如果不做同步控制,還是會(huì)導(dǎo)致數(shù)據(jù)亂掉。

圖4 消費(fèi)者多線程消費(fèi)MQ
那如何保證kafka消費(fèi)者多線程按順序消費(fèi)數(shù)據(jù)呢?
多個(gè)線程不能直接拿數(shù)據(jù)去處理,此時(shí)我們可以在同步系統(tǒng)中搞多個(gè)內(nèi)存隊(duì)列,消費(fèi)者拿到數(shù)據(jù)之后,根據(jù)每條數(shù)據(jù)的key做hash取模,把相同id的數(shù)據(jù)分配到同一個(gè)內(nèi)存隊(duì)列中去。
每個(gè)內(nèi)存隊(duì)列里的數(shù)據(jù)都是有順序性的,給每個(gè)內(nèi)存隊(duì)列都對(duì)應(yīng)一個(gè)線程,去消費(fèi)內(nèi)存隊(duì)列中的數(shù)據(jù)。

假如有3條增改刪的數(shù)據(jù),都是對(duì)同一個(gè)id的處理,那么hash取模后就會(huì)寫(xiě)入到同一個(gè)內(nèi)存隊(duì)列里去,由同一個(gè)線程去消費(fèi),然后按順序?qū)懭霐?shù)據(jù)庫(kù)中。
如果消費(fèi)者按照單線程消費(fèi)處理,一條數(shù)據(jù)耗費(fèi)幾十毫秒,1秒鐘只能處理十幾條數(shù)據(jù),吞吐量就會(huì)非常低。如果開(kāi)啟多線程的方式處理,就會(huì)幾倍的提高吞吐量,同時(shí)也保證了數(shù)據(jù)的順序性。
整個(gè)流程按這樣的設(shè)計(jì)方案來(lái)處理,就可以保證數(shù)據(jù)的順序性。
有道無(wú)術(shù),術(shù)可成;有術(shù)無(wú)道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號(hào)
好文章,我在看??
