Apache Doris for Real-Time Data Analytics: A Step-by-Step Tutorial

Installing Doris
Cluster Deployment
Official download page:
https://doris.apache.org/zh-CN/downloads/downloads.html

Choose the binary download; the source download has to be compiled yourself. Extract the Doris archive:
tar -zxvf apache-doris-1.0.0-incubating-bin.tar.gz -C /opt/module/
Cluster Planning

FE Deployment
Edit the configuration file: vim conf/fe.conf
meta_dir = /opt/module/doris-meta

Distribute the storage path and the FE configuration file across the cluster, then start the FE.
# Create the metadata storage directory
mkdir /opt/module/doris-meta
# Run on all three machines
sh bin/start_fe.sh --daemon

BE Deployment
Edit the configuration file: vim conf/be.conf
# storage_root_path sets the storage directories; separate multiple directories with ';'. A directory may be followed by a comma and a capacity limit (in GB by default)
storage_root_path = /opt/module/doris_storage1,10;/opt/module/doris_storage2

Distribute the storage paths and the BE configuration file across the cluster, then start the BE.
# Create the storage_root_path directories
mkdir /opt/module/doris_storage1
mkdir /opt/module/doris_storage2
# Run on all three machines
sh bin/start_be.sh --daemon

Accessing the Doris FE Node
Doris can be accessed with a MySQL client; if you do not have one, install mysql-client first.
# No password is required on first login; you can set one yourself
mysql -hhadoop102 -P 9030 -uroot
# Change the password
set password for 'root' = password('root');

Adding BE Nodes
After logging in through the MySQL client, add the BE nodes. The port is the BE's heartbeat_service_port, 9050 by default.
mysql> ALTER SYSTEM ADD BACKEND "hadoop102:9050";
mysql> ALTER SYSTEM ADD BACKEND "hadoop103:9050";
mysql> ALTER SYSTEM ADD BACKEND "hadoop104:9050";
Check the BE node status through the MySQL client; Alive must be true.
mysql> SHOW PROC '/backends';

Broker Deployment
Optional, not required. Start the Broker:
# Start on all three machines
sh bin/start_broker.sh --daemon

Access the FE with the MySQL client and add the Broker nodes.
mysql> ALTER SYSTEM ADD BROKER broker_name "hadoop102:8000","hadoop103:8000","hadoop104:8000";
Check the Broker status:
mysql> SHOW PROC "/brokers";
Scaling Out and In
Doris makes it easy to scale FE, BE, and Broker instances out and in. The cluster can be monitored through the web UI on port 8030, e.g. http://hadoop102:8030; the account is root and the password is empty by default, unless you set one above.

FE Scaling
Scaling FE nodes out or in does not affect the running system.
After logging in with the MySQL client, you can check the FE status with a SQL command; at this point there is only one FE.
mysql> SHOW PROC '/frontends';


To add FE nodes: an FE has one of three roles, Leader, Follower, or Observer. By default a cluster has exactly one Leader and may have multiple Followers and Observers. The Leader and the Followers form a Paxos-style election group; if the Leader goes down, one of the remaining Followers becomes the new Leader, providing HA. Observers only replicate data from the Leader and do not take part in the election. If only one FE is deployed, it is the Leader by default.
The first FE started automatically becomes the Leader. On top of that, you can add any number of Followers and Observers. To add a Follower or Observer, connect to the running FE with mysql-client and execute the statements below; here a Follower is deployed on hadoop103 and an Observer on hadoop104.
# Execute whichever one applies; the parameters are explained below
# follower_host / observer_host: the node's hostname or IP
# edit_log_port: can be found in the fe.conf configuration file
# ALTER SYSTEM ADD FOLLOWER "follower_host:edit_log_port";
ALTER SYSTEM ADD FOLLOWER "hadoop103:9010";
# ALTER SYSTEM ADD OBSERVER "observer_host:edit_log_port";
ALTER SYSTEM ADD OBSERVER "hadoop104:9010";

The FE on each newly configured node needs to be restarted with the following parameter:
# --helper specifies the Leader's address and edit_log_port
# Run on hadoop103
sh bin/start_fe.sh --helper hadoop102:9010 --daemon
# Run on hadoop104
sh bin/start_fe.sh --helper hadoop102:9010 --daemon
Once they are all up, check the FE status again through the MySQL client:
mysql> SHOW PROC '/frontends';

To remove an FE node, run:
ALTER SYSTEM DROP FOLLOWER[OBSERVER] "fe_host:edit_log_port";
When dropping a Follower FE, make sure the number of remaining Followers (including the Leader) stays odd.
ALTER SYSTEM DROP FOLLOWER "hadoop103:9010";
ALTER SYSTEM DROP OBSERVER "hadoop104:9010";
BE Scaling
To add a BE node, use ALTER SYSTEM ADD BACKEND in the MySQL client, just as during installation above.
To remove a BE node, use ALTER SYSTEM DROP BACKEND "be_host:be_heartbeat_service_port";
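As a concrete sketch using this cluster's hostnames (hadoop105 stands in for a hypothetical new node; DECOMMISSION is the safer removal path because it migrates data off the node first):
# Add a new BE
ALTER SYSTEM ADD BACKEND "hadoop105:9050";
# Retire it again; its data is migrated to the other BEs before removal
ALTER SYSTEM DECOMMISSION BACKEND "hadoop105:9050";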
See the official documentation for details.
Doris Operations Guide
Creating a User
# Connect to Doris
mysql -hhadoop102 -P 9030 -uroot
# Create a user
mysql> create user 'test' identified by 'test';
# Log out; you can now log in as the test user
mysql> exit;
mysql -hhadoop102 -P 9030 -utest -ptest
Table Operations
# Create a database
mysql> create database test_db;
# Grant the test user privileges on test_db
mysql> grant all on test_db to test;
# Switch to the database
mysql> use test_db;
Partitioned Tables
Partitioned tables come in two forms: single partition and composite partition.
Single partition: create a student table, bucketed by the id column into 10 buckets, with 1 replica.
CREATE TABLE student
(
id INT,
name VARCHAR(50),
age INT,
count BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");
Composite partition: the first level is called the Partition. You designate one dimension column as the partition column (currently only integer and date/time columns are supported) and specify the value range of each partition. The second level is called the Distribution, i.e. bucketing: you specify one or more dimension columns and a bucket count for HASH distribution.
# Create the student2 table, using the dt column as the partition column, with three partitions:
# p202007: data earlier than 2020-08-01
# p202008: data from 2020-08-01 to 2020-08-31
# p202009: data from 2020-09-01 to 2020-09-30
CREATE TABLE student2
(
dt DATE,
id INT,
name VARCHAR(50),
age INT,
count BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (dt,id,name,age)
PARTITION BY RANGE(dt)
(
PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
PARTITION p202009 VALUES LESS THAN ('2020-10-01')
)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");
Data Models
AGGREGATE KEY
When the AGGREGATE KEY columns of a new record match those of an existing record, the two are aggregated.
The AGGREGATE KEY model pre-aggregates data and is a good fit for reporting and multi-dimensional analysis.
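A quick way to see this on the student table created above (a minimal sketch; the inserted rows are made-up sample values):
# Two rows with identical key columns (id, name, age)
INSERT INTO student VALUES (1, 'zhangsan', 18, 10);
INSERT INTO student VALUES (1, 'zhangsan', 18, 20);
# The rows are merged and count is aggregated by SUM; expect a single row with count = 30
SELECT * FROM student;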
UNIQUE KEY
When the UNIQUE KEY columns match, the new record overwrites the old one. Currently UNIQUE KEY is equivalent to the REPLACE aggregation of AGGREGATE KEY. It suits workloads that need updates.
DUPLICATE KEY
Only sort columns are specified, and identical rows are not merged. It suits analytical workloads where the data does not need to be pre-aggregated.
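For comparison, minimal DDL sketches for the other two models (the table names student_unique and student_log are made up; 1 replica as in the examples above):
# UNIQUE KEY: a new row with the same id overwrites the old one
CREATE TABLE student_unique
(
id INT,
name VARCHAR(50),
age INT
)
UNIQUE KEY (id)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");
# DUPLICATE KEY: (id, name) only defines the sort order; identical rows are all kept
CREATE TABLE student_log
(
id INT,
name VARCHAR(50),
age INT
)
DUPLICATE KEY (id, name)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");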
Data Import
To cover different import needs, Doris provides five different import methods. Each supports different data sources and works in a different mode (asynchronous or synchronous).
Broker Load
Broker Load is an asynchronous import method; the supported data sources depend on what the Broker process supports.
How it works: after the user submits an import job, the FE (the metadata and scheduling node of Doris) generates a Plan (the import execution plan; BEs execute this plan to load the data into Doris) and, based on the number of BEs (the compute and storage nodes of Doris) and the size of the files, splits the Plan across multiple BEs, each importing part of the data. During execution each BE pulls data from the Broker, transforms it, and writes it into the system. Once every BE has finished, the FE makes the final decision on whether the import succeeded.
Test: importing data from HDFS into Doris
Prepare a test file and upload it to HDFS.

Create the Doris table for the import test:
CREATE TABLE student
(
id INT,
name VARCHAR(50),
age INT,
count BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");
Write the Doris import SQL; see the official documentation for more parameters.
LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://bigdata:8020/student")
INTO TABLE student
COLUMNS TERMINATED BY ","
(id,name,age,count)
SET
(
id=id,
name=name,
age=age,
count=count
)
)
WITH BROKER broker_name
(
"username"="root"
)
PROPERTIES
(
"timeout" = "3600"
);
Check the import status in Doris:
use test_db;
show load;

Check whether the data was imported successfully.
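For example, a simple query against the target table should now return the loaded rows:
select * from student;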

Routine Load
The Routine Load feature lets users automatically and continuously import data from a specified data source.
Importing data from Kafka into Doris
Create a Kafka topic:
kafka-topics.sh --zookeeper bigdata:2181 --create --replication-factor 1 --partitions 1 --topic test
Start a Kafka console producer to generate data:
kafka-console-producer.sh --broker-list bigdata:9092 --topic test
# Data format
{"id":"4","name":"czsqhh","age":"18","count":"50"}
Create the corresponding table in Doris:
CREATE TABLE kafka_student
(
id INT,
name VARCHAR(50),
age INT,
count BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY (id,name,age)
DISTRIBUTED BY HASH(id) buckets 10
PROPERTIES("replication_num" = "1");

Create the import job; desired_concurrent_number sets the parallelism:
CREATE ROUTINE LOAD test_db.job1 on kafka_student
PROPERTIES
(
"desired_concurrent_number"="1",
"strict_mode"="false",
"format"="json"
)
FROM KAFKA
(
"kafka_broker_list"= "bigdata:9092",
"kafka_topic" = "test",
"property.group.id" = "test"
);
Check the job status:
SHOW ROUTINE LOAD;

Controlling the job:
STOP ROUTINE LOAD FOR jobxxx: stop the job
PAUSE ROUTINE LOAD FOR jobxxx: pause the job
RESUME ROUTINE LOAD FOR jobxxx: resume the job
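For the job created above (test_db.job1), these commands take the following concrete form:
PAUSE ROUTINE LOAD FOR test_db.job1;
RESUME ROUTINE LOAD FOR test_db.job1;
# STOP ends the job permanently; it cannot be resumed afterwards
STOP ROUTINE LOAD FOR test_db.job1;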
Data Export
Exporting data from Doris to HDFS
See the official documentation for the other parameters.
EXPORT TABLE test_db.student
PARTITION (student)
TO "hdfs://bigdata:8020/doris/student/"
WITH BROKER broker_name
(
"username" = "root"
);
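Export runs asynchronously, like Broker Load; its status can be checked with SHOW EXPORT (the exact result columns may vary by version):
use test_db;
SHOW EXPORT;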

Working with Doris in Code
Spark
Add the dependencies:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
</dependencies>
Reading Doris data:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ReadDoris {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("testReadDoris").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // Doris exposes the MySQL protocol on the FE query port (9030), so a plain JDBC read works
    val df = sparkSession.read.format("jdbc")
      .option("url", "jdbc:mysql://bigdata:9030/test_db")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "student")
      .load()
    df.show()
    sparkSession.close()
  }
}

Flink
Add the dependencies:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.3</version>
</dependency>
</dependencies>
Reading the data:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadDoris {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Doris speaks the MySQL protocol on the FE query port (9030), so the JDBC connector can read it
        String sourceSql = "CREATE TABLE student (\n" +
                "`id` INT,\n" +
                "`name` STRING,\n" +
                "`age` INT\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://bigdata:9030/test_db',\n" +
                "'username' = 'root',\n" +
                "'password' = 'root',\n" +
                "'table-name' = 'student'\n" +
                ")";
        tEnv.executeSql(sourceSql);
        Table table = tEnv.sqlQuery("select * from student");
        table.execute().print();
    }
}


