Pulsar 怎樣為批流處理提供融合的存儲
本文于 2019 年 6 月 4 日由作者翟佳首發(fā)于 InfoQ 平臺,點擊文末「閱讀原文」可直達(dá)

非常榮幸有機(jī)會和大家分享一下 Apache Pulsar 怎樣為批流處理提供融合的存儲。希望今天的分享對做大數(shù)據(jù)處理的同學(xué)能有幫助和啟發(fā)。
這次分享,主要分為四個部分:
介紹與其他消息系統(tǒng)相比, Apache Pulsar 的獨特優(yōu)勢
分析批流處理中的存儲需求
講述 Apache Pulsar 如何完美匹配批流處理中的存儲需求
介紹怎樣使用 Apache Pulsar 提供批流融合的存儲

Apache Pulsar 是新一代云原生分布式消息流平臺,是 Apache 的頂級項目,源于 Yahoo,在 Yahoo 全球數(shù)十個機(jī)房大規(guī)模部署并線上穩(wěn)定使用了 4 年多。Apache Pulsar 設(shè)計中學(xué)習(xí)和借鑒了其他優(yōu)秀的分布式系統(tǒng),在保證一致性和高吞吐的同時,也提供了其他優(yōu)秀特性,比如支持上百萬的 Topic、無縫的多中心互備、靈活的擴(kuò)展性等。
這里我們簡單介紹一下,與其他消息系統(tǒng)相比, Apache Pulsar 擁有的獨特優(yōu)勢,大致有以下3點:
獨特的軟件架構(gòu)(存儲和計算分離,分層分片的存儲)
靈活的消費模型( Exclusive、Failover、Shared 和 KeyShared)
豐富的企業(yè)特性(多租戶)
在介紹 Apache Pulsar 時,通常會用這樣一句話,“Flexible Pub-Sub Messaging backed by durable log Storage”。這句話表明了 Pulsar 和其他消息系統(tǒng)的根本不同,它采用了存儲和計算分離的架構(gòu)。
Pulsar 的服務(wù)層使用 Broker,存儲層使用 BookKeeper,來提供高效和一致的存儲。

從架構(gòu)上來說,Apache Pulsar 采用了分層和分片的架構(gòu)。這是 Pulsar 滿足批流處理中存儲需求的基礎(chǔ)。
在 Apache Pulsar 的分層架構(gòu)中,服務(wù)層 Broker 和存儲層 BookKeeper 的每個節(jié)點都是對等的。Broker 僅僅負(fù)責(zé)消息的服務(wù)支持,不存儲數(shù)據(jù)。這為服務(wù)層和存儲層提供了瞬時的節(jié)點擴(kuò)展和無縫的失效恢復(fù)。
存儲層 BookKeeper 為 WAL(Write Ahead Log)提供了存儲,是一個分布式的 Log 存儲系統(tǒng)。
WAL 和數(shù)據(jù)處理中的流有很多相似性,都是數(shù)據(jù)源源不斷地追加,都對順序和一致性有嚴(yán)格要求。
BookKeeper 通過 Quorum Vote 的方式來實現(xiàn)數(shù)據(jù)的一致性,跟 Master/Slave 模式不同,BookKeeper 中每個節(jié)點也是對等的,對一份數(shù)據(jù)會并發(fā)地同時寫入指定數(shù)目的存儲節(jié)點。對等的存儲節(jié)點,保證了多個備份可以被并發(fā)訪問;也保證了存儲中即使只有一份數(shù)據(jù)可用,也可以對外提供服務(wù)。
Apache Pulsar 通過分層分片的架構(gòu),將邏輯的分區(qū)轉(zhuǎn)化為分片來作為存儲單元。這為數(shù)據(jù)的并發(fā)訪問提供了基礎(chǔ)。

除了架構(gòu)的不同,從用戶接口來說,Apache Pulsar 通過訂閱的抽象,提供了靈活的消費模型。每一個訂閱類似一個 Consumer Group,接收一個 topic 的所有的消息。用戶可以使用不同的訂閱類型、以不同的模式來共同消費同一個 Topic 中的消息。
如果對順序性有要求,可以使用 Exclusive 和 Failover 的訂閱模式,這樣同一個 Topic 只有一個 Consumer 在消費,可以保證順序性。
如果使用 Shared 訂閱模式,多個 Consumer 可以并發(fā)消費同一個 Topic。通過動態(tài)增加 Consumer 的數(shù)量,可以加速 Topic 的消費,減少消息在服務(wù)端的堆積。
Pulsar 即將發(fā)布的 2.4.0 版本添加了一種新的訂閱模式:KeyShared。KeyShared 模式保證在 Shared 模式下同一個 Key 的消息也會發(fā)送到同一個 Consumer,在并發(fā)的同時也保證了順序性。
Apache Pulsar 靈活的消費模型,避免了因為不同的消費場景需要部署多套消息系統(tǒng)的場景,消除了數(shù)據(jù)生產(chǎn)端的數(shù)據(jù)分離。

此外,Apache Pulsar 是以多租戶為基礎(chǔ)的豐富的企業(yè)級特性。企業(yè)內(nèi)部可以搭建一套 Pulsar 集群,在集群中給各個部門分配不同的租戶,并設(shè)置租戶的管理權(quán)限。租戶的管理員再根據(jù)部門的不同業(yè)務(wù)和場景需求,創(chuàng)建不同的 Namespace。在 Namespace 中可以設(shè)置管理策略,比如流控,Quota,互備的集群,數(shù)據(jù)副本數(shù)等。這樣為 Topic 的管理提供了一個層級的可控的視圖。
Apache Pulsar 的企業(yè)級特性,為企業(yè)搭建統(tǒng)一大集群提供了基礎(chǔ),方便了集群的管理和數(shù)據(jù)的共享。

在大數(shù)據(jù)處理剛剛興起的時候,一般用戶會采用 λ 架構(gòu),維護(hù)批流兩套系統(tǒng):批系統(tǒng)主要處理歷史數(shù)據(jù);流系統(tǒng)處理實時的數(shù)據(jù),對批系統(tǒng)的結(jié)果進(jìn)行補(bǔ)充來提高時效。兩套系統(tǒng)造成數(shù)據(jù)冗余,增加維護(hù)成本。
在存儲層,批處理常使用 HDFS 和網(wǎng)絡(luò)對象存儲等;流處理常使用 Kafka 或其他的消息系統(tǒng)。
為了解決 λ 架構(gòu)的問題,逐漸演化出 κ 架構(gòu),使用一套系統(tǒng)來滿足實時數(shù)據(jù)處理和歷史數(shù)據(jù)處理的需求。
在 κ 架構(gòu)中,數(shù)據(jù)的“可重復(fù)處理”是關(guān)鍵。一方面要求實時數(shù)據(jù)能及時獲取最新數(shù)據(jù),處理完立即導(dǎo)出給其他系統(tǒng)使用;另一方面要滿足處理歷史數(shù)據(jù)的需求,需要具備讀大量歷史數(shù)據(jù)的能力。實時數(shù)據(jù)的處理決定了必須使用消息系統(tǒng),但是消息系統(tǒng)并不能完全滿足批處理的并發(fā)需求。
在前面的分享中,百度和阿里的專家分享了計算層的批流融合。我們認(rèn)為批流融合存儲層的需求是一個融合的存儲表征:消息系統(tǒng) + 并發(fā)的存儲訪問。
下面我們從 “Apache Pulsar 提供的存儲抽象”、“批流處理中的 IO 模式”和 “Apache Pulsar 提供的無限流存儲” 這三個方面來解釋為什么 Apache Pulsar 能滿足批流融合的存儲需求。
?? Segmented Stream 存儲表征

前面我們介紹了 Apache Pulsar 首先是一個消息系統(tǒng),它和其他消息系統(tǒng)類似,提供了簡潔的以 Topic,Producer,Consumer 為基礎(chǔ)的 Pub/Sub 模型。
Pulsar 靈活的訂閱模式和高帶寬、低延遲特性,能夠很好的滿足流處理的需求。
Apache Pulsar 的 Topic 可以分為不同的分區(qū)。和其他消息系統(tǒng)不同的是 Apache Pulsar 利用分片的架構(gòu),每個邏輯分區(qū)又進(jìn)行了分片。
在分層分片的架構(gòu)中,分片是存儲的單元,可以類比 HDFS 中的一個文件塊,分片被均勻地分布在存儲層的 BookKeeper 節(jié)點中。

我們再從批流處理的角度來看 Apache Pulsar 的這種分片(Segment)的架構(gòu):
對于流處理來說,Apache Pulsar 的每個 Partition 就是流處理的一個流,它通過 Pub/Sub 的接口來給流處理提供數(shù)據(jù)交互。
對于批處理來說,Apache Pulsar 以分片為粒度,可以為批處理提供數(shù)據(jù)的并發(fā)訪問。
一方面,Apache Pulsar 中每個 Partition 都可以看做是源源不斷流入數(shù)據(jù)的載體,借助于分片和二級存儲,Apache Pulsar 有能力將 Partition 所有流入的數(shù)據(jù)都保存下來。這樣每個 Partition 都可以看作是 Stream 的存儲抽象。
另一方面, Apache Pulsar 的 Partition 是邏輯分區(qū)的概念,分區(qū)內(nèi)部又被分成分片,作為存儲和 IO 訪問的單元。
結(jié)合這兩個概念,我們把 Apache Pulsar 對每個 Partiton 的存儲表征稱為 Segmented Stream。
通過 Pulsar 的 Segmented Stream 抽象,為批流處理提供了一個統(tǒng)一的存儲表征。
介紹了 Apache Pulsar 的 Segmented Stream 的存儲表征后,下面我們結(jié)合批流處理中數(shù)據(jù)的三種常用的訪問模式:Write,Tailing Read 和 Catchup Read,來看看 Apache Pulsar 這種架構(gòu)的合理性。這里主要會討論延遲、IO 的并發(fā)和隔離,并用大家比較熟悉的 Kafka 系統(tǒng)來對比說明。
Write:往 Stream 中添加新的數(shù)據(jù)。
Tailing Read:讀最新的數(shù)據(jù)。
Catchup Read:讀歷史老數(shù)據(jù)。

對于 Write 這種模式,所有的寫都直接追加在 Stream 的尾部。對于和 Kafka 類似的 Master/Slave 架構(gòu)系統(tǒng)來說,數(shù)據(jù)會先寫入 Leader Broker,再發(fā)送給其他 Follower Broker。
Apache Pulsar 的寫先發(fā)送到 broker,然后 broker 作為存儲代理,并發(fā)將數(shù)據(jù)發(fā)送給存儲層的多個 Bookie 節(jié)點。兩種架構(gòu)都會有兩次網(wǎng)絡(luò)跳躍。
對于 Write 模式,延遲差別不大。

Tailing Read 是流處理中的常用模式。它從 Stream 的尾部讀取最新寫入的數(shù)據(jù)。
對于和 Kafka 類似的系統(tǒng),Tailing Read 會從 Leader Broker 直接讀取。對于 Apache Pulsar,在 Broker 中有一段自維護(hù)的 Cache 來緩存剛剛寫入的最新數(shù)據(jù),Tailing Read 直接從 Broker 獲取數(shù)據(jù)并返回。
兩種架構(gòu)都只有 1 次網(wǎng)絡(luò)跳躍。對 Tailing Read 模式,延遲差別不大。

Catchup Read 是批處理中常用的讀取模式。它從 Stream 的指定位置,讀取一定量的歷史數(shù)據(jù)。這種場景一般對數(shù)據(jù)的讀取量比較大,注重讀取的帶寬。
對于 Kafka 類似的系統(tǒng),Catchup Read 一般還是會使用 Pub/Sub 的接口,從 Leader Broker 直接讀取。對于 Apache Pulsar,我們可以從 Broker 中讀取元數(shù)據(jù),獲取 partition 中分片的起始位置和分片在 BookKeeper 中的存儲信息,繞過 Pub/Sub 接口,利用 BookKeeper 的 Read 接口,直接從存儲層并發(fā)訪問多個分片。BookKeeper 提供了多副本的高可用,提升了讀取歷史數(shù)據(jù)的并發(fā)能力。

如果我們把這三種 IO 模式放在一起看就更有意思了。這可以類比用戶在某時間段,對 Stream 既有最新數(shù)據(jù)讀寫,也有歷史數(shù)據(jù)讀寫的情形。這是在批流融合中經(jīng)常遇到的場景。
對和 Kafka 類似的系統(tǒng),這三種 IO 模式都會發(fā)生在 Leader Broker。在 Leader Broker 中,系統(tǒng)的數(shù)據(jù)都需要通過文件系統(tǒng)的 Pagecache,歷史數(shù)據(jù)和最新的數(shù)據(jù)會爭用 Pagecache 資源,造成讀寫響應(yīng)不及時。
如果這時再遇到 Broker 磁盤空間寫滿,需要擴(kuò)容的情況,那就需要等待數(shù)據(jù)的搬移和 rebalance 的操作。這時,IO 的延遲和服務(wù)質(zhì)量很難得到保障。
Apache Pulsar Segmented Stream 的存儲表征,結(jié)合分層分片的架構(gòu),為新數(shù)據(jù)和歷史數(shù)據(jù)做了天然的隔離。最新的數(shù)據(jù) IO 發(fā)生在 Broker 層。
對歷史數(shù)據(jù)的并發(fā)讀寫,直接發(fā)生在存儲節(jié)點。冷熱數(shù)據(jù)被天然隔離,用戶完全不用擔(dān)心 IO 的沖突和爭用。Apache Pulsar 在節(jié)點擴(kuò)容和錯誤恢復(fù)的過程中,也不會有數(shù)據(jù)大量拷貝和 rebalance,因此提升了系統(tǒng)的高可用性。
通過這三種 IO 模式的說明和對比,我們發(fā)現(xiàn) Pulsar Segmented Stream 的存儲表征,再結(jié)合分層分片的架構(gòu),可以很好地滿足批流處理中對存儲系統(tǒng)的需求。
?? 無限的流存儲支持

Pulsar Segmented Stream 的存儲表征,很好地模擬了現(xiàn)實中 Stream 數(shù)據(jù)。對于流存儲的另一個需求是理論上無限的存儲空間。這樣可以滿足對歷史數(shù)據(jù)的存儲和訪問需求。Apache Pulsar 從兩個方面解決了這個問題。
一方面 Pulsar 的存儲層中,分片會均衡地分布到所有的存儲節(jié)點中,這避免了其他系統(tǒng)中單一broker 存儲容量的限制,進(jìn)而可以利用整個集群的存儲空間。
另一方面,Pulsar 的分片架構(gòu),為數(shù)據(jù)的二級存儲擴(kuò)展提供了很好的基礎(chǔ)。對于Segmented Stream,用戶可以設(shè)置 Segment 在 BookKeeper 中保留的時間或大小。如果超過設(shè)定的值,將舊的 Segment 遷移到廉價的二級存儲,比如 Aws S3,Google Cloud Storage,或者HDFS 中。二級存儲的帶寬一般有保障,可以滿足歷史數(shù)據(jù)的批處理模式。通過二級存儲可以減輕無限存儲的成本。
Pulsar 利用自身的分層分片的架構(gòu),提供了 Segmented Stream 的存儲表征,滿足了批流融合的存儲需求。
通過 Pulsar Pub/Sub 接口訪問 Segmented Stream,可以滿足流處理的存儲需求;
通過 Pulsar 存儲層對 segment 的訪問接口(Segment Reader),可以滿足批處理的并發(fā)訪問需求。
從批流處理的 IO 模式分析中可以發(fā)現(xiàn),Pulsar 的架構(gòu)可以很好地處理批流處理中的 IO 并發(fā)和隔離。并且 Pulsar 提供了理論上無限流存儲的能力,能夠滿足批處理中,對海量歷史數(shù)據(jù)的存儲需求。
前面我們介紹了為什么 Pulsar 的架構(gòu)能滿足批流融合的存儲需求。接著我們會介紹 Pulsar是如何在工程上實現(xiàn)的。

基于 Segmented Stream 存儲的表征,我們很容易區(qū)分和支持批處理和流處理。批處理所請求的數(shù)據(jù)可以看做是一個有邊界的流(Bounded Stream)。流處理所請求的數(shù)據(jù)可以看做是一個沒有邊界的流(UnBounded Stream)。
下面我們看在 Pulsar 內(nèi)部,批處理和流處理會怎樣訪問 Segmented Stream。

這里的代碼是一個計算廣告點擊率的 SQL 語句。如果用戶想要查詢某個時間段內(nèi)的點擊率,會提供點擊事件的起止時間。起止時間可以確定一個流的起止邊界,進(jìn)而確定一個 Bounded Stream。這是一個典型的批處理場景。
對 Pulsar 的處理來說,首先根據(jù)起止時間來確定和獲取所需要的 Segments 列表;然后選擇這些Segments,繞過 pub/sub 接口,直接通過 Pulsar 的 Segment Reader 接口,來訪問 Pulsar 的存儲層。

流處理是一系列不會停止的 Windows 訪問和查詢。與批處理相比,流處理它沒有截止的時間點,即使查詢到當(dāng)前時刻,它仍然繼續(xù)對當(dāng)前的 window 不斷地查詢,一個 window 處理結(jié)束,接著處理下一個 window。它的 SQL 查詢語句不會變化,但是查詢 window 中的數(shù)據(jù)會不斷實時更新,它是一個源源不斷的、不停處理最新數(shù)據(jù)的方式。
對于這種訪問模式,直接使用 Pulsar 的 pub/sub 接口就可以直接獲取最新的消息,滿足流處理的需求。

對批流融合,在計算層,更多關(guān)注的是批流融合的計算模型、API 和運(yùn)行時的統(tǒng)一。在存儲層,通過 Segmented Stream 的存儲表征,為批流數(shù)據(jù)提供了統(tǒng)一的數(shù)據(jù)存儲和組織方式。
針對批流處理的不同訪問模式,Pulsar 提供了兩套 API 接口。流處理使用 Pub/Sub 的接口;批處理使用 Parallel Segment Read(PSegment)的接口。
對于批處理的接口,我們在 Pulsar SQL 里面做了一個嘗試,Pulsar SQL 借助 Presto,對寫入Pulsar 中的數(shù)據(jù)進(jìn)行交互式的查詢。
如果你想體驗 Pulsar SQL,可以查看 Pulsar 的 SQL手冊。
https://pulsar.apache.org/docs/en/sql-getting-started
Pub/Sub 的接口已經(jīng)比較完善,我們最近在豐富和完善 PSegment 接口。

在 PSegment 中,我們的主要工作是集成Pulsar 和 Flink、Spark、Hive 及 Presto 。這些工作主要集中在 API 的實現(xiàn)和 Schema 的整合。這些工作完成之后,我們會開源這部分的代碼。
Pulsar 是下一代云原生的消息和流存儲的平臺。我們認(rèn)為消息和流是一份數(shù)據(jù)的兩種不同表征方式。Pulsar 采用了存儲計算分離的分層架構(gòu)和分區(qū)內(nèi)再分片的存儲架構(gòu),這種架構(gòu)能夠提供基于Segmented Stream 的存儲表征,能為批和流處理提供融合的存儲基礎(chǔ)。
