StreamingProSpark Streaming 框架
概述
Spark 是一個(gè)可擴(kuò)展的可編程框架,用于數(shù)據(jù)集的大規(guī)模分布式處理, 稱為彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDD)。
Spark Streaming 是 Spark API 核心的擴(kuò)展,它支持來自各種來源的流處理。
StreamingPro 是一個(gè)可擴(kuò)展、可編程的 Spark Streaming 框架(也包括 Spark,Storm),可以輕松地用于構(gòu)建流式應(yīng)用。
StreamingPro 支持以 Spark、Flink 等作為底層分布式計(jì)算引擎,通過一套統(tǒng)一的配置文件完成批處理、流式計(jì)算與Rest 服務(wù)的開發(fā)。 特點(diǎn)有:
-
使用Json描述文件完成流式,批處理的開發(fā),不用寫代碼。
-
支持SQL Server,支持XSQL/MLSQL(重點(diǎn)),完成批處理,機(jī)器學(xué)習(xí),即席查詢等功能。
-
標(biāo)準(zhǔn)化輸入輸出,支持UDF函數(shù)注冊(cè),支持自定義模塊開發(fā)
-
支持Web化管理Spark應(yīng)用的啟動(dòng),監(jiān)控
如果更細(xì)節(jié)好處有:
-
跨版本:StreamingPro可以讓你不用任何變更就可以輕易的運(yùn)行在spark 1.6/2.1/2.2上。
-
新語法:提供了新的DSl查詢語法/Json配置語法
-
程序的管理工具:提供web界面啟動(dòng)/監(jiān)控 Spark 程序
-
功能增強(qiáng):2.1之后Structured Streaming 不支持kafka 0.8/0.9 ,Structured,此外還有比如spark streaming 支持offset 保存等
-
簡(jiǎn)化Spark SQL Server搭建成本:提供rest接口/thrift 接口,支持spark sql server 的負(fù)載均衡,自動(dòng)將driver 注冊(cè)到zookeeper上
-
探索更多的吧
項(xiàng)目模塊說明
| 模塊名 | 描述 | 備注 |
|---|---|---|
| streamingpro-commons | 一些基礎(chǔ)工具類 | |
| streamingpro-spark-common | Spark有多個(gè)版本,所以可以共享一些基礎(chǔ)的東西 | |
| streamingpro-flink | streamingpro對(duì)flink的支持 | |
| streamingpro-spark | streamingpro對(duì)spark 1.6.x的支持 | |
| streamingpro-spark-2.0 | streamingpro對(duì)spark 2.x的支持 | |
| streamingpro-api | streamingpro把底層的spark API暴露出來,方便用戶靈活處理問題 | |
| streamingpro-manager | 通過該模塊,可以很方便的通過web界面啟動(dòng),管理,監(jiān)控 spark相關(guān)的應(yīng)用 | |
| streamingpro-dls | 自定義connect,load,select,save,train,register等語法,便于用類似sql的方式做批處理任務(wù),機(jī)器學(xué)習(xí)等 |
相關(guān)概念
如果你使用StreamingPro,那么所有的工作都是在編輯一個(gè)Json配置文件。通常一個(gè)處理流程,會(huì)包含三個(gè)概念:
-
多個(gè)輸入
-
多個(gè)連續(xù)/并行的數(shù)據(jù)處理
-
多個(gè)輸出
StreamingPro會(huì)通過'compositor'的概念來描述他們,你可以理解為一個(gè)處理單元。一個(gè)典型的輸入compositor如下:
{
"name": "batch.sources",
"params": [
{
"path": "file:///tmp/hdfsfile/abc.txt",
"format": "json",
"outputTable": "test"
},
{
"path": "file:///tmp/parquet/",
"format": "parquet",
"outputTable": "test2"
}
]
}
batch.sources 就是一個(gè)compositor的名字。 這個(gè)compositor 把一個(gè)本地磁盤的文件映射成了一張表,并且告知系統(tǒng),abc.txt里的內(nèi)容 是json格式的。這樣,我們?cè)诤罄m(xù)的compositor模塊就可以使用這個(gè)test表名了。通常,StreamingPro希望整個(gè)處理流程, 也就是不同的compositor都采用表來進(jìn)行銜接。
StreamingPro不僅僅能做批處理,還能做流式,流式支持Spark Streaming,Structured Streaming。依然以輸入compositor為例,假設(shè) 我們使用的是Structured Streaming,則可以如下配置。
{
"name": "ss.sources",
"params": [
{
"format": "kafka9",
"outputTable": "test",
"kafka.bootstrap.servers": "127.0.0.1:9092",
"topics": "test",
"path": "-"
},
{
"format": "com.databricks.spark.csv",
"outputTable": "sample",
"header": "true",
"path": "/Users/allwefantasy/streamingpro/sample.csv"
}
]
}
第一個(gè)表示我們對(duì)接的數(shù)據(jù)源是kafka 0.9,我們把Kafka的數(shù)據(jù)映射成表test。 因?yàn)槲覀兛赡苓€需要一些元數(shù)據(jù),比如ip和城市的映射關(guān)系, 所以我們還可以配置一些其他的非流式的數(shù)據(jù)源,我們這里配置了一個(gè)smaple.csv文件,并且命名為表sample。
如果你使用的是kafka >= 1.0,則 topics 參數(shù)需要換成'subscribe',并且使用時(shí)可能需要對(duì)內(nèi)容做下轉(zhuǎn)換,類似:
select CAST(key AS STRING) as k, CAST(value AS STRING) as v from test
啟動(dòng)時(shí),你需要把-streaming.platform 設(shè)置為 ss。
如果我們的輸入輸出都是Hive的話,可能就不需要batch.sources/batch.outputs 等組件了,通常一個(gè)batch.sql就夠了。比如:
"without-sources-job": {
"desc": "-",
"strategy": "spark",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "batch.sql",
"params": [
{
"sql": "select * from hiveTable",
"outputTableName": "puarquetTable"
}
]
},
{
"name": "batch.outputs",
"params": [
{
"format": "parquet",
"inputTableName": "puarquetTable",
"path": "/tmp/wow",
"mode": "Overwrite"
}
]
}
],
"configParams": {
}
}
在批處理里,batch.sources/batch.outputs 都是可有可無的,但是對(duì)于流式程序,stream.sources/stream.outputs/ss.sources/ss.outputs 則是必須的。
StreamingPro的一些參數(shù)
| Property Name | Default | Meaning |
|---|---|---|
| streaming.name | (none) required | 等價(jià)于 spark.app.name |
| streaming.master | (none) required | 等價(jià)于 spark.master |
| streaming.duration | 10 seconds | spark streaming 周期,默認(rèn)單位為秒 |
| streaming.rest | true/false,default is false | 是否提供http接口 |
| streaming.spark.service | true/false,default is false | 開啟該選項(xiàng)時(shí),streaming.platform必須為spark. 該選項(xiàng)會(huì)保證spark實(shí)例不會(huì)退出 |
| streaming.platform | spark/spark_streaming/ss/flink,default is spark | 基于什么平臺(tái)跑 |
| streaming.checkpoint | (none) | spark streaming checkpoint 目錄 |
| streaming.kafka.offsetPath | (none) | kafka的偏移量保存目錄。如果沒有設(shè)置,會(huì)保存在內(nèi)存中 |
| streaming.driver.port | 9003 | 配置streaming.rest使用,streaming.rest為true,你可以設(shè)置一個(gè)http端口 |
| streaming.spark.hadoop.* | (none) | hadoop configuration,eg. -streaming.spark.hadoop.fs.defaultFS hdfs://name:8020 |
| streaming.job.file.path | (none) | 配置文件路徑,默認(rèn)從hdfs加載 |
| streaming.jobs | (none) | json配置文件里的job名稱,按逗號(hào)分隔。如果沒有配置該參數(shù),默認(rèn)運(yùn)行所有job |
| streaming.zk.servers | (none) | 如果把spark作為一個(gè)server,那么streamingpro會(huì)把driver地址注冊(cè)到zookeeper上 |
| streaming.zk.conf_root_dir | (none) | 配置streaming.zk.servers使用 |
| streaming.enableHiveSupport | false | 是否支持Hive |
| streaming.thrift | false | 是否thrift server |
| streaming.sql.source.[name].[參數(shù)] | (none) | batch/ss/stream.sources 中,你可以替換里面的任何一個(gè)參數(shù) |
| streaming.sql.out.[name].[參數(shù)] | (none) | batch/ss/stream.outputs 中,你可以替換里面的任何一個(gè)參數(shù) |
| streaming.sql.params.[param-name] | (none) | batch/ss/stream.sql中,你是可以寫表達(dá)式的,比如 select * from :table, 之后你可以通過命令行傳遞該table參數(shù) |
后面三個(gè)參數(shù)值得進(jìn)一步說明:
假設(shè)我們定義了兩個(gè)數(shù)據(jù)源,firstSource,secondSource,描述如下:
{
"name": "batch.sources",
"params": [
{
"name":"firstSource",
"path": "file:///tmp/sample_article.txt",
"format": "com.databricks.spark.csv",
"outputTable": "article",
"header":true
},
{
"name":"secondSource",
"path": "file:///tmp/sample_article2.txt",
"format": "com.databricks.spark.csv",
"outputTable": "article2",
"header":true
}
]
}
我們希望path不是固定的,而是啟動(dòng)時(shí)候決定的,這個(gè)時(shí)候,我們可以在啟動(dòng)腳本中使用-streaming.sql.source.[name].[參數(shù)] 來完成這個(gè)需求。 比如:
-streaming.sql.source.firstSource.path file:///tmp/wow.txt
這個(gè)時(shí)候,streamingpro啟動(dòng)的時(shí)候會(huì)動(dòng)態(tài)將path 替換成你要的。包括outputTable等都是可以替換的。
有時(shí)候我們需要定時(shí)執(zhí)行一個(gè)任務(wù),而sql語句也是動(dòng)態(tài)變化的,具體如下:
{
"name": "batch.sql",
"params": [
{
"sql": "select * from test where hp_time=:today",
"outputTableName": "finalOutputTable"
}
]
},
這個(gè)時(shí)候我們?cè)趩?dòng)streamingpro的時(shí)候,通過參數(shù):
-streaming.sql.params.today "2017"
動(dòng)態(tài)替換 sql語句里的:today
