Hadoop架構(gòu):流水線(PipeLine)
流水線(PipeLine),簡單地理解就是客戶端向DataNode傳輸數(shù)據(jù)(Packet)和接收DataNode回復(fù)(ACK)[Acknowledge]的數(shù)據(jù)通路。
整條流水線由若干個DataNode串聯(lián)而成,數(shù)據(jù)由客戶端流向PipeLine,在流水線上,假如DataNode A 比 DataNode B 更接近流水線
那么稱A在B的上游(Upstream),稱B在A的下游(Downstream)。
流水線上傳輸數(shù)據(jù)步驟
1. 客戶端向整條流水線的第一個DataNode發(fā)送Packet,第一個DataNode收到Packet就向下個DataNode轉(zhuǎn)發(fā),下游DataNode照做。
2. 接收到Packet的DataNode將Packet數(shù)據(jù)寫入磁盤
3. 流水線上最后一個DataNode接收到Packet后向前一個DataNode發(fā)送ACK響應(yīng),表示自己已經(jīng)收到Packet,上游DataNode照做
4. 當(dāng)客戶端收到第一個DataNode的ACK,表明此次Packet的傳輸成功
一.流水線基礎(chǔ)概念
流水線就像一條水管,數(shù)據(jù)(Packets)從一端流進(jìn)去,依次經(jīng)過流水線上的各個DataNode。
回復(fù)(ACK)則是相反,ACK從最后一個節(jié)點依次向前傳遞,流回客戶端
多么藝術(shù)的設(shè)計!

但是,有一個問題,要知道,若干個Packet才能傳輸完一個Block,并且多個Block組成一個文件
所以從文件或者Block的角度來看,即使每臺機(jī)器的效率接近,也可能出現(xiàn)流水線不均勻的情況(接收文件數(shù)據(jù)量不均勻)

出現(xiàn)的情況往往是第一個節(jié)點接收的數(shù)據(jù)量最多,其后的節(jié)點遞減,所以我們可以考慮把第一個DataNode選為性能較好的節(jié)點,或者是離客戶端盡可能近的節(jié)點。但實際上,節(jié)點的選擇是由NameNode根據(jù)機(jī)架感知等技術(shù)實現(xiàn)的。并且客戶端的流水線節(jié)點選取是由NameNode決定的。
還有一個問題。HDFS是支持一寫多讀機(jī)制的,意味著在流水線上的DataNode(正在被寫)允許被其他客戶端讀取(Reader 以下均稱此類讀客戶端為Reader)。這樣就會產(chǎn)生讀的不一致性,比如說我在流水線上游的某個DataNode中讀到“武漢加油!”這條數(shù)據(jù),但是去下游的DataNode讀,卻讀不到。這是因為下游的DataNode可能還沒收到數(shù)據(jù)。

雖然說一般客戶端只會讀取一個DataNode的信息,但如果被讀取的DataNode宕機(jī),那么客戶端就要另選DataNode,可能造成前后數(shù)據(jù)不一致。
或者有多個客戶端需要根據(jù)對方的數(shù)據(jù)協(xié)調(diào)工作,每個客戶端讀的不是一個DataNode,那么對同一讀取目標(biāo),讀出來的數(shù)據(jù)不一致。這種水平上的不一致可能也會導(dǎo)致業(yè)務(wù)出錯。
那么,怎么解決呢?
二.流水線讀一致性設(shè)計
我們先來定義一下概念
首先提出問題,在流水線中的某個DataNode,怎么樣判斷自己的數(shù)據(jù)是否可以給Reader讀取。
就比如上面那張圖,不能一致性讀的原因是下游的DataNode3沒有接收到DataNode1已經(jīng)接收的Packet。那么如果DataNode1確定DataNode3已經(jīng)接收到Packet了,那不就能放心地把Packet的數(shù)據(jù)給Reader了嗎?就算Reader再去DataNode3讀,也會讀到同樣的數(shù)據(jù),而不會出現(xiàn)數(shù)據(jù)找不到或者數(shù)據(jù)不一致的情況。
于是有了定義:對于一個數(shù)據(jù)塊,一個DataNode接收到的數(shù)據(jù)為DR(Data Received),根據(jù)下游收到的ACK,已被下游確認(rèn)接收的數(shù)據(jù)為DA(Data Acknowledged)

順便定義:對于 i 節(jié)點的DA是DAi , DR是DRi , 對于客戶端,客戶端發(fā)出去的數(shù)據(jù)為CS(Clent Send) ,而客戶端確認(rèn)的數(shù)據(jù)為CA(Client Acknowlege)
DA和DR其實是一個增量的概念,并且針對的是一個Block。下圖是一個DataNode中的Replica(Block在DataNode中稱為Replica,強(qiáng)調(diào)是Block的副本)在逐漸被寫滿的過程

我們可以分析一下,整個流水線上,各個節(jié)點的DR和DA的走勢

以及從圖形上看,DR和DA在一來一回的流水線上的分布情況

我們發(fā)現(xiàn)Writer發(fā)送數(shù)據(jù)(第一個DataNode的DR)最多,但是確認(rèn)了的數(shù)據(jù)DA最少,原因是Packet和ACK在流水線一來一回需要路程時間
Reader直接訪問一個DataNode中Replica的數(shù)據(jù)時,需要提供四個數(shù)據(jù)<BlockId,BGS(Block Generation Stamp 可以理解成Block的版本號) , offset, len>
BlockId 和 BGS 用來識別一個Block,當(dāng)DataNode中不存在指定BlockId的Replica或者Replica的BGS比Reader給出的BGS舊,那么DataNode將拒絕這次讀請求
offset 表示Reader將從哪里開始讀取數(shù)據(jù),len表示欲讀取的數(shù)據(jù)長度,因為DA是線性增長的,所以只要保證 offset + len <= DA ,DataNode就允許這次讀請求(當(dāng)然offset 和 len 都大于0)
具體怎么做才能實現(xiàn)呢?有兩種做法。
做法一,當(dāng)其他應(yīng)用請求一個Reader客戶端讀取數(shù)據(jù)的時候,Reader會向?qū)⒁x的DataNode發(fā)送請求,詢問DataNode的DA。如果應(yīng)用請求的數(shù)據(jù)規(guī)模(offset + len)大于DA,那么將拋出異常
否則,Reader將獲取DataNode的Min(DR, offset + len)長度數(shù)據(jù)放到緩存Q中,并且安全地返回 off + len 數(shù)據(jù)給應(yīng)用,隨后Reader監(jiān)聽這個DataNode的DA的變化,直到應(yīng)用放棄對文件的讀取。如果DA增加,表示Reader能從緩存Q中讀到的最大數(shù)據(jù)量增加,也就是offset + len能達(dá)到更大的值。當(dāng)讀取任意一個DataNode P,假設(shè)他的DA是m,如果這個DataNode剛好宕機(jī),1. Reader轉(zhuǎn)而訪問上游的DataNode,上游DataNode的DR比下游的DR大,隨著時間的推遲,上游DataNode會把整個DR暴露給Reader,其中包含下游DR的數(shù)據(jù),下游的數(shù)據(jù)在上游仍然能訪問。2.Reader轉(zhuǎn)而訪問下游的DataNode,下游的DataNode的DA比P的要大,所以在P讀到的數(shù)據(jù)在下游中仍然找得到。一致性讀達(dá)成。
這種做法的缺點是客戶端的代碼和算法實現(xiàn)復(fù)雜,要時刻監(jiān)聽DA的變化。

做法二,為了更清楚地描述,分一下步驟
1.Reader向DataNode a發(fā)出<BlockId,BGS(Block Generation Stamp 可以理解成Block的版本號) , offset, len>,DataNode a的DA必須大于等于offset + len
2.讀取的請求不是發(fā)給DataNode a,而是將請求發(fā)給另外一個DataNode b
3.如果
1.offset + len <= DAb,那么可以安全地返回數(shù)據(jù)
2.如果offset + len > DAb ,因為DAa >= offset + len > DAb。所以DAa > DAb,所以b在a的上游,所以DRb > DRa,所以在b上有a已經(jīng)ACK了的數(shù)據(jù)。所以b也可以安全地返回offset + len的數(shù)據(jù)給Reader
3.如果offset + len > DRb,那么將拋出異常。
雖然上述步驟2訪問了DR,但是DR中被訪問的數(shù)據(jù)已經(jīng)在下游被ACK了,只是Reader自己移動到了上游去找數(shù)據(jù)。
當(dāng)前訪問的DataNode a如果宕機(jī)
1.向下游讀,下游的DA大于上游,故在上游的數(shù)據(jù)一般能在下游找得到,經(jīng)過步驟1將數(shù)據(jù)返回
2.向上游讀,因為之前已經(jīng)規(guī)定好,只能訪問offset + len范圍的數(shù)據(jù),并且上游的BR總是包含DAa,所以 offset + len 長度的數(shù)據(jù)總是能在上游找到。
一致性讀解決
做法二雖然簡單但是要訪問兩個節(jié)點。網(wǎng)絡(luò)上的切換的開銷不小。
具體HDFS實現(xiàn)了哪一個,需要看版本決定,筆者暫時還沒有找到官方給定哪些版本實現(xiàn)哪種方案和研究源碼,日后填坑。
三.流水線的生命周期
1.流水線被建立(Setup) : 客戶端Writer通告NameNode獲得Block信息,通知信息里locations(Replica所在)包含的DataNode,告知這些DataNode將要創(chuàng)建一條流水線,DataNode收到后會回復(fù)。
2.數(shù)據(jù)傳輸(DataStream) : 當(dāng)Writer在步驟1接收到如數(shù)的DataNode的回應(yīng)后,流水線正式創(chuàng)建,Writer能夠在流水線上以Packet為單位傳輸數(shù)據(jù)。
3.恢復(fù)(Recovery) : 恢復(fù)分三種情況 :1.流水線創(chuàng)建時失敗 2.流水線傳輸過程失敗 3.流水線關(guān)閉失敗
4.關(guān)閉(Close) : 當(dāng)一個塊被寫滿,Writer將通知DataNode流水線關(guān)閉,DataNode可以將塊的狀態(tài)設(shè)置為FINALIZED并且DataNode向NameNode匯報
四.流水線的建立
流水線建立的時機(jī):
1.客戶端請求新建一個Block,需要新建流水線,以便將新Block的數(shù)據(jù)寫入到DataNode的Replica里
2.客戶端請求打開一個文件并且對這個文件進(jìn)行append操作,這個文件末尾的最后一個塊如果沒有滿,那么所有擁有這個Block的Replica的DataNode將被連起來成為一條流水線,以便對這些沒寫滿的Replica進(jìn)行追加,(其實是對Block進(jìn)行追加)
3.在恢復(fù)過程中需要建立流水線
流水線建立流程:
客戶端的行為:
1.客戶端首先需要詢問NameNode相關(guān)信息,比如對應(yīng)Block的Replica在哪,Block的BGS和ID等信息。如果流水線的建立的是為了恢復(fù)流水線,或者文件被打開用來append,那么客戶端還會為Block向NameNode申請新的BGS。
2.根據(jù)1中獲取的信息,客戶端試圖和流水線的第一個DataNode通過Socket建立連接。
3.客戶端將1中獲得的信息發(fā)布到流水線上,告知線上的DataNode,該Block對應(yīng)的Replica需要被操作。
發(fā)送的信息具體按流水線的用途分為:

DataNode行為:
.1.當(dāng)DataNode從3中得知信息后,將按情況進(jìn)行如下操作

最后一步:
如果建立的流水線是用來恢復(fù)或者Append的,那么將會通知NameNode,流水線完成,告知NameNode更新流水線信息(塊的位置等)。
重新架構(gòu)流水線:
如果上述所有步驟不成功,則會重新建立流水線(進(jìn)行流水線恢復(fù))。
