MongoDB + Spark: 完整的大數(shù)據(jù)解決方案
Spark介紹
按照官方的定義,Spark 是一個通用,快速,適用于大規(guī)模數(shù)據(jù)的處理引擎。
通用性:我們可以使用Spark SQL來執(zhí)行常規(guī)分析, Spark Streaming 來流數(shù)據(jù)處理, 以及用Mlib來執(zhí)行機器學(xué)習(xí)等。Java,python,scala及R語言的支持也是其通用性的表現(xiàn)之一。
快速:這個可能是Spark成功的最初原因之一,主要歸功于其基于內(nèi)存的運算方式。當(dāng)需要處理的數(shù)據(jù)需要反復(fù)迭代時,Spark可以直接在內(nèi)存中暫存數(shù)據(jù),而無需像Map Reduce一樣需要把數(shù)據(jù)寫回磁盤。官方的數(shù)據(jù)表明:它可以比傳統(tǒng)的Map Reduce快上100倍。
大規(guī)模:原生支持HDFS,并且其計算節(jié)點支持彈性擴展,利用大量廉價計算資源并發(fā)的特點來支持大規(guī)模數(shù)據(jù)處理。
我們能用它做什么
那我們能用Spark來做什么呢?場景數(shù)不勝數(shù)。
最簡單的可以只是統(tǒng)計一下某一個頁面多少點擊量,復(fù)雜的可以通過機器學(xué)習(xí)來預(yù)測。
個性化?是一個常見的案例,比如說,Yahoo的網(wǎng)站首頁使用Spark來實現(xiàn)快速的用戶興趣分析。應(yīng)該在首頁顯示什么新聞?原始的做法是讓用戶選擇分類;聰明的做法就是在用戶交互的過程中揣摩用戶可能喜歡的文章。另一方面就是要在新聞進來時候進行分析并確定什么樣的用戶是可能的受眾。新聞的時效性非常高,按照常規(guī)的MapReduce做法,對于Yahoo幾億用戶及海量的文章,可能需要計算一天才能得出結(jié)果。Spark的高效運算可以在這里得到充分的運用,來保證新聞推薦在數(shù)十分鐘或更短時間內(nèi)完成。另外,如美國最大的有線電視商Comcast用它來做節(jié)目推薦,最近剛和滴滴聯(lián)姻的uber用它實時訂單分析,優(yōu)酷則在Spark上實現(xiàn)了商業(yè)智能的升級版。
Spark生態(tài)系統(tǒng)
在我們開始談MongoDB 和Spark 之前,我們首先來了解一下Spark的生態(tài)系統(tǒng)。Spark 作為一個大型分布式計算框架,需要和其他組件一起協(xié)同工作。
在Hdaoop里面,HDFS是其核心,作為一個數(shù)據(jù)層。
Spark是Hadoop生態(tài)系統(tǒng)的一顆新星,原生就支持HDFS。大家知道HDFS是用來管理大規(guī)模非結(jié)構(gòu)化數(shù)據(jù)的存儲系統(tǒng),具有高可用和巨大的橫向擴展能力。
而作為一個橫向擴展的分布式集群,資源管理是其核心必備的能力,Spark 可以通過YARN或者MESOS來負(fù)責(zé)資源(CPU)分配和任務(wù)調(diào)度。如果你不需要管理節(jié)點的高可用需求,你也可以直接使用Spark standalone。
在有了數(shù)據(jù)層和資源管理層后, 接下來就是我們真正的計算引擎。
Hadoop技術(shù)的兩大基石之一的MapReduce就是用來實現(xiàn)集群大規(guī)模并行計算。而現(xiàn)在就多了一個選項:Spark。Map Reduce的特點是,用4個字來概括,簡單粗暴。采用divide & conquer戰(zhàn)術(shù),我們可以用Map Reduce來處理PB級的數(shù)據(jù)。而Spark 作為打了雞血的Map Reduce增強版,利用了內(nèi)存價格大量下降的時代因素,充分把計算所用變量和中間結(jié)果放到內(nèi)存里,并且提供了一整套機器學(xué)習(xí)的分析算法,在加上很多語言的支持,使之成為一個較之于Map Reduce更加優(yōu)秀的選擇。
由于MapReduce 是一個相對并不直觀的程序接口,所以為了方便使用,一系列的高層接口如Hive或者Pig應(yīng)運而生。Hive可以讓我們使用非常熟悉的SQL語句的方式來做一些常見的統(tǒng)計分析工作。同理,在Spark 引擎層也有類似的封裝,如Spark SQL、RDD以及2.0版本新推出的Dataframe等。
所以一個完整的大數(shù)據(jù)解決方案,包含了存儲,資源管理,計算引擎及接口層。那么問題來了:我們畫了這么大這么圓的大餅,MongoDB可以吃哪一塊呢?

MongoDB是個什么?是個database。所以自然而然,MongoDB可以擔(dān)任的角色,就是數(shù)據(jù)存儲的這一部分。在和 Spark一起使用的時候,MongoDB就可以扮演HDFS的角色來為Spark提供計算的原始數(shù)據(jù),以及用來持久化分析計算的結(jié)果。
HDFS vs. MongoDB
既然我們說MongoDB可以用在HDFS的地方,那我們來詳細(xì)看看兩者之間的差異性。
在說區(qū)別之前,其實我們可以先來注意一下兩者的共同點。HDFS和MongoDB都是基于廉價x86服務(wù)器的橫向擴展架構(gòu),都能支持到TB到PB級的數(shù)據(jù)量。數(shù)據(jù)會在多節(jié)點自動備份,來保證數(shù)據(jù)的高可用和冗余。兩者都支持非結(jié)構(gòu)化數(shù)據(jù)的存儲,等等。
但是,HDFS和MongoDB更多的是差異點:
如在存儲方式上 HDFS的存儲是以文件為單位,每個文件64MB到128MB不等。而MongoDB則是細(xì)顆?;?、以文檔為單位的存儲。
HDFS不支持索引的概念,對數(shù)據(jù)的操作局限于掃描性質(zhì)的讀,MongoDB則支持基于二級索引的快速檢索。
MongoDB可以支持常見的增刪改查場景,而HDFS一般只是一次寫入后就很難進行修改。
從響應(yīng)時間上來說,HDFS一般是分鐘級別而MongoDB對手請求的響應(yīng)時間通常以毫秒作為單位。
一個日志的例子
如果說剛才的比較有些抽象,我們可以結(jié)合一個實際一點的例子來理解。
比如說,一個比較經(jīng)典的案例可能是日志記錄管理。在HDFS里面你可能會用日期范圍來命名文件,如7月1日,7月2日等等,每個文件是個日志文本文件,可能會有幾萬到幾十萬行日志。
而在MongoDB里面,我們可以采用一個JSON的格式,每一條日志就是一個JSON document。我們可以對某幾個關(guān)心的字段建索引,如時間戳,錯誤類型等。
我們來考慮一些場景,加入我們相對7月份所有日志做一些全量的統(tǒng)計,比如每個頁面的所有點擊量,那么這個HDFS和MongoDB都可以正常處理。
如果有一天你的經(jīng)理告訴你:他想知道網(wǎng)站上每天有多少404錯誤在發(fā)生,這個時候如果你用HDFS,就還是需要通過全量掃描所有行,而MongoDB則可以通過索引,很快地找到所有的404日志,可能花數(shù)秒鐘就可以解答你經(jīng)理的問題。
又比如說,如果你希望對每個日志項加一個自定義的屬性,在進行一些預(yù)處理后,MongoDB就會比較容易地支持到。而一般來說,HDFS是不支持更新類型操作的。
好的,我們了解了MongoDB為什么可以替換HDFS并且為什么有這個必要來做這個事情,下面我們就來看看Spark和MongoDB怎么玩!
Spark + MongoDB
Spark的工作流程可以概括為三部曲:創(chuàng)建并發(fā)任務(wù),對數(shù)據(jù)進行transformation操作,如map, filter,union,intersect等,然后執(zhí)行運算,如reduce,count,或者簡單地收集結(jié)果。
這里是Spark和MongoDB部署的一個典型架構(gòu)。
Spark任務(wù)一般由Spark的driver節(jié)點發(fā)起,經(jīng)過Spark Master進行資源調(diào)度分發(fā)。比如這里我們有4個Spark worker節(jié)點,這些節(jié)點上的幾個executor 計算進程就會同時開始工作。一般一個core就對應(yīng)一個executor。
每個executor會獨立的去MongoDB取來原始數(shù)據(jù),直接套用Spark提供的分析算法或者使用自定義流程來處理數(shù)據(jù),計算完后把相應(yīng)結(jié)果寫回到MongoDB。
我們需要提到的是:在這里,所有和MongoDB的交互都是通過一個叫做Mongo-Spark的連接器來完成的。

另一種常見的架構(gòu)是結(jié)合MongoDB和HDFS的。Hadoop在非結(jié)構(gòu)化數(shù)據(jù)處理的場景下要比MongoDB的普及率高。所以我們可以看到不少用戶會已經(jīng)將數(shù)據(jù)存放在HDFS上。這個時候你可以直接在HDFS上面架Spark來跑,Spark從HDFS取來原始數(shù)據(jù)進行計算,而MongoDB在這個場景下是用來保存處理結(jié)果。為什么要這么麻煩?幾個原因:
Spark處理結(jié)果數(shù)量可能會很大,比如說,個性化推薦可能會產(chǎn)生數(shù)百萬至數(shù)千萬條記錄,需要一個能夠支持每秒萬級寫入能力的數(shù)據(jù)庫
處理結(jié)果可以直接用來驅(qū)動前臺APP,如用戶打開頁面時獲取后臺已經(jīng)為他準(zhǔn)備好的推薦列表。
Mongo Spark Connector 連接器
在這里我們在介紹下MongoDB官方提供的Mongo Spark連接器?。目前有3個連接器可用,包括社區(qū)第三方開發(fā)的和之前Mongo Hadoop連接器等,這個Mongo-Spark是最新的,也是我們推薦的連接方案。
這個連接器是專門為Spark打造的,支持雙向數(shù)據(jù),讀出和寫入。但是最關(guān)鍵的是條件下推,也就是說:如果你在Spark端指定了查詢或者限制條件的情況下,這個條件會被下推到MongoDB去執(zhí)行,這樣可以保證從MongoDB取出來、經(jīng)過網(wǎng)絡(luò)傳輸?shù)絊park計算節(jié)點的數(shù)據(jù)確實都是用得著的。沒有下推支持的話,每次操作很可能需要從MongoDB讀取全量的數(shù)據(jù),性能體驗將會很糟糕。拿剛才的日志例子來說,如果我們只想對404錯誤日志進行分析,看那些錯誤都是哪些頁面,以及每天錯誤頁面數(shù)量的變化,如果有條件下推,那么我們可以給MongoDB一個限定條件:錯誤代碼=404, 這個條件會在MongoDB服務(wù)器端執(zhí)行,這樣我們只需要通過網(wǎng)絡(luò)傳輸可能只是全部日志的0.1%的數(shù)據(jù),而不是沒有條件下推情況下的全部數(shù)據(jù)。
另外,這個最新的連接器還支持和Spark計算節(jié)點Co-Lo 部署。就是說在同一個節(jié)點上同時部署Spark實例和MongoDB實例。這樣做可以減少數(shù)據(jù)在網(wǎng)絡(luò)上的傳輸帶來的資源消耗及時延。當(dāng)然,這種部署方式需要注意內(nèi)存資源和CPU資源的隔離。隔離的方式可以通過Linux的cgroups。
Spark + MongoDB 成功案例
目前已經(jīng)有很多案例在不同的應(yīng)用場景中使用Spark+MongoDB。
法國航空是法國最大的航空公司,為了提高客戶體驗,在最近施行的360度客戶視圖中,使用Spark對已經(jīng)收集在MongoDB里面的客戶數(shù)據(jù)進行分類及行為分析,并把結(jié)果(如客戶的類別、標(biāo)簽等信息)寫回到MongoDB內(nèi)每一個客戶的文檔結(jié)構(gòu)里。
Stratio是美國硅谷一家著名的金融大數(shù)據(jù)公司。他們最近在一家在31個國家有分支機構(gòu)的跨國銀行實施了一個實時監(jiān)控平臺。該銀行希望通過對日志的監(jiān)控和分析來保證客戶服務(wù)的響應(yīng)時間以及實時監(jiān)測一些可能的違規(guī)或者金融欺詐行為。在這個應(yīng)用內(nèi), 他們使用了:
Stratio是美國硅谷一家著名的金融大數(shù)據(jù)公司。他們最近在一家在31個國家有分支機構(gòu)的跨國銀行實施了一個實時監(jiān)控平臺。該銀行希望通過對日志的監(jiān)控和分析來保證客戶服務(wù)的響應(yīng)時間以及實時監(jiān)測一些可能的違規(guī)或者金融欺詐行為。在這個應(yīng)用內(nèi), 他們使用了:
Apache Flume 來收集log
Spark來處理實時的log
MongoDB來存儲收集的log以及Spark分析的結(jié)果,如Key Performance Indicators等
東方航空最近剛完成一個Spark運價的POC測試。
東方航空的挑戰(zhàn)
東方航空作為國內(nèi)的3大行之一,每天有1000多個航班,服務(wù)26萬多乘客。過去,顧客在網(wǎng)站上訂購機票,平均資料庫查詢200次就會下單訂購機票,但是現(xiàn)在平均要查詢1.2萬次才會發(fā)生一次訂購行為,同樣的訂單量,查詢量卻成長百倍。按照50%直銷率這個目標(biāo)計算,東航的運價系統(tǒng)要支持每天16億的運價請求。
思路:空間換時間
當(dāng)前的運價是通過實時計算的,按照現(xiàn)在的計算能力,需要對已有系統(tǒng)進行100多倍的擴容。另一個常用的思路,就是采用空間換時間的方式。與其對每一次的運價請求進行耗時300ms的運算,不如事先把所有可能的票價查詢組合窮舉出來并進行批量計算,然后把結(jié)果存入MongoDB里面。當(dāng)需要查詢運價時,直接按照 出發(fā)+目的地+日期的方式做一個快速的DB查詢,響應(yīng)時間應(yīng)該可以做到幾十毫秒。
那為什么要用MongoDB?因為我們要處理的數(shù)據(jù)量龐大無比。按照1000多個航班,365天,26個倉位,100多渠道以及數(shù)個不同的航程類型,我們要實時存取的運價記錄有數(shù)十億條之多。這個已經(jīng)遠(yuǎn)遠(yuǎn)超出常規(guī)RDBMS可以承受的范圍。
MongoDB基于內(nèi)存緩存的數(shù)據(jù)管理方式?jīng)Q定了對并發(fā)讀寫的響應(yīng)可以做到很低延遲,水平擴展的方式可以通過多臺節(jié)點同時并發(fā)處理海量請求。
事實上,全球最大的航空分銷商,管理者全世界95%航空庫存的Amadeus也正是使用MongoDB作為其1000多億運價緩存的存儲方案。
Spark + MongoDB 方案
我們知道MongoDB可以用來做我們海量運價數(shù)據(jù)的存儲方案,在大規(guī)模并行計算方案上,就可以用到嶄新的Spark技術(shù)。
這里是一個運價系統(tǒng)的架構(gòu)圖。左邊是發(fā)起航班查詢請求的客戶端,首先會有API服務(wù)器進行預(yù)處理。一般航班請求會分為庫存查詢和運價查詢。庫存查詢會直接到東航已有的庫存系統(tǒng)(Seat Inventory),同樣是實現(xiàn)在MongoDB上面的。在確定庫存后根據(jù)庫存結(jié)果再從Fare Cache系統(tǒng)內(nèi)查詢相應(yīng)的運價。
Spark集群則是另外一套計算集群,通過Spark MongoDB連接套件和MongoDB Fare Cache集群連接。Spark 計算任務(wù)會定期觸發(fā)(如每天一次或者每4小時一次),這個任務(wù)會對所有的可能的運價組合進行全量計算,然后存入MongoDB,以供查詢使用。右半邊則把原來實時運算的集群換成了Spark+MongoDB。Spark負(fù)責(zé)批量計算一年內(nèi)所有航班所有倉位的所有價格,并以高并發(fā)的形式存儲到MongoDB里面。每秒鐘處理的運價可以達到數(shù)萬條。
當(dāng)來自客戶端的運價查詢達到服務(wù)端以后,服務(wù)端直接就向MongoDB發(fā)出按照日期,出發(fā)到達機場為條件的mongo查詢。
批處理計算流程

這里是Spark計算任務(wù)的流程圖。需要計算的任務(wù),也就是所有日期航班倉位的組合,事先已經(jīng)存放到MongoDB里面。
任務(wù)遞交到master,然后預(yù)先加載所需參考數(shù)據(jù),broadcast就是把這些在內(nèi)存里的數(shù)據(jù)復(fù)制到每一個Spark計算節(jié)點的JVM,然后所有計算節(jié)點多線程并發(fā)執(zhí)行,從Mongodb里取出需要計算的倉位,調(diào)用東航自己的運價邏輯,得出結(jié)果以后,并保存回MongoDB。
Spark 任務(wù)入口程序
Spark和MongoDB的連接使用非常簡單,下面就是一個代碼示例:
// initialization dependencies including base prices, pricing rules and some reference dataMap dependencies = MyDependencyManager.loadDependencies();// broadcasting dependencies
javaSparkContext.broadcast(dependencies);// create job rdd
cabinsRDD = MongoSpark.load(javaSparkContext).withPipeline(pipeline)// for each cabin, date, airport pair, calculate the price
cabinsRDD.map(function calc_price);// collect the result, which will cause the data to be stored into MongoDB
cabinsRDD.collect()
cabinsRDD.saveToMongo()處理能力和響應(yīng)時間比較
這里是一個在東航POC的簡單測試結(jié)果。從吞吐量的角度,新的API服務(wù)器單節(jié)點就可以處理3400個并發(fā)的運價請求。在顯著提高了并發(fā)的同時,響應(yīng)延遲則降低了10幾倍,平均10ms就可以返回運價結(jié)果。按照這個性能,6臺 API服務(wù)器就可以應(yīng)付將來每天16億的運價查詢。
Spark + MongoDB演示
接下來是一個簡單的Spark+MongoDB演示。
安裝 Spark
# curl -OL http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz# mkdir -p ~/spark# tar -xvf spark-1.6.0-bin-hadoop2.6.tgz -C ~/spark --strip-components=1測試連接器
# cd ~/spark# ./bin/spark-shell \--conf "spark.mongodb.input.uri=mongodb://127.0.0.1/flights.av" \
--conf "spark.mongodb.output.uri=mongodb://127.0.0.1/flights.output" \
--packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0import com.mongodb.spark._
import org.bson.DocumentMongoSpark.load(sc).take(10).foreach(println)簡單分組統(tǒng)計
數(shù)據(jù):365天,所有航班庫存信息,500萬文檔
任務(wù):按航班統(tǒng)計一年內(nèi)所有余票量
MongoSpark.load(sc)
.map(doc=>(doc.getString("flight") ,doc.getLong("seats")))
.reduceByKey((x,y)=>(x+y))
.take(10)
.foreach(println)簡單分組統(tǒng)計加條件過濾
數(shù)據(jù):365天,所有航班庫存信息,500萬文檔
任務(wù):按航班統(tǒng)計一年內(nèi)所有庫存,但是只處理昆明出發(fā)的航班
import org.bson.DocumentMongoSpark.load(sc)
.withPipeline(Seq(Document.parse("{ $match: { orig : 'KMG' } }")))
.map(doc=>(doc.getString("flight") ,doc.getLong("seats")))
.reduceByKey((x,y)=>(x+y))
.take(10)
.foreach(println)性能優(yōu)化事項
使用合適的chunksize (MB)
Total data size / chunksize = chunks = RDD partitions = spark tasks不要將所有CPU核分配給Spark
預(yù)留1-2個core給操作系統(tǒng)及其他管理進程同機部署
適當(dāng)情況可以同機部署Spark+MongoDB,利用本地IO提高性能
總結(jié)
上面只是一些簡單的演示,實際上Spark + MongoDB的使用可以通過Spark的很多種形式來使用。我們來總結(jié)一下Spark + Mongo的應(yīng)用場景。在座的同學(xué)可能很多人已經(jīng)使用了MongoDB,也有些人已經(jīng)使用了Hadoop。我們可以從兩個角度來考慮這個事情:
對那些已經(jīng)使用MongoDB的用戶,如果你希望在你的MongoDB驅(qū)動的應(yīng)用上提供個性化功能,比如說像Yahoo一樣為你找感興趣的新聞,能夠在你的MongoDB數(shù)據(jù)上利用到Spark強大的機器學(xué)習(xí)或者流處理,你就可以考慮在MongoDB集群上部署Spark來實現(xiàn)這些功能。
如果你已經(jīng)使用Hadoop而且數(shù)據(jù)已經(jīng)在HDFS里面,你可以考慮使用Spark來實現(xiàn)更加實時更加快速的分析型需求,并且如果你的分析結(jié)果有數(shù)據(jù)量大、格式多變以及這些結(jié)果數(shù)據(jù)要及時提供給前臺APP使用的需求,那么MongoDB可以用來作為你分析結(jié)果的一個存儲方案。
--end--
掃描下方二維碼 添加好友,備注【交流】 可私聊交流,也可進資源豐富學(xué)習(xí)群
