<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          StreamingProSpark Streaming 框架

          聯(lián)合創(chuàng)作 · 2023-09-30 20:37

          概述

          Spark 是一個(gè)可擴(kuò)展的可編程框架,用于數(shù)據(jù)集的大規(guī)模分布式處理, 稱為彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDD)。

          Spark Streaming 是 Spark API 核心的擴(kuò)展,它支持來自各種來源的流處理。

          StreamingPro 是一個(gè)可擴(kuò)展、可編程的 Spark Streaming 框架(也包括 Spark,Storm),可以輕松地用于構(gòu)建流式應(yīng)用。

          StreamingPro 支持以 Spark、Flink 等作為底層分布式計(jì)算引擎,通過一套統(tǒng)一的配置文件完成批處理、流式計(jì)算與Rest 服務(wù)的開發(fā)。 特點(diǎn)有:

          1. 使用Json描述文件完成流式,批處理的開發(fā),不用寫代碼。

          2. 支持SQL Server,支持XSQL/MLSQL(重點(diǎn)),完成批處理,機(jī)器學(xué)習(xí),即席查詢等功能。

          3. 標(biāo)準(zhǔn)化輸入輸出,支持UDF函數(shù)注冊(cè),支持自定義模塊開發(fā)

          4. 支持Web化管理Spark應(yīng)用的啟動(dòng),監(jiān)控

          如果更細(xì)節(jié)好處有:

          1. 跨版本:StreamingPro可以讓你不用任何變更就可以輕易的運(yùn)行在spark 1.6/2.1/2.2上。

          2. 新語法:提供了新的DSl查詢語法/Json配置語法

          3. 程序的管理工具:提供web界面啟動(dòng)/監(jiān)控 Spark 程序

          4. 功能增強(qiáng):2.1之后Structured Streaming 不支持kafka 0.8/0.9 ,Structured,此外還有比如spark streaming 支持offset 保存等

          5. 簡(jiǎn)化Spark SQL Server搭建成本:提供rest接口/thrift 接口,支持spark sql server 的負(fù)載均衡,自動(dòng)將driver 注冊(cè)到zookeeper上

          6. 探索更多的吧

          項(xiàng)目模塊說明

          模塊名 描述 備注
          streamingpro-commons 一些基礎(chǔ)工具類  
          streamingpro-spark-common Spark有多個(gè)版本,所以可以共享一些基礎(chǔ)的東西  
          streamingpro-flink streamingpro對(duì)flink的支持  
          streamingpro-spark streamingpro對(duì)spark 1.6.x的支持  
          streamingpro-spark-2.0 streamingpro對(duì)spark 2.x的支持  
          streamingpro-api streamingpro把底層的spark API暴露出來,方便用戶靈活處理問題  
          streamingpro-manager 通過該模塊,可以很方便的通過web界面啟動(dòng),管理,監(jiān)控 spark相關(guān)的應(yīng)用  
          streamingpro-dls 自定義connect,load,select,save,train,register等語法,便于用類似sql的方式做批處理任務(wù),機(jī)器學(xué)習(xí)等  

          相關(guān)概念

          如果你使用StreamingPro,那么所有的工作都是在編輯一個(gè)Json配置文件。通常一個(gè)處理流程,會(huì)包含三個(gè)概念:

          1. 多個(gè)輸入

          2. 多個(gè)連續(xù)/并行的數(shù)據(jù)處理

          3. 多個(gè)輸出

          StreamingPro會(huì)通過'compositor'的概念來描述他們,你可以理解為一個(gè)處理單元。一個(gè)典型的輸入compositor如下:

          {
                  "name": "batch.sources",
                  "params": [
                    {
                      "path": "file:///tmp/hdfsfile/abc.txt",
                      "format": "json",
                      "outputTable": "test"
          
                    },
                     {
                        "path": "file:///tmp/parquet/",
                        "format": "parquet",
                        "outputTable": "test2"
          
                      }
                  ]
          }

           

          batch.sources 就是一個(gè)compositor的名字。 這個(gè)compositor 把一個(gè)本地磁盤的文件映射成了一張表,并且告知系統(tǒng),abc.txt里的內(nèi)容 是json格式的。這樣,我們?cè)诤罄m(xù)的compositor模塊就可以使用這個(gè)test表名了。通常,StreamingPro希望整個(gè)處理流程, 也就是不同的compositor都采用表來進(jìn)行銜接。

          StreamingPro不僅僅能做批處理,還能做流式,流式支持Spark Streaming,Structured Streaming。依然以輸入compositor為例,假設(shè) 我們使用的是Structured Streaming,則可以如下配置。

          {
                  "name": "ss.sources",
                  "params": [
                    {
                      "format": "kafka9",
                      "outputTable": "test",
                      "kafka.bootstrap.servers": "127.0.0.1:9092",
                      "topics": "test",
                      "path": "-"
                    },
                    {
                      "format": "com.databricks.spark.csv",
                      "outputTable": "sample",
                      "header": "true",
                      "path": "/Users/allwefantasy/streamingpro/sample.csv"
                    }
                  ]
                }

           

          第一個(gè)表示我們對(duì)接的數(shù)據(jù)源是kafka 0.9,我們把Kafka的數(shù)據(jù)映射成表test。 因?yàn)槲覀兛赡苓€需要一些元數(shù)據(jù),比如ip和城市的映射關(guān)系, 所以我們還可以配置一些其他的非流式的數(shù)據(jù)源,我們這里配置了一個(gè)smaple.csv文件,并且命名為表sample。

          如果你使用的是kafka >= 1.0,則 topics 參數(shù)需要換成'subscribe',并且使用時(shí)可能需要對(duì)內(nèi)容做下轉(zhuǎn)換,類似:

          select CAST(key AS STRING) as k, CAST(value AS STRING) as v from test

           

          啟動(dòng)時(shí),你需要把-streaming.platform 設(shè)置為 ss

          如果我們的輸入輸出都是Hive的話,可能就不需要batch.sources/batch.outputs 等組件了,通常一個(gè)batch.sql就夠了。比如:

          "without-sources-job": {
              "desc": "-",
              "strategy": "spark",
              "algorithm": [],
              "ref": [],
              "compositor": [
                {
                  "name": "batch.sql",
                  "params": [
                    {
                      "sql": "select * from hiveTable",
                      "outputTableName": "puarquetTable"
                    }
                  ]
                },
                {
                  "name": "batch.outputs",
                  "params": [
                    {
                      "format": "parquet",
                      "inputTableName": "puarquetTable",
                      "path": "/tmp/wow",
                      "mode": "Overwrite"
                    }
                  ]
                }
              ],
              "configParams": {
              }
            }

           

          在批處理里,batch.sources/batch.outputs 都是可有可無的,但是對(duì)于流式程序,stream.sources/stream.outputs/ss.sources/ss.outputs 則是必須的。

          StreamingPro的一些參數(shù)

          Property Name Default Meaning
          streaming.name (none) required 等價(jià)于 spark.app.name
          streaming.master (none) required 等價(jià)于 spark.master
          streaming.duration 10 seconds spark streaming 周期,默認(rèn)單位為秒
          streaming.rest true/false,default is false 是否提供http接口
          streaming.spark.service true/false,default is false 開啟該選項(xiàng)時(shí),streaming.platform必須為spark. 該選項(xiàng)會(huì)保證spark實(shí)例不會(huì)退出
          streaming.platform spark/spark_streaming/ss/flink,default is spark 基于什么平臺(tái)跑
          streaming.checkpoint (none) spark streaming checkpoint 目錄
          streaming.kafka.offsetPath (none) kafka的偏移量保存目錄。如果沒有設(shè)置,會(huì)保存在內(nèi)存中
          streaming.driver.port 9003 配置streaming.rest使用,streaming.rest為true,你可以設(shè)置一個(gè)http端口
          streaming.spark.hadoop.* (none) hadoop configuration,eg. -streaming.spark.hadoop.fs.defaultFS hdfs://name:8020
          streaming.job.file.path (none) 配置文件路徑,默認(rèn)從hdfs加載
          streaming.jobs (none) json配置文件里的job名稱,按逗號(hào)分隔。如果沒有配置該參數(shù),默認(rèn)運(yùn)行所有job
          streaming.zk.servers (none) 如果把spark作為一個(gè)server,那么streamingpro會(huì)把driver地址注冊(cè)到zookeeper上
          streaming.zk.conf_root_dir (none) 配置streaming.zk.servers使用
          streaming.enableHiveSupport false 是否支持Hive
          streaming.thrift false 是否thrift server
          streaming.sql.source.[name].[參數(shù)] (none) batch/ss/stream.sources 中,你可以替換里面的任何一個(gè)參數(shù)
          streaming.sql.out.[name].[參數(shù)] (none) batch/ss/stream.outputs 中,你可以替換里面的任何一個(gè)參數(shù)
          streaming.sql.params.[param-name] (none) batch/ss/stream.sql中,你是可以寫表達(dá)式的,比如 select * from :table, 之后你可以通過命令行傳遞該table參數(shù)

          后面三個(gè)參數(shù)值得進(jìn)一步說明:

          假設(shè)我們定義了兩個(gè)數(shù)據(jù)源,firstSource,secondSource,描述如下:

          {
                  "name": "batch.sources",
                  "params": [
                    {
                      "name":"firstSource",
                      "path": "file:///tmp/sample_article.txt",
                      "format": "com.databricks.spark.csv",
                      "outputTable": "article",
                      "header":true
                    },
                    {
                        "name":"secondSource",
                        "path": "file:///tmp/sample_article2.txt",
                        "format": "com.databricks.spark.csv",
                        "outputTable": "article2",
                        "header":true
                      }
                  ]
                }

           

          我們希望path不是固定的,而是啟動(dòng)時(shí)候決定的,這個(gè)時(shí)候,我們可以在啟動(dòng)腳本中使用-streaming.sql.source.[name].[參數(shù)] 來完成這個(gè)需求。 比如:

          -streaming.sql.source.firstSource.path  file:///tmp/wow.txt

           

          這個(gè)時(shí)候,streamingpro啟動(dòng)的時(shí)候會(huì)動(dòng)態(tài)將path 替換成你要的。包括outputTable等都是可以替換的。

          有時(shí)候我們需要定時(shí)執(zhí)行一個(gè)任務(wù),而sql語句也是動(dòng)態(tài)變化的,具體如下:

          {
                  "name": "batch.sql",
                  "params": [
                    {
                      "sql": "select * from test where hp_time=:today",
                      "outputTableName": "finalOutputTable"
                    }
                  ]
                },

           

          這個(gè)時(shí)候我們?cè)趩?dòng)streamingpro的時(shí)候,通過參數(shù):

          -streaming.sql.params.today  "2017"

           

          動(dòng)態(tài)替換 sql語句里的:today

          瀏覽 15
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久热青草视频 | 欧美第一区 | 大香蕉超碰成人网 | 日本中文字幕爱爱 | 男人的天堂伊人 |