AI Prediction: Real-Time Disease Prediction with Stream Compute Oceanus (Flink)

1. Solution Description
(1) Overview

(2) Solution Architecture

Stream Compute Oceanus (Flink)
TI-ONE Machine Learning Platform
TI-EMS Elastic Model Service
Message Queue CKafka
Cloud Data Warehouse ClickHouse
Cloud Object Storage (COS)
2. Prerequisites
(1) Create a Virtual Private Cloud (VPC)
(2) Create a Stream Compute Oceanus Cluster
Stream Compute Oceanus is the real-time analytics workhorse of the big data product ecosystem: an enterprise-grade real-time big data analytics platform built on Apache Flink, offering one-stop development, seamless connectivity, sub-second latency, low cost, and stable, secure operation. Oceanus aims to maximize the value of enterprise data and accelerate the shift toward real-time, digital operations.
In the Stream Compute Oceanus console [2], create a cluster on the [Cluster Management] -> [Create Cluster] page: choose the region, availability zone, VPC, logging, and storage, and set the initial password. For the VPC and subnet, use the network created in the previous step. The Flink cluster looks as follows once created:

(3) Create a CKafka Instance
(4) Create a COS Instance
(5) Create a ClickHouse Cluster
# Download the ClickHouse client packages
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# Install the client
rpm -ivh *.rpm
# Log in to the ClickHouse cluster over the TCP port; the IP address is shown in the console
clickhouse-client -hxxx.xxx.xxx.xxx --port 9000
-- Create the database
CREATE DATABASE IF NOT EXISTS testdb ON CLUSTER default_cluster;
-- Create the table
CREATE TABLE testdb.model_predict_result_1 ON CLUSTER default_cluster (
    res String,
    Sign Int8
) ENGINE = ReplicatedCollapsingMergeTree(
    '/clickhouse/tables/{layer}-{shard}/testdb/model_predict_result_1',
    '{replica}',
    Sign
) ORDER BY res;
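To confirm the table is reachable from application code as well, a quick JDBC check can be run. This is a minimal sketch, assuming the clickhouse-jdbc driver (ru.yandex.clickhouse) is on the classpath; replace the placeholder address with your cluster's HTTP endpoint from the console:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ClickHouseCheck {
    public static void main(String[] args) throws Exception {
        // Assumes the ru.yandex.clickhouse JDBC driver is on the classpath
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        // HTTP port 8123; replace xxx.xxx.xxx.xxx with the address shown in the console
        String url = "jdbc:clickhouse://xxx.xxx.xxx.xxx:8123/testdb";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT count() FROM model_predict_result_1")) {
            while (rs.next()) {
                System.out.println("rows: " + rs.getLong(1));
            }
        }
    }
}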
(6) Register and Activate the TI-ONE Service
The TI-ONE machine learning platform is a one-stop machine learning service platform built for AI engineers, supporting the full development and deployment workflow from data preprocessing through model building, training, and evaluation to model serving.
Click [Go to Access Management]; the page redirects to the Cloud Access Management console.
Click [Agree and Authorize] to create the preset service role and grant the TI-ONE platform the permissions it needs.
(7) Register and Activate the TI-EMS Service
Tencent Intelligence Elastic Model Service (TI-EMS) is a serverless online inference platform with virtualized heterogeneous compute power and elastic scaling.
3. Implementation
(1) Offline Model Training
Dataset introduction


Offline model training



(2) Real-Time Feature Engineering
Create the Source
-- The random source simulates real-time patient-record feature data
CREATE TABLE random_source (
    ClumpThickness           INT,
    UniformityOfCellSize     INT,
    UniformityOfCellShape    INT,
    MarginalAdhsion          INT,
    SingleEpithelialCellSize INT,
    BareNuclei               INT,
    BlandChromation          INT,
    NormalNucleoli           INT,
    Mitoses                  INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',                  -- records generated per second
    'fields.ClumpThickness.kind' = 'random',  -- unbounded random numbers; the remaining fields are configured the same way
    'fields.ClumpThickness.min' = '0',        -- minimum random value
    'fields.ClumpThickness.max' = '10',       -- maximum random value
    'fields.UniformityOfCellSize.kind' = 'random',
    'fields.UniformityOfCellSize.min' = '0',
    'fields.UniformityOfCellSize.max' = '10',
    'fields.UniformityOfCellShape.kind' = 'random',
    'fields.UniformityOfCellShape.min' = '0',
    'fields.UniformityOfCellShape.max' = '10',
    'fields.MarginalAdhsion.kind' = 'random',
    'fields.MarginalAdhsion.min' = '0',
    'fields.MarginalAdhsion.max' = '10',
    'fields.SingleEpithelialCellSize.kind' = 'random',
    'fields.SingleEpithelialCellSize.min' = '0',
    'fields.SingleEpithelialCellSize.max' = '10',
    'fields.BareNuclei.kind' = 'random',
    'fields.BareNuclei.min' = '0',
    'fields.BareNuclei.max' = '10',
    'fields.BlandChromation.kind' = 'random',
    'fields.BlandChromation.min' = '0',
    'fields.BlandChromation.max' = '10',
    'fields.NormalNucleoli.kind' = 'random',
    'fields.NormalNucleoli.min' = '0',
    'fields.NormalNucleoli.max' = '10',
    'fields.Mitoses.kind' = 'random',
    'fields.Mitoses.min' = '0',
    'fields.Mitoses.max' = '10'
);
Create the Sink
CREATE TABLE `KafkaSink` (
    ClumpThickness           INT,
    UniformityOfCellSize     INT,
    UniformityOfCellShape    INT,
    MarginalAdhsion          INT,
    SingleEpithelialCellSize INT,
    BareNuclei               INT,
    BlandChromation          INT,
    NormalNucleoli           INT,
    Mitoses                  INT
) WITH (
    'connector' = 'kafka',                                  -- 'kafka' or 'kafka-0.11'; pick the matching built-in Connector
    'topic' = 'topic-decision-tree-predict-1',              -- replace with your Topic
    'properties.bootstrap.servers' = '172.28.28.211:9092',  -- replace with your Kafka connection address
    'properties.group.id' = 'RealTimeFeatures',             -- required: a Group ID must be specified
    'format' = 'csv'
);
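To spot-check that features are actually landing on the topic, a plain Kafka consumer can be pointed at it. This is a minimal sketch assuming a recent kafka-clients library; the broker address and topic match the sink definition above, and the "spot-check" group ID is an arbitrary placeholder:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicSpotCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.28.28.211:9092");
        props.put("group.id", "spot-check"); // placeholder group ID for this ad-hoc check
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-decision-tree-predict-1"));
            // Print a few CSV rows produced by the Flink job, e.g. "3,2,3,0,0,2,1,0,1"
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}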
Write the Business SQL
INSERT INTO `KafkaSink`
SELECT * FROM `random_source`;
Select the Connector
Query the Data

(3) Real-Time Prediction
Start the Model Service

Test the Model over the Public Network
On the right, click [More] > [Invoke] to create a public invocation address.

Open a terminal, create a data.json file, and run the following command from the directory containing it:
# Replace <access address> / <key> with the actual IP address / key
curl -H "Content-Type: application/json" \
     -H "x-Auth-Token: <key>" \
     -X POST <access address>/v1/models/m:predict -d @data.json
{"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}{"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}通過流計算Oceanus調用模型服務
Develop and debug the code locally.
Go to the Stream Compute Oceanus console [2] and click [Dependency Management] on the left to create a new dependency and upload the JAR package.
Go to the [Job Management] page, create a JAR job, and select the Stream Compute Oceanus cluster created earlier.
Click [Development & Debugging], specify the main program package and main class, then click [Job Debugging]. For [Built-in Connector], select flink-connector-clickhouse and flink-connector-kafka.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Properties;

public class OnlinePredict {

    public static final Logger logger = LoggerFactory.getLogger(OnlinePredict.class);

    public static void main(String[] args) throws Exception {
        // Parse the Kafka configuration parameters
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(
                OnlinePredict.class.getResourceAsStream("/KafkaSource.properties"));

        // Instantiate the execution environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Checkpoint configuration
        streamEnv.enableCheckpointing(parameterTool.getLong("flink.stream.checkpoint.interval", 30_000));
        streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Restart strategy
        streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10_000));

        // Source, transformation, sink
        DataStream<String> stringResult = streamEnv
                .addSource(buildKafkaSource(parameterTool))
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String paramInput = inputDataTransfer(value);
                        String outputData = sendHttpData(paramInput);
                        out.collect(outputData);
                    }
                });
        Table tableResult = tableEnv.fromDataStream(stringResult);
        tableEnv.createTemporaryView("resultSink", tableResult);
        tableEnv.executeSql("CREATE TABLE `CKSink` (\n" +
                "    res STRING,\n" +
                "    PRIMARY KEY (`res`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'clickhouse',\n" +
                "    'url' = 'clickhouse://172.28.1.138:8123',\n" +
                "    'database-name' = 'testdb',\n" +
                "    'table-name' = 'model_predict_result_1',\n" +
                "    'table.collapsing.field' = 'Sign'\n" +
                ")");
        tableEnv.executeSql("INSERT INTO CKSink SELECT * FROM resultSink");
    }

    // Kafka source
    public static SourceFunction<String> buildKafkaSource(ParameterTool parameterTool) throws Exception {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                parameterTool.get("kafka.source.bootstrap.servers"));
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                parameterTool.get("kafka.source.auto.offset.reset", "latest"));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,
                parameterTool.get("kafka.source.group.id"));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                parameterTool.get("kafka.source.topic"),
                new SimpleStringSchema(),
                properties);
        consumer.setStartFromGroupOffsets();
        return consumer;
    }

    // Convert a CSV record from Kafka into the model request format:
    // {"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}
    public static String inputDataTransfer(String value) {
        String[] input = value.split(",");
        ArrayList<JSONObject> dataListMap = new ArrayList<>();
        JSONObject jsondata = new JSONObject();
        for (int i = 0; i < input.length; i++) {
            jsondata.put("_c" + i, Double.parseDouble(input[i]));
        }
        dataListMap.add(jsondata);
        return "{\"instances\":" + dataListMap.toString() + "}";
    }

    // Call the TI-EMS online inference service. The response looks like:
    // {"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}
    public static String sendHttpData(String paramJson) throws Exception {
        String data = null;
        try {
            // Replace xx.xx.xx.xx:xxxx with the actual IP address of the VPC invocation created in 3.2.2
            String url = "http://xx.xx.xx.xx:xxxx/v1/models/m:predict";
            HttpClient client = HttpClientBuilder.create().build();
            HttpPost post = new HttpPost(url);
            post.addHeader("Content-type", "application/json");
            post.addHeader("Accept", "application/json");
            // Replace xxxxxxxxxx with the actual key of the VPC invocation created in 3.2.2
            post.addHeader("X-AUTH-TOKEN", "xxxxxxxxxx");
            StringEntity entity = new StringEntity(paramJson, java.nio.charset.Charset.forName("UTF-8"));
            post.setEntity(entity);
            HttpResponse response = client.execute(post);
            // Check for a normal response
            if (response.getStatusLine().getStatusCode() == 200) {
                // Read the response body
                HttpEntity resEntity = response.getEntity();
                data = EntityUtils.toString(resEntity);
            } else {
                data = "error input";
            }
            System.out.println(data);
        } catch (Throwable e) {
            logger.error("", e);
        }
        return data;
    }
}
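The job above stores the raw response string in ClickHouse. If only the predicted label is needed downstream, the response can be unpacked with the org.json dependency already on the classpath. This is a minimal sketch; the field names follow the sample response shown earlier:

import org.json.JSONArray;
import org.json.JSONObject;

public class PredictionParser {
    // Extract the predicted label from a TI-EMS response such as
    // {"predictions": [{"pmml(prediction)":"1", ... ,"prediction":"1.0","label":"1"}]}
    public static String extractLabel(String responseJson) {
        JSONObject root = new JSONObject(responseJson);
        JSONArray predictions = root.getJSONArray("predictions");
        JSONObject first = predictions.getJSONObject(0);
        return first.getString("label");
    }

    public static void main(String[] args) {
        String sample = "{\"predictions\": [{\"pmml(prediction)\":\"1\","
                + "\"probability(0)\":\"0.47058823529411764\","
                + "\"probability(1)\":\"0.5294117647058824\","
                + "\"prediction\":\"1.0\",\"label\":\"1\"}]}";
        System.out.println(extractLabel(sample)); // prints 1
    }
}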
# source: replace with your actual parameters
kafka.source.bootstrap.servers=172.28.28.211:9092
kafka.source.topic=topic-decision-tree-predict-1
kafka.source.group.id=RealTimePredict1
kafka.source.auto.offset.reset=latest
<properties>
    <flink.version>1.11.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20201115</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
4. Summary
On newer Flink 1.13 clusters, you no longer need to pick the built-in Connectors yourself; the platform matches them automatically.
Besides CKafka and ClickHouse, the data warehouse can also be Hive, MySQL, PostgreSQL, and so on, depending on your actual needs.
This solution keeps real-time feature engineering to a minimum; you can implement it with SQL, JAR, or ETL jobs according to your business requirements.
This solution starts only a single PMML service for Stream Compute Oceanus to call; if backpressure occurs, you can start more PMML service instances and rotate calls across them (see the sketch after this list).
TI-ONE and TI-EMS do not yet support real-time model training; to refresh the model, you can write a scheduled script that pulls fresh data and retrains it on TI-ONE.
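For instance, sendHttpData could pick its target from a small pool of service endpoints instead of a single hard-coded URL. This is a minimal sketch; the endpoint addresses are placeholders for the additional TI-EMS instances you would start:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ModelEndpointPool {
    // Placeholder addresses for the extra TI-EMS model service instances
    private static final List<String> ENDPOINTS = Arrays.asList(
            "http://xx.xx.xx.x1:xxxx/v1/models/m:predict",
            "http://xx.xx.xx.x2:xxxx/v1/models/m:predict",
            "http://xx.xx.xx.x3:xxxx/v1/models/m:predict");

    private static final AtomicInteger NEXT = new AtomicInteger(0);

    // Rotate through the endpoints so concurrent flatMap calls spread their load
    public static String nextUrl() {
        int i = Math.floorMod(NEXT.getAndIncrement(), ENDPOINTS.size());
        return ENDPOINTS.get(i);
    }
}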
5. References

