Dive into Delta Lake | Delta Lake 嘗鮮
點擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

Delta Lake 是一個存儲層,為 Apache Spark 和大數(shù)據(jù) workloads 提供 ACID 事務(wù)能力,其通過寫和快照隔離之間的樂觀并發(fā)控制(optimistic concurrency control),在寫入數(shù)據(jù)期間提供一致性的讀取,從而為構(gòu)建在 HDFS 和云存儲上的數(shù)據(jù)湖(data lakes)帶來可靠性。Delta Lake 還提供內(nèi)置數(shù)據(jù)版本控制,以便輕松回滾。
為什么需要Delta Lake
現(xiàn)在很多公司內(nèi)部數(shù)據(jù)架構(gòu)中都存在數(shù)據(jù)湖,數(shù)據(jù)湖是一種大型數(shù)據(jù)存儲庫和處理引擎。它能夠存儲大量各種類型的數(shù)據(jù),擁有強(qiáng)大的信息處理能力和處理幾乎無限的并發(fā)任務(wù)或工作的能力,最早由 Pentaho 首席技術(shù)官詹姆斯迪克森在2011年的時候提出。雖然數(shù)據(jù)湖在數(shù)據(jù)范圍方面邁出了一大步,但是也面臨了很多問題,主要概括如下:
數(shù)據(jù)湖的讀寫是不可靠的。數(shù)據(jù)工程師經(jīng)常遇到不安全寫入數(shù)據(jù)湖的問題,導(dǎo)致讀者在寫入期間看到垃圾數(shù)據(jù)。他們必須構(gòu)建方法以確保讀者在寫入期間始終看到一致的數(shù)據(jù)。
數(shù)據(jù)湖中的數(shù)據(jù)質(zhì)量很低。將非結(jié)構(gòu)化數(shù)據(jù)轉(zhuǎn)儲到數(shù)據(jù)湖中是非常容易的。但這是以數(shù)據(jù)質(zhì)量為代價的。沒有任何驗證模式和數(shù)據(jù)的機(jī)制,導(dǎo)致數(shù)據(jù)湖的數(shù)據(jù)質(zhì)量很差。因此,努力挖掘這些數(shù)據(jù)的分析項目也會失敗。
隨著數(shù)據(jù)的增加,處理性能很差。隨著數(shù)據(jù)湖中存儲的數(shù)據(jù)量增加,文件和目錄的數(shù)量也會增加。處理數(shù)據(jù)的作業(yè)和查詢引擎在處理元數(shù)據(jù)操作上花費大量時間。在有流作業(yè)的情況下,這個問題更加明顯。
數(shù)據(jù)湖中數(shù)據(jù)的更新非常困難。工程師需要構(gòu)建復(fù)雜的管道來讀取整個分區(qū)或表,修改數(shù)據(jù)并將其寫回。這種模式效率低,并且難以維護(hù)。
由于存在這些挑戰(zhàn),許多大數(shù)據(jù)項目無法實現(xiàn)其愿景,有時甚至完全失敗。我們需要一種解決方案,使數(shù)據(jù)從業(yè)者能夠利用他們現(xiàn)有的數(shù)據(jù)湖,同時確保數(shù)據(jù)質(zhì)量。這就是 Delta Lake 產(chǎn)生的背景。
Delta Lake特性
Delta Lake 很好地解決了上述問題,以簡化我們構(gòu)建數(shù)據(jù)湖的方式。

支持ACID事務(wù)
Delta Lake 在多并發(fā)寫入之間提供 ACID 事務(wù)保證。每次寫入都是一個事務(wù),并且在事務(wù)日志中記錄了寫入的序列順序。
事務(wù)日志跟蹤文件級別的寫入并使用樂觀并發(fā)控制,這非常適合數(shù)據(jù)湖,因為多次寫入/修改相同的文件很少發(fā)生。在存在沖突的情況下,Delta Lake 會拋出并發(fā)修改異常以便用戶能夠處理它們并重試其作業(yè)。
Delta Lake 還提供強(qiáng)大的可序列化隔離級別,允許工程師持續(xù)寫入目錄或表,并允許消費者繼續(xù)從同一目錄或表中讀取。讀者將看到閱讀開始時存在的最新快照。
Schema管理
Delta Lake 自動驗證正在被寫的 DataFrame 模式是否與表的模式兼容。
表中存在但 DataFrame 中不存在的列會被設(shè)置為 null 如果 DataFrame 中有額外的列在表中不存在,那么該操作將拋出異常 Delta Lake 具有可以顯式添加新列的 DDL 和自動更新Schema 的能力 可伸縮的元數(shù)據(jù)處理
Delta Lake 將表或目錄的元數(shù)據(jù)信息存儲在事務(wù)日志中,而不是存儲在元存儲(metastore)中。這使得 Delta Lake 能夠在固定的時間內(nèi)列出大型目錄中的文件,并且在讀取數(shù)據(jù)時非常高效。
數(shù)據(jù)版本
Delta Lake 允許用戶讀取表或目錄之前的快照。當(dāng)文件被修改文件時,Delta Lake 會創(chuàng)建較新版本的文件并保留舊版本的文件。當(dāng)用戶想要讀取舊版本的表或目錄時,他們可以在 Apache Spark 的讀取 API 中提供時間戳或版本號,Delta Lake 根據(jù)事務(wù)日志中的信息構(gòu)建該時間戳或版本的完整快照。這允許用戶重現(xiàn)之前的數(shù)據(jù),并在需要時將表還原為舊版本的數(shù)據(jù)。
統(tǒng)一的批處理和流 sink
除了批處理寫之外,Delta Lake 還可以使用作為 Apache Spark structured streaming 高效的流 sink。再結(jié)合 ACID 事務(wù)和可伸縮的元數(shù)據(jù)處理,高效的流 sink 現(xiàn)在支持許多接近實時的分析用例,而且無需維護(hù)復(fù)雜的流和批處理管道。
數(shù)據(jù)存儲格式采用開源 Apache Parquet
Delta Lake 中的所有數(shù)據(jù)都是使用 Apache Parquet 格式存儲,使 Delta Lake 能夠利用 Parquet 原生的高效壓縮和編碼方案。
更新和刪除
Delta Lake 支持 merge, update 和 delete 等 DML 命令。這使得數(shù)據(jù)工程師可以輕松地在數(shù)據(jù)湖中插入/更新和刪除記錄。由于 Delta Lake 以文件級粒度跟蹤和修改數(shù)據(jù),因此它比讀取和覆蓋整個分區(qū)或表更有效。
數(shù)據(jù)異常處理
Delta Lake 還將支持新的 API 來設(shè)置表或目錄的數(shù)據(jù)異常。工程師能夠設(shè)置一個布爾條件并調(diào)整報警閾值以處理數(shù)據(jù)異常。當(dāng) Apache Spark 作業(yè)寫入表或目錄時,Delta Lake 將自動驗證記錄,當(dāng)數(shù)據(jù)存在異常時,它將根據(jù)提供的設(shè)置來處理記錄。
兼容 Apache Spark API
開發(fā)人員可以將 Delta Lake 與他們現(xiàn)有的數(shù)據(jù)管道一起使用,僅需要做一些細(xì)微的修改。
基本使用
Create a table
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
// 分區(qū)表
df.write.format("delta").partitionBy("date").save("/delta/events")Read table
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()Update table
// overwrite
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")// update
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
// Upsert (merge) new data
val newData = spark.range(0, 20).as("newData").toDF
deltaTable.as("oldData")
.merge(
newData,
"oldData.id = newData.id")
.whenMatched
.update(Map("id" -> col("newData.id")))
.whenNotMatched
.insert(Map("id" -> col("newData.id")))
.execute()
deltaTable.toDF.show()// update by expressions
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
// predicate and update expressions using SQL formatted string
deltaTable.updateExpr(
"eventType = 'clck'",
Map("eventType" -> "'click'")// merge
deltaTable
.as("logs")
.merge(
updates.as("updates"),
"logs.uniqueId = updates.uniqueId")
.whenNotMatched()
.insertAll()
.execute()Delete table
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
deltaTable.delete("date < '2017-01-01'") // predicate using SQL formatted string
import org.apache.spark.sql.functions._
import spark.implicits._
deltaTable.delete($"date" < "2017-01-01") // predicate using Spark SQL functions and implicits流支持
查詢表的舊快照
Delta Lake 時間旅行允許您查詢 Delta Lake 表的舊快照。時間旅行有很多用例,包括:
重新創(chuàng)建分析,報告或輸出(例如,機(jī)器學(xué)習(xí)模型的輸出)。這對于調(diào)試或?qū)徲嫹浅S杏?,尤其是在受監(jiān)管的行業(yè)中
編寫復(fù)雜的臨時查詢
修復(fù)數(shù)據(jù)中的錯誤
為快速更改的表的一組查詢提供快照隔離 DataFrameReader options 允許從 Delta Lake 表創(chuàng)建一個DataFrame 關(guān)聯(lián)到表的特定版本,可以使用如下兩種方式:
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")對于timestamp_string,僅接受日期或時間戳字符串。例如,2019-01-01 和 2019-01-01 00:00:00.000Z
增加列
當(dāng)以下任意情況為 true 時,DataFrame 中存在但表中缺少的列將自動添加為寫入事務(wù)的一部分:
write 或 writeStream 具有 .option("mergeSchema", "true") 添加的列將附加到它們所在的結(jié)構(gòu)的末尾。附加新列時將保留大小寫。
NullType 列
寫入 Delta 時,會從 DataFrame 中刪除 NullType 列(因為 Parquet 不支持 NullType)。當(dāng)收到該列的不同數(shù)據(jù)類型時,Delta Lake 會將 schema 合并到新數(shù)據(jù)類型
默認(rèn)情況下,覆蓋表中的數(shù)據(jù)不會覆蓋 schema。使用模式 overwrite 覆蓋表而不使用 replaceWhere 時,可能仍希望覆蓋正在寫入的數(shù)據(jù)的 schema??梢酝ㄟ^設(shè)置以下內(nèi)容來選擇替換表的 schema :
df.write.option("overwriteSchema", "true")視圖
Transactional meta 實現(xiàn)
在文件上增加一個日志結(jié)構(gòu)化存儲(transaction log ),該日志有序(ordered)且保持原子性(atomic)。
增加或者刪除數(shù)據(jù)時,都會產(chǎn)生一個描述文件,采用樂觀并發(fā)控制 (optimistic concurrency control) 保證用戶并發(fā)操作時數(shù)據(jù)的一致性。

每次表更都生產(chǎn)一個描述文件,描述文件的記錄數(shù)和歷史版本數(shù)量一致。如圖,delta-table表13個歷史版本就有13個描述文件。



并發(fā)控制
Delta Lake 在讀寫中提供了 ACID 事務(wù)保證。這意味著:
跨多集群的并發(fā)寫入,也可以同時修改數(shù)據(jù)集并查看表的一致性快照,這些寫入操作將按照串行執(zhí)行
在作業(yè)執(zhí)行期間修改了數(shù)據(jù),讀取時也能看到一致性快照。
樂觀并發(fā)控制
Delta Lake 使用 optimistic concurrency control 機(jī)制提供寫數(shù)據(jù)時的事務(wù)保證,在這種機(jī)制下,寫過程包含三個步驟:
Write: 通過編寫新數(shù)據(jù)文件來進(jìn)行所有更改
Validate and commit: 調(diào)用 commit 方法,生成 commit 信息,生成一個新的遞增1的文件,如果相同的文件名已經(jīng)存在,則報 ConcurrentModificationException。
名詞解釋
ACID ACID 就是指數(shù)據(jù)庫事務(wù)的四個基本要素,對應(yīng)的是原子性 Atomicity,一致性 Consistency,隔離性 Isolation 和持久性 Durability。
原子性: 一個事務(wù)要么全部成功,要不全部失敗,事務(wù)出現(xiàn)錯誤會被回滾到事務(wù)開始時候的狀態(tài)。
一致性: 系統(tǒng)始終處于一致的狀態(tài),所有操作都應(yīng)該服務(wù)現(xiàn)實中的期望。
隔離性: 并發(fā)事務(wù)不會互相干擾,事務(wù)之間互相隔離。
持久性: 事務(wù)結(jié)束后就一直保存在數(shù)據(jù)庫中,不會被回滾。
Snapshot
Snapshot 相當(dāng)于當(dāng)前數(shù)據(jù)的快照。這個快照包括的內(nèi)容不僅僅只有一個版本號,還會包括當(dāng)前快照下的數(shù)據(jù)文件,上一個 Snapshot 的操作,以及時間戳和 DeltaLog 的記錄。
MetaData
這里是指 Delta Table 的元數(shù)據(jù),包括 id,name,format,創(chuàng)建時間,schema 信息等等。
事務(wù)日志
事務(wù)日志的相關(guān)代碼主要在 org.apache.spark.sql.delta.DeltaLog 中。這個是 Delta Lake 把對數(shù)據(jù)/表的操作的記錄日志。
CheckSum
可以說 CheckSum 是一個對象,里面包含了,當(dāng)前 SNAPSHOT 下的表的物理大小,文件數(shù),MetaData 的數(shù)量,協(xié)議以及事務(wù)的數(shù)量。這些信息會轉(zhuǎn)成 Json 格式,存放在 CheckSumFile 中。
校驗文件是在 Snapshot 的基礎(chǔ)上計算的,會和各自的事務(wù)生死存亡。

版權(quán)聲明:
文章不錯?點個【在看】吧!??




