HiriverMySQL 數(shù)據(jù)變化的監(jiān)控與分發(fā)框架
什么是hiriver?
hiriver是純java開發(fā)的、高性能的、基于解析mysql row base binlog技術(shù)實現(xiàn)的用于監(jiān)控mysql數(shù)據(jù)變化并分發(fā)這些變化的框架。它提供了一套完整的框架,內(nèi)置數(shù)據(jù)監(jiān)控線程和數(shù)據(jù)消費線程,對外提供簡單的Consumer接口,開發(fā)者可以根據(jù)自己的業(yè)務(wù)場景自行實現(xiàn)Consumer接口,而不不必關(guān)心線程問題。
實現(xiàn)原理
hiriver實現(xiàn)了mysql主從復(fù)制協(xié)議,把自己偽裝成一個mysql的從庫,在接收到binlog后按照mysql binlog協(xié)議進(jìn)行解析,由此獲取mysql的數(shù)據(jù)變化。由于基于mysql的主從復(fù)制協(xié)議,它監(jiān)控數(shù)據(jù)變化特別快,理論上與mysql本身的主從同步一樣快,甚至更快。同時與在應(yīng)用層監(jiān)控數(shù)據(jù)變化不同,它不需要考慮事務(wù)是否成功問題。當(dāng)然,***限制***是mysql binlog的方式必須是***row***方式。
名字的由來
hiriver是hidden river的簡稱,中文名稱"暗渠",用于隱喻在數(shù)據(jù)庫的后面導(dǎo)流數(shù)據(jù),而不必要在應(yīng)用層做任何控制。
支持mysql的版本
hiriver支持mysql 5.6.9+和 mysql5.1+版本。
強烈推薦 使用5.6.9+版本,并使用binlog file name + position的方式處理同步點。
雖然5.6.9+版本提供 gtid 功能,它是用于表示事務(wù)的唯一的id,理論上,基于它可以實現(xiàn)HA功能,當(dāng)mysql出現(xiàn)故障時可以自動從一臺mysql從庫切換到另一臺,并且不會丟失或者重復(fù)數(shù)據(jù),但是 在實際的使用過程中g(shù)tid依然存在bug,并不穩(wěn)定,而且存在多個gtid時很難找到mysql認(rèn)識的初始同步點。
mysql5.6.9之前的版本,必須binlog file name和在該文件中的偏移位置作為同步點。
javadoc
使用教程
quickstart
總體說明
hiriver模塊組主要由2個組件和一個示例組成:mysql-proto、hiriver和hiriver-sample
mysql-proto實現(xiàn)了mysql的client-server協(xié)議,包括Text protocol和主從復(fù)制協(xié)議。Text protocol是從mysql正常讀取數(shù)據(jù)的協(xié)議,它是mysql jdbc驅(qū)動背后的協(xié)議。主從復(fù)制協(xié)議顧名思義就是實現(xiàn)主從之間復(fù)制數(shù)據(jù)的協(xié)議。
hiriver是基于mysql-proto組件封裝的監(jiān)聽mysql變化、記錄同步點、控制數(shù)據(jù)消費的上層應(yīng)用框架。它是hiriver業(yè)務(wù)流程的實現(xiàn)。它需要與spirng集成使用
hiriver-sample一個使用hirvier的示例
準(zhǔn)備數(shù)據(jù)庫環(huán)境
創(chuàng)建自己的mysql 5.6.28
-
開啟row base和gtid 模式(如果使用gtid作為同步點,必須開啟)
log-bin=mysql-bin binlog_format=Row log-slave-updates=ON enforce_gtid_consistency=true gtid_mode=ON
創(chuàng)建自己的復(fù)制賬號,創(chuàng)建repl database和一張表,并在表示寫入數(shù)據(jù)
快速使用-binlogname + 偏移地址模式
-
下載代碼,找到hiriver-sample模塊,它是一個基于spring的web應(yīng)用,有3 spring xml配置文件,分別是:
spring-boot.xml # spring容器描述入口文件
spring-bin.xml # binlogname + 偏移地址模式
spring-gtid.xml # gtid模式
修改示例中hiriver-sample.properties的參數(shù),修改數(shù)據(jù)庫相關(guān)屬性、初始同步點、同步點存儲路徑和表名過濾黑、白名單配置
-
初始化同步點使用channel.0000.binlog和channel.0000.binlog.pos屬性,可以通過執(zhí)行
show master status
-
修改spring-boot.xml中的最后一行為:
<import resource="classpath:spring/spring-binlog.xml"/>
使用tomcat/jetty或maven jetty插件運行示例即可
快速使用-gtid模式
-
下載代碼,找到hiriver-sample模塊,它是一個基于spring的web應(yīng)用,有3 spring xml配置文件,分別是:
spring-boot.xml # spring容器描述入口文件
spring-bin.xml # binlogname + 偏移地址模式
spring-gtid.xml # gtid模式
-
修改示例中hiriver-sample.properties的參數(shù),修改數(shù)據(jù)庫相關(guān)屬性、初始同步點、同步點存儲路徑和表名過濾黑、白名單配置,其中channel_0000.gtid參數(shù)的配置需要從mysql中查詢數(shù)獲取,執(zhí)行
show master status
8c80613e-ac5b-11e5-b170-148044d6636f:1 or 8c80613e-ac5b-11e5-b170-148044d6636f:8
修改spring-boot.xml中的最后一行為:
<import resource="classpath:spring/spring-gtid.xml"/>使用tomcat/jetty或maven jetty插件運行示例即可
詳細(xì)參數(shù)說明
底層socket控制參數(shù)(使用TransportConfig類描述)
| 參數(shù)名稱 | 說明 |
|---|---|
| connectTimeout | socket連接超時,同Socket.connect(SocketAddress endpoint,int timeout),單位ms,缺省15000 |
| soTimeout | socket讀寫超時時間,同Socket.setSoTimeout(int timeout), 單位ms,缺省15000 |
| receiveBufferSize | socket 接收緩沖區(qū)大小,同Socket. setReceiveBufferSize(int size),缺省0,0表示使用系統(tǒng)默認(rèn)緩沖區(qū)大小 |
| sendBufferSize | socket 接收緩沖區(qū)大小,同Socket.setSendBufferSize(int size),缺省0,0表示使用系統(tǒng)默認(rèn)緩沖區(qū)大小 |
| keepAlive | 是否保持長連接,同Socket.setKeepAlive(boolean on) |
| initSqlList | 在建立數(shù)據(jù)庫連接后,需要初始化執(zhí)行sql語句的列表,缺省是僅僅包含"SET AUTOCOMMIT=1" sql語句的列表,該sql在dump mysql binlog時不生效。 TransportConfig 類被dump binlog和執(zhí)行mysql數(shù)據(jù)庫讀取類共用,具體參見 重點類說明章節(jié) |
binlog讀取參數(shù)(DefaultChannelStream類)
| 參數(shù)名稱 | 說明 |
|---|---|
| faultTolerantTimeout | 當(dāng)與mysql失去連接后,線程sleep的時間,超過該時間后再進(jìn)行重連,單位ms,缺省5000 |
| fetalWaitTimeout | 當(dāng)讀取binlog數(shù)據(jù)或者解析數(shù)據(jù)過程中發(fā)生未知異常時到下次重試的間隔時間,默認(rèn)2min,單位ms |
| channelId | dump單個數(shù)據(jù)庫可以理解為是一個數(shù)據(jù)流,channelId是流的名稱,一個hiriver進(jìn)程中可以支持多個流dump多個數(shù)據(jù)庫,其channelId不能重復(fù),默認(rèn)是uuid。 當(dāng)一個場景中需要一個進(jìn)程dump多個數(shù)據(jù)庫時,比如在分庫應(yīng)用中,建議使用channel.0000.id格式命名,其中0000是分庫場景中數(shù)據(jù)庫的編號。 |
| channelBuffer | 用于緩存從數(shù)據(jù)庫dump出數(shù)據(jù),DefaultChannelStream 由兩個線程組成,一個是provider線程,負(fù)責(zé)從mysql dump數(shù)據(jù),另一個是Consumer線程,負(fù)責(zé)消費、使用數(shù)據(jù),使用channelBuffer 進(jìn)行數(shù)據(jù)傳遞,既可以解耦,又可以提高性能,channelBuffer 不能設(shè)置的無限大,需要使用DefaultChannelBuffer.limit屬性控制大小 |
| channel.buffer.limit | channelBuffer 的大小,對應(yīng)DefaultChannelBuffer.limit屬性,默認(rèn)5000 |
| configBinlogPos | 初始同步點,使用BinlogPosition接口描述。支持binlog file name+pos和gtid方式,分別對應(yīng)于BinlogFileBinlogPosition和GTidBinlogPosition |
| binlogPositionStore | 同步點存儲,使用BinlogPositionStore接口描述,默認(rèn)FileBinlogPositionStore實現(xiàn),可以自由擴展 |
| position.store.path | 同步點的存儲路徑,適用與FileBinlogPositionStore 實現(xiàn),對應(yīng)FileBinlogPositionStore.filePath屬性 |
| transactionRecognizer | 事務(wù)開始、結(jié)束識別器,使用TransactionRecognizer描述,針對binlog file name+pos和gtid模式提供BinlogNameAndPosTransactionRecognizer和GTIDTransactionRecognizer實現(xiàn) |
| streamSource | 需要dump數(shù)據(jù)的數(shù)據(jù)源描述,使用StreamSource接口描述,MysqlStreamSource是針對mysql數(shù)據(jù)的實現(xiàn),HAStreamSource是在MysqlStreamSource 之上的封裝,它持有多個MysqlStreamSource 對象,當(dāng)一個發(fā)生故障時,它可以自動切換到其他MysqlStreamSource 上,在gtid模式下推薦使用HAStreamSource,這時一般適用于從從庫dump數(shù)據(jù)。 |
| slaveHostUrl | 從數(shù)據(jù)庫ip:port,對應(yīng)MysqlStreamSource.hostUrl屬性,適用于使用HAStreamSource時 |
| table.white | 根據(jù)表名過濾時的白名單配置,支持正則,參見BlackWhiteNameListTableFilter |
| table.black | 根據(jù)表名過濾時的黑名單配置,支持正則,參見BlackWhiteNameListTableFilter |
數(shù)據(jù)庫配置
| 參數(shù)名稱 | 說明 |
|---|---|
| user_name | 用戶名稱,對應(yīng)MysqlStreamSource.userName屬性 |
| password | 密碼,對應(yīng)MysqlStreamSource.password屬性 |
| hostUrl | 數(shù)據(jù)庫ip:port,對應(yīng)MysqlStreamSource.hostUrl屬性 |
重點類說明
底層通信類
binlog dump類(BinlogStreamBlockingTransportImpl)
實現(xiàn)mysql binlog dump協(xié)議,負(fù)責(zé)與mysql建立socket連接,完成用戶名密碼驗證后,執(zhí)行數(shù)據(jù)dump命令,并持續(xù)的讀取、解析mysql binlog event數(shù)據(jù)。
數(shù)據(jù)庫數(shù)據(jù)讀取類(TextProtocolBlockingTransportImpl)
mysql文本協(xié)議的實現(xiàn),mysql文本協(xié)議即jdbc背后的協(xié)議,主要用于執(zhí)行sql讀取數(shù)據(jù),也可以執(zhí)行一些其他的命令,比如讀取表定義的元數(shù)據(jù)等,之所以不使用mysql jdbc是由于兩個原因:一是不想引入一個第三方包,降低依賴性;二是mysql的文本協(xié)議支持更多指令,比如COM_FIELD_LIST指令方便的獲取到表字段是否為空、是否是索引字段等信息,而jdbc是個通用的api,并沒有暴露這些指令實現(xiàn)。
表名過濾類 (BlackWhiteNameListTableFilter)
支持黑白名單的過濾實現(xiàn)。 按照表名進(jìn)行過濾時,表名格式為database.table(可以為正則),以逗號分隔.
當(dāng)白名單和黑名單同時存在時,只有不在黑名單中同時在白名單中存在的才起作用.
e.g,在properties文件中描述
白名單:filert_white=test.account,test.user_sharding*
白名單:filert_black=test.*bak
binlog row event數(shù)據(jù)描述類(BinlogDataSet)
binlog數(shù)據(jù)是二進(jìn)制數(shù)據(jù),它遵循mysql rowbase binlog協(xié)議,在協(xié)議內(nèi)部event作為一個基本單位用于描述數(shù)據(jù)庫的變更,這里的“變更”不僅僅是數(shù)據(jù)的修改,也可能是事務(wù)的開啟、結(jié)束,表的變更等,在hiriver里我們僅僅關(guān)注表數(shù)據(jù)的變更,BinlogDataSet用于描述一條或多條數(shù)據(jù)的變化,類似于jdbc的RowSet。BinlogDataSet 包括:
channelId
sourceHostUrl,該數(shù)據(jù)來自哪個數(shù)據(jù)庫
gtId, 該數(shù)據(jù)所在的事務(wù)的gtid,在不支持gtid模式下,為null
binlogPos, 當(dāng)前數(shù)據(jù)所在事務(wù)的binlogfile + pos,無論哪種模式,一定補位null
isStartTransEvent, 當(dāng)前是否一個事務(wù)的開啟
isPositionStoreTrigger,當(dāng)前是否一個事務(wù)的結(jié)束,當(dāng)時true時需要記錄同步點。
rowDataMap, 行數(shù)據(jù),每一行使用BinlogResultRow描述
columnDefMap, 類定義描述
BinlogResultRow內(nèi)部是有二個列表,一個記錄變更之前的數(shù)據(jù),另一個記錄變更之后的數(shù)據(jù)。
數(shù)據(jù)消費類 (Consumer)
描述消費BinlogDataSet數(shù)據(jù)的接口,這個留給業(yè)務(wù)實現(xiàn)方來實現(xiàn)。
binlog流(DefaultChannelStream)
mysql binlog dump被抽象成一個流,每一個流僅僅針對一個mysql實例,這個流稱之為ChannelStream, ChannelStream負(fù)責(zé)源源不斷的從mysql實例讀取數(shù)據(jù)并過濾、解析和消費。
DefaultChannelStream是ChannelStream的缺省實現(xiàn),在內(nèi)部它開啟了2條線程:provider和consumer線程,provider線程負(fù)責(zé)從數(shù)據(jù)庫讀取數(shù)據(jù),識別事務(wù)、根據(jù)表名過濾、解析成BinlogDataSet并放入ChannelBuffer;consumer線程負(fù)責(zé)從ChannelBuffer讀取數(shù)據(jù)并調(diào)用Consumer進(jìn)行數(shù)據(jù)消費。
當(dāng)provider線程產(chǎn)生數(shù)據(jù)的速度大于consumer線程消費數(shù)據(jù)的速度時,數(shù)據(jù)會被積壓在ChannelBuffer中,為了防止內(nèi)存被打爆,ChannelBuffer需要實現(xiàn)成有界的,當(dāng)ChannelBuffer達(dá)到上限時會阻塞provider線程產(chǎn)生新數(shù)據(jù)。
數(shù)據(jù)緩存類 (DefaultChannelBuffer)
ChannelStream中provider和consumer線程的數(shù)據(jù)通信基礎(chǔ),它是ChannelBuffer的缺省實現(xiàn)。謹(jǐn)記,需要配置上限。
事務(wù)識別類(TransactionRecognizer)
用于識別事務(wù)的開啟、結(jié)束,并且記錄當(dāng)前事務(wù)的開始位置。針對gtid和binlog file name + pos兩種模式,提供2種實現(xiàn):GTIDTransactionRecognizer和BinlogNameAndPosTransactionRecognizer。
