Flink 實踐 | Flink CDC 系列 - 實時抽取 Oracle 數(shù)據,排雷和調優(yōu)實踐
摘要:本文作者為中國農業(yè)銀行研發(fā)中心丁楊,在 Flink CDC 2.1 版本發(fā)布后第一時間下載使用,并成功實現(xiàn)了對 Oracle 的實時數(shù)據捕獲以及性能調優(yōu),現(xiàn)將試用過程中的一些關鍵細節(jié)進行分享。主要內容包括:
無法連接數(shù)據庫 無法找到?Oracle 表 數(shù)據延遲較大 調節(jié)參數(shù)繼續(xù)降低數(shù)據延遲 Debezium Oracle?Connector?的隱藏參數(shù)
Flink CDC
https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/oracle-cdc.html
Debezium
https://debezium.io/documentation/reference/1.5/connectors/oracle.html
一、無法連接數(shù)據庫
create table TEST (A string)WITH ('connector'='oracle-cdc','hostname'='10.230.179.125','port'='1521','username'='myname','password'='***','database-name'='MY_SERVICE_NAME','schema-name'='MY_SCHEMA','table-name'='TEST' );
[ERROR] Could not execute SQL statement. Reason:oracle.net.ns.NetException: Listener refused the connection with the following error:ORA-12505, TNS:listener does not currently know of SID given in connect descriptor
public static Connection openConnection(Properties properties) throws SQLException {DriverManager.registerDriver(new oracle.jdbc.OracleDriver());String hostname = properties.getProperty("database.hostname");String port = properties.getProperty("database.port");String dbname = properties.getProperty("database.dbname");String userName = properties.getProperty("database.user");String userpwd = properties.getProperty("database.password");return DriverManager.getConnection("jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);}
在 Flink CDC 的 create table 語句中,將 database-name 由 Service Name 替換成其中一個 SID。該方式能解決連接問題,但無法適應主流的 Oracle 集群部署的真實場景; 對該源碼進行修改。具體可在新建工程中,重寫 com.ververica.cdc.connectors.oracle.OracleValidator 方法,修改為 Service Name 的連接方式 (即 port 和 dbname 中間使用 “ / ” 分隔開),即: "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname, userName, userpwd);
該問題已提交至 Flink CDC Issue 701: https://github.com/ververica/flink-cdc-connectors/issues/701
二、無法找到 Oracle 表
[ERROR] Could not execute SQL statement. Reason:io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test. Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
private TableId toLowerCaseIfNeeded(TableId tableId) {return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;}
見?https://docs.oracle.com/cd/E11882_01/server.112/e41084/sql_elements008.htm “Nonquoted identifiers are not case sensitive. Oracle interprets them as uppercase”
如需使用 Oracle “大小寫不敏感” 的特性,可直接修改源碼,將上述 toLowercase 修改為 toUppercase (這也是筆者選擇的方法); 如果不愿意修改源碼,且無需使用 Oracle “大小寫不敏感” 的特性,可以在 create 語句中加上 'debezium.database.tablename.case.insensitive'='false',如下示例:
create table TEST (A string)WITH ('connector'='oracle-cdc','hostname'='10.230.179.125','port'='1521','username'='myname','password'='***','database-name'='MY_SERVICE_NAME','schema-name'='MY_SCHEMA','table-name'='TEST','debezium.database.tablename.case.insensitive'='false' );
該問題已提交至 Flink CDC Issue 702: https://github.com/ververica/flink-cdc-connectors/issues/702
三、數(shù)據延遲較大
'debezium.log.mining.strategy'='online_catalog','debezium.log.mining.continuous.mine'='true'

ORA-00308: cannot open archive log '/path/to/archive/log/...'ORA-27037: unable to obtain file status
四、調節(jié)參數(shù)繼續(xù)降低數(shù)據延遲

從上述的流程圖中可以看出,debezium 給出 log.mining.batch.size.* 和 log.mining.sleep.time.* 兩組參數(shù),就是為了讓每一次 logMiner 運行的步長能夠盡可能和數(shù)據庫自身 SCN 增加的步長一致。由此可見:
log.mining.batch.size.* 和 log.mining.sleep.time.* 參數(shù)的設定,和數(shù)據庫整體的表現(xiàn)有關,和單個表的數(shù)據變化情況無關; log.mining.batch.size.default 不僅僅是監(jiān)控時序范圍的起始值,還是監(jiān)控時序范圍變化的閾值。所以如果要實現(xiàn)更靈活的監(jiān)控時序范圍調整,可考慮適當減小該參數(shù); 由于每一次確定監(jiān)控時序范圍時,都會根據 topScn 和 currentScn 的大小來調整 sleepTime,所以為了實現(xiàn)休眠時間更靈活的調整,可考慮適當增大 log.mining.sleep.time.increment.ms; log.mining.batch.size.max 不能過小,否則會有監(jiān)控時序范圍永遠無法追上數(shù)據庫當前 SCN 的風險。為此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下邏輯:
if (currentBatchSize == batchSizeMax) {LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);}
五、Debezium Oracle Connector
的隱藏參數(shù)

所以我們在分析 Flink CDC 行為時,通過自定義實現(xiàn) io.debezium.connector.oracle.logminer.HistoryRecorder 接口的類,可在不修改源碼的情況下,實現(xiàn)對 Flink CDC 行為的個性化監(jiān)控。
Flink-CDC 項目地址:
https://github.com/ververica/flink-cdc-connectors
相關文章
Flink CDC 系列 - 實現(xiàn) MySQL 數(shù)據實時寫入 Apache Doris
Flink CDC 系列 - 構建 MySQL 和 Postgres 上的 Streaming ETL Flink CDC 2.1 正式發(fā)布,穩(wěn)定性大幅提升,新增 Oracle,MongoDB 支持

?
??戳我,查看更多技術干貨~
