Java代碼中,如何監(jiān)控Mysql的binlog?
binlog的主從復制了,但是放在我們的場景中,還有幾個問題:第一,并不是需要復制所有表的數(shù)據(jù),復制對象只有少量的幾張表
第二,也是比較麻煩的,兩個業(yè)務系統(tǒng)數(shù)據(jù)庫表結(jié)構(gòu)可能不一致。例如,要同步數(shù)據(jù)庫1的A表中的某些字段到數(shù)據(jù)庫2的B表中,在這一過程中,A表和B表的字段并不是完全相同
binlog,因此我們就需要在代碼中對binlog進行一下監(jiān)控。mysql-binlog-connector-java,用來監(jiān)控binlog變化并獲取數(shù)據(jù),獲取數(shù)據(jù)后再手動插入到另一個庫的表中,基于它來實現(xiàn)了數(shù)據(jù)的同步。這個工具的git項目地址如下:https://github.com/shyiko/mysql-binlog-connector-java
mysql的binlog,binlog是一個二進制文件,它保存在磁盤中,是用來記錄數(shù)據(jù)庫表結(jié)構(gòu)變更、表數(shù)據(jù)修改的二進制日志。其實除了數(shù)據(jù)復制外,它還可以實現(xiàn)數(shù)據(jù)恢復、增量備份等功能。mysql服務已經(jīng)啟用了binlog:show variables like 'log_bin';
OFF,表示沒有啟用,那么需要首先啟用binlog,修改配置文件:log_bin=mysql-bin
binlog-format=ROW
server-id=1
在配置文件中加入了
log_bin配置項后,表示啟用了binlogbinlog-format是binlog的日志格式,支持三種類型,分別是STATEMENT、ROW、MIXED,我們在這里使用ROW模式server-id用于標識一個sql語句是從哪一個server寫入的,這里一定要進行設置,否則我們在后面的代碼中會無法正常監(jiān)聽到事件
mysql服務。再次查看是否啟用binlog,返回為ON,表示已經(jīng)開啟成功。maven坐標:<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
public static void main(String[] args) {
BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "hydra", "123456");
client.setServerId(2);
client.registerEventListener(event -> {
EventData data = event.getData();
if (data instanceof TableMapEventData) {
System.out.println("Table:");
TableMapEventData tableMapEventData = (TableMapEventData) data;
System.out.println(tableMapEventData.getTableId()+": ["+tableMapEventData.getDatabase() + "-" + tableMapEventData.getTable()+"]");
}
if (data instanceof UpdateRowsEventData) {
System.out.println("Update:");
System.out.println(data.toString());
} else if (data instanceof WriteRowsEventData) {
System.out.println("Insert:");
System.out.println(data.toString());
} else if (data instanceof DeleteRowsEventData) {
System.out.println("Delete:");
System.out.println(data.toString());
}
});
try {
client.connect();
} catch (IOException e) {
e.printStackTrace();
}
}
BinaryLogClient客戶端對象,初始化時需要傳入mysql的連接信息,創(chuàng)建完成后,給客戶端注冊一個監(jiān)聽器,來實現(xiàn)它對binlog的監(jiān)聽和解析。在監(jiān)聽器中,我們暫時只對4種類型的事件數(shù)據(jù)進行了處理,除了WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData對應增刪改操作類型的事件數(shù)據(jù)外,還有一個TableMapEventData類型的數(shù)據(jù),包含了表的對應關(guān)系,在后面的例子中再具體說明。DML語句和DDL語句,所以我們只需要處理我們關(guān)心的事件數(shù)據(jù)就行,否則會收到大量的冗余數(shù)據(jù)。com.github.shyiko.mysql.binlog.BinaryLogClient openChannelToBinaryLogStream
信息: Connected to 127.0.0.1:3306 at mysql-bin.000002/1046 (sid:2, cid:10)
binlog成功,接下來,我們在數(shù)據(jù)庫中插入一條數(shù)據(jù),這里操作的數(shù)據(jù)庫名字是tenant,表是dept:insert into dept VALUES(8,"人力","","1");
Table:
108: [tenant-dept]
Insert:
WriteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[
[8, 人力, , 1]
]}
TableMapEventData,通過它可以獲取操作的數(shù)據(jù)庫名稱、表名稱以及表的id。之所以我們要監(jiān)聽這個事件,是因為之后監(jiān)聽的實際操作中返回數(shù)據(jù)中包含了表的id,而沒有表名等信息,所以如果我們想知道具體的操作是在哪一張表的話,就要先維護一個id與表的對應關(guān)系。WriteRowsEventData,其中記錄了insert語句作用的表,插入涉及到的列,以及實際插入的數(shù)據(jù)。另外,如果我們只需要對特定的一張或幾張表進行處理的話,也可以提前設置表的名單,在這里根據(jù)表id到表名的映射關(guān)系,實現(xiàn)數(shù)據(jù)的過濾,update語句:update dept set tenant_id=3 where id=8 or id=9
Table:
108: [tenant-dept]
Update:
UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1, 2, 3}, includedColumns={0, 1, 2, 3}, rows=[
{before=[8, 人力, , 1], after=[8, 人力, , 3]},
{before=[9, 人力, , 1], after=[9, 人力, , 3]}
]}
update語句時,可能會作用于多條數(shù)據(jù),因此在實際修改的數(shù)據(jù)中,可能包含多行記錄,這一點體現(xiàn)在上面的rows中,包含了id為8和9的兩條數(shù)據(jù)。delete語句:delete from dept where tenant_id=3
rows中同樣返回了生效的兩條數(shù)據(jù):Table:
108: [tenant-dept]
Delete:
DeleteRowsEventData{tableId=108, includedColumns={0, 1, 2, 3}, rows=[
[8, 人力, , 3],
[9, 人力, , 3]
]}
update操作為例,我們要對提取的數(shù)據(jù)后進行一下處理,更改上面例子中的方法:if (data instanceof UpdateRowsEventData) {
System.out.println("Update:");
UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
List<Serializable> entries = Arrays.asList(row.getValue());
System.out.println(entries);
JSONObject dataObject = getDataObject(entries);
System.out.println(dataObject);
}
}
data強制轉(zhuǎn)換為UpdateRowsEventData后,可以使用getRows方法獲取到更新的行數(shù)據(jù),并且能夠取到每一列的值。getDataObject方法,用它來實現(xiàn)數(shù)據(jù)到列的綁定過程:private static JSONObject getDataObject(List message) {
JSONObject resultObject = new JSONObject();
String format = "{\"id\":\"0\",\"dept_name\":\"1\",\"comment\":\"2\",\"tenant_id\":\"3\"}";
JSONObject json = JSON.parseObject(format);
for (String key : json.keySet()) {
resultObject.put(key, message.get(json.getInteger(key)));
}
return resultObject;
}
format字符串中,提前維護了一個數(shù)據(jù)庫表的字段順序的字符串,標識了每個字段位于順序中的第幾個位置。通過上面這個函數(shù),能夠?qū)崿F(xiàn)數(shù)據(jù)到列的填裝過程,我們再執(zhí)行一條update語句來查看一下結(jié)果:update dept set tenant_id=3,comment="1" where id=8
Table:
108: [tenant-dept]
Update:
[8, 人力, 1, 3]
{"tenant_id":3,"dept_name":"人力","comment":"1","id":8}
評論
圖片
表情
