三角洲湖快速入門

Delta Lake快速入門提供了使用Delta Lake的基本知識的概述。快速入門介紹了如何將數據加載到Delta表中、修改表、讀取表、顯示表曆史和優化表。

要了解本文中描述的一些功能(以及更多功能)的演示,請觀看這個YouTube視頻(9分鍾)。

您可以運行本文中的示例Python、R、Scala和SQL代碼筆記本附在數據庫上集群.您還可以在查詢與…有關SQL端點磚的SQL

有關演示這些功能的現有Databricks筆記本,請參見介紹性的筆記本

創建一個表

要創建Delta表,可以使用現有的Apache Spark SQL代碼並更改寫入格式拚花csvjson,等等,到δ

對於所有文件類型,使用相應的輸入格式(例如,拚花csvjson,等等),然後以Delta格式寫出數據。在這個代碼示例中,輸入文件已經是Delta格式的,並且位於樣例數據集(databicks -dataset).該代碼將數據保存為Delta格式數據庫文件係統(DBFS)在指定的位置save_path

定義輸入輸出格式和路徑以及表名。read_format“δ”write_format“δ”load_path“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path“/ tmp /δ/ people-10m”table_name“default.people10m”#從數據源加載數據。火花格式read_format負載load_path#將數據寫入目標。格式write_format保存save_path#創建表。火花sql“創建表”+table_name+"使用增量定位"+save_path+“”
圖書館SparkRsparkR.session()定義輸入輸出格式和路徑以及表名。read_format“δ”write_format“δ”load_path“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path“/ tmp /δ/ people-10m /”table_name“default.people10m”#從數據源加載數據。read.dfload_pathread_format#將數據寫入目標。write.dfwrite_format路徑save_path#創建表。sql粘貼“創建表”table_name"使用增量定位"save_path“”9月""))
//定義輸入輸出格式和路徑以及表名。瓦爾read_format“δ”瓦爾write_format“δ”瓦爾load_path“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾save_path“/ tmp /δ/ people-10m”瓦爾table_name“default.people10m”//從數據源加載數據。瓦爾火花格式read_format負載load_path//將數據寫入目標。格式write_format保存save_path//創建表。火花sql“創建表”+table_name+"使用增量定位"+save_path+“”
——LOCATION的路徑必須已經存在——並且必須是Delta格式。創建表格默認的people10m使用δ位置“/ tmp /δ/ people-10m”

以上操作將創建一個新的非托管表通過使用從數據中推斷出的模式。對於非托管表,可以控製數據的位置。Databricks跟蹤表的名稱及其位置。有關創建Delta表時可用選項的信息,請參見創建一個表而且寫入表

如果源文件是Parquet格式的,則可以使用轉換為語句就地轉換文件。如果對應的表是非托管的,則在轉換後該表仍然是非托管的:

轉換δ拚花' /tmp/δ/-10

創造一個新的管理表,你可以使用創建表語句指定表名,然後可以將數據加載到表中。或者你可以用saveAsTable方法與Python, R或Scala。例如:

的表“people10m”sourceType“δ”loadPath“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”火花格式sourceType負載loadPath格式sourceTypesaveAsTable的表顯示火花sql" select * from "+的表))
圖書館SparkRsparkR.session()的表“people10m”sourceType“δ”loadPath“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”read.df路徑loadPathsourceTypesaveAsTabledfsourceType的表的表顯示sql粘貼" select * from "的表9月"")))
瓦爾的表“people10m”瓦爾sourceType“δ”瓦爾loadPath“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾火花格式sourceType負載loadPath格式sourceTypesaveAsTable的表顯示火花sql" select * from "+的表))
創建表格people10m使用δ作為選擇δ' /-數據集/學習-火花-v2//-10δ選擇people10m

對於托管表,Databricks確定數據的位置。要獲取位置,可以使用描述的細節語句,例如:

顯示火花sql“描述細節,people10m”))
顯示sql“描述細節people10m”))
顯示火花sql“描述細節people10m”))
描述細節people10m

另請參閱創建一個表而且控製數據位置

對數據進行分區

要加快包含涉及分區列的謂詞的查詢,可以對數據進行分區。下麵的代碼示例類似於創建一個表,但本例對數據進行了分區。

定義輸入輸出格式和路徑以及表名。read_format“δ”write_format“δ”load_path“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by“性別”save_path“/ tmp /δ/ people-10m”table_name“default.people10m”#從數據源加載數據。火花格式read_format負載load_path#將數據寫入目標。partitionBypartition_by格式write_format保存save_path#創建表。火花sql“創建表”+table_name+"使用增量定位"+save_path+“”

如果您已經運行Python代碼示例創建一個表,你必須先刪除現有的表和保存的數據:

定義表名和輸出路徑。table_name“default.people10m”save_path“/ tmp /δ/ people-10m”#刪除表。火花sql“摔桌子”+table_name#刪除保存的數據。dbutilsfsrmsave_path真正的
圖書館SparkRsparkR.session()定義輸入輸出格式和路徑以及表名。read_format“δ”write_format“δ”load_path“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by“性別”save_path“/ tmp /δ/ people-10m /”table_name“default.people10m”#從數據源加載數據。read.dfload_pathread_format#將數據寫入目標。write.dfwrite_formatpartitionBypartition_by路徑save_path#創建表。sql粘貼“創建表”table_name"使用增量定位"save_path“”9月""))

如果您已經運行了R代碼示例創建一個表,你必須先刪除現有的表和保存的數據:

圖書館SparkRsparkR.session()定義表名和輸出路徑。table_name“default.people10m”save_path“/ tmp /δ/ people-10m”#刪除表。sql粘貼“摔桌子”table_name9月""))#刪除保存的數據。dbutils.fs.rmsave_path真正的
//定義輸入輸出格式和路徑以及表名。瓦爾read_format“δ”瓦爾write_format“δ”瓦爾load_path“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾partition_by“性別”瓦爾save_path“/ tmp /δ/ people-10m”瓦爾table_name“default.people10m”//從數據源加載數據。瓦爾火花格式read_format負載load_path//將數據寫入目標。partitionBypartition_by格式write_format保存save_path//創建表。火花sql“創建表”+table_name+"使用增量定位"+save_path+“”

如果您已經運行了Scala代碼示例創建一個表,你必須先刪除現有的表和保存的數據:

//定義表名和輸出路徑。瓦爾table_name“default.people10m”瓦爾save_path“/ tmp /δ/ people-10m”//刪除表。火花sql“摔桌子”+table_name//刪除已保存的數據。dbutilsfsrmsave_path真正的

要在使用SQL創建Delta表時對數據進行分區,請指定分區通過列。

——LOCATION中的路徑必須已經存在——並且必須是Delta格式。創建表格默認的people10midINTfirstName字符串middleName字符串字符串性別字符串生日時間戳ssn字符串工資INT使用δ分區通過性別位置“/ tmp /δ/ people-10m”

中運行SQL代碼示例創建一個表,你必須先刪除現有的表:

下降表格默認的people10m

修改表格

Delta Lake支持一組豐富的操作來修改表。

流寫入表

可以使用結構化流將數據寫入Delta表。Delta Lake事務日誌保證隻進行一次處理,即使有其他流或對表並發運行的批處理查詢。默認情況下,流以追加模式運行,它向表中添加新記錄。

下麵的代碼示例啟動結構化流。中指定的DBFS位置json_read_path,掃描上傳到該位置的JSON文件。當結構化流注意到一個文件上傳時,它試圖將數據寫入指定的DBFS位置save_path中指定的模式read_schema.結構化流繼續監視上傳的文件,直到代碼停止。結構化流使用指定的DBFS位置checkpoint_path幫助確保上傳的文件隻計算一次。

#定義模式和輸入、檢查點和輸出路徑。read_schema"id int, "+“firstName字符串,”+" middlelename string, "+"lastName string, "+"性別字符串,"+"birthDate時間戳,"+"ssn string, "+“工資int”json_read_path' / FileStore / streaming-uploads / people-10m 'checkpoint_path“/ tmp /δ/ people-10m /檢查站”save_path“/ tmp /δ/ people-10m”people_stream火花readStream模式read_schema選項“maxFilesPerTrigger”1選項“多行”真正的格式“json”負載json_read_pathpeople_streamwriteStream格式“δ”outputMode“添加”選項“checkpointLocation”checkpoint_path開始save_path
圖書館SparkRsparkR.session()#定義模式和輸入、檢查點和輸出路徑。read_schema"id int, firstName string, middlelename string, lastName string,性別string,出生日期時間戳,ssn string,工資int"json_read_path“/ FileStore / streaming-uploads / people-10m”checkpoint_path“/ tmp /δ/ people-10m /檢查站”save_path“/ tmp /δ/ people-10m”people_streamread.stream“json”路徑json_read_path模式read_schema多行真正的maxFilesPerTrigger1write.streampeople_stream路徑save_path模式“添加”checkpointLocationcheckpoint_path
//定義模式和輸入、檢查點和輸出路徑。瓦爾read_schema"id int, "+“firstName字符串,”+" middlelename string, "+"lastName string, "+"性別字符串,"+"birthDate時間戳,"+"ssn string, "+“工資int”瓦爾json_read_path“/ FileStore / streaming-uploads / people-10m”瓦爾checkpoint_path“/ tmp /δ/ people-10m /檢查站”瓦爾save_path“/ tmp /δ/ people-10m”瓦爾people_stream火花readStream模式read_schema選項“maxFilesPerTrigger”1選項“多行”真正的格式“json”負載json_read_path))people_streamwriteStream格式“δ”outputMode“添加”選項“checkpointLocation”checkpoint_path開始save_path

為了測試這種行為,這裏有一個符合要求的JSON文件,您可以將其上傳到json_read_path,然後查詢位置save_path查看結構化流寫入的數據。

“id”10000021“firstName”“喬”“middleName”“亞曆山大”“姓”“史密斯”“性別”“M”“生日”188712000“ssn”“123-45-6789”“工資”50000},“id”10000022“firstName”“瑪麗”“middleName”“簡”“姓”“母鹿”“性別”“F”“生日”“1968 - 10 - 27 t04:00:00.000 + 000”“ssn”“234-56-7890”“工資”75500

有關Delta Lake與結構化流集成的更多信息,請參見表流讀取和寫入而且生產中的結構化流.參見結構化流媒體編程指南在Apache Spark網站上。

批量插入

方法將一組更新和插入合並到現有的Delta表中合並成聲明。例如,下麵的語句從源表獲取數據,並將其合並到目標Delta表中。當兩個表中都有匹配的行時,Delta Lake使用給定的表達式更新數據列。當沒有匹配的行時,Delta Lake添加一個新行。這個操作被稱為插入

合並默認的people10m使用默認的people10m_upload默認的people10mid默認的people10m_uploadid匹配然後更新匹配然後插入

如果你指定,這將更新或插入目標表中的所有列。這假定源表與目標表中的列相同,否則查詢將拋出分析錯誤。

操作時,必須為表中的每一列指定一個值插入操作(例如,當現有數據集中沒有匹配的行時)。但是,您不需要更新所有的值。

要測試上麵的例子,創建如下的源表:

創建表格默認的people10m_uploadidINTfirstName字符串middleName字符串字符串性別字符串生日時間戳ssn字符串工資INT使用δ

為了測試匹配子句,用以下行填充源表,然後運行前麵的操作合並聲明。因為兩個表都有匹配的行子句,則更新目標表的匹配行。

插入默認的people10m_upload9999998“比利”“湯米·”“Luppitt”“米”1992 - 09 - 17 t04:00:00.000 + 0000的“953-38-9452”55250),9999999“伊萊亞斯”“西裏爾”“利百特”“米”1984 - 05 - 22 t04:00:00.000 + 0000的“906-51-2137”48500),10000000“約書亞”‘底盤’“Broggio”“米”1968 - 07 - 22 t04:00:00.000 + 0000的“988-61-6247”90000

要查看結果,請查詢該表。

選擇默認的people10m在哪裏id之間的999999810000000排序通過idASC

為了測試匹配子句,用以下行填充源表,然後運行前麵的操作合並聲明。因為目標表沒有以下行,所以將這些行添加到目標表中。

插入默認的people10m_upload20000001“約翰。”“母鹿”“米”1978 - 01 - 14 - t04:00:00.000 + 000的“345-67-8901”55500),20000002“瑪麗”“史密斯”“F”1982 - 10 - 29 t01:00:00.000 + 000“456-78-9012”98250),20000003“簡”“母鹿”“F”1981 - 06 - 25 - t04:00:00.000 + 000的“567-89-0123”89900

要查看結果,請查詢該表。

選擇默認的people10m在哪裏id之間的2000000120000003排序通過idASC

要在Python、R或Scala中運行上述SQL語句,請將該語句作為字符串參數傳遞給spark.sql函數在Python或Scala或sql函數在R中。

讀表格

通過指定DBFS上的路徑(“/ tmp /δ/ people-10m”)或表名(“default.people10m”):

火花格式“δ”負載“/ tmp /δ/ people-10m”顯示

火花表格“default.people10m”顯示
圖書館SparkRsparkR.session()read.df路徑“/ tmp /δ/ people-10m”“δ”顯示

圖書館SparkRsparkR.session()tableToDF“default.people10m”顯示
瓦爾火花格式“δ”).負載“/ tmp /δ/ people-10m”顯示

瓦爾火花表格“default.people10m”顯示
選擇δ' /tmp/δ/-10

選擇默認的people10m

顯示表曆史記錄

要查看表的曆史,請使用描述曆史語句,該語句為每次寫入表提供來源信息,包括表版本、操作、用戶等。

查詢表的較早版本(時間旅行)

Delta Lake時間旅行允許您查詢Delta表的舊快照。

中指定版本或時間戳以查詢表的舊版本選擇聲明。例如,要從上麵的曆史記錄中查詢版本0,使用:

火花sql'從默認選擇*。people10m版本作為的0'

火花sql"SELECT * FROM default. "people10m時間戳作為的“2019-01-29 00:37:58”"
圖書館SparkRsparkR.session()sql"SELECT * FROM default. "people10m版本作為的0"

圖書館SparkRsparkR.session()sql"SELECT * FROM default. "people10m時間戳作為的“2019-01-29 00:37:58”"
火花sql"SELECT * FROM default. "people10m版本作為的0"

火花sql"SELECT * FROM default. "people10m時間戳作為的“2019-01-29 00:37:58”"
選擇默認的people10m版本作為0

選擇默認的people10m時間戳作為“2019-01-29 00:37:58”

對於時間戳,隻接受日期或時間戳字符串,例如,“2019-01-01”而且“2019 - 01 - 01 - 00:00:00.000Z”

請注意

因為版本1是在時間戳“2019-01-2900:38:10”,要查詢版本0,可以使用範圍內的任何時間戳“2019-01-2900:37:58”“2019-01-2900:38:09”包容性。

DataFrameReader選項允許你從固定在表的特定版本的Delta表中創建一個DataFrame,例如在Python中:

df1火花格式“δ”選項“timestampAsOf”“2019-01-01”負載“/ tmp /δ/ people-10m”顯示df1

df2火花格式“δ”選項“versionAsOf”2負載“/ tmp /δ/ people-10m”顯示df2

詳情請參見查詢表的舊快照(時間旅行)

優化表

對表執行多次更改後,可能會有很多小文件。要提高讀取查詢的速度,可以使用優化把小文件折疊成大文件:

火花sql“優化delta. / tmp /δ/ people-10m”

火花sql“優化default.people10m”
圖書館SparkRsparkR.session()sql“優化delta. / tmp /δ/ people-10m”

圖書館SparkRsparkR.session()sql“優化default.people10m”
火花sql“優化delta. / tmp /δ/ people-10m”

火花sql“優化default.people10m”
優化δ' /tmp/δ/-10

優化默認的people10m

按列的z軸順序

為了進一步提高讀取性能,可以通過z - ordered在同一組文件中共同定位相關信息。Delta Lake數據跳過算法自動使用這種共局部性,以大幅減少需要讀取的數據量。對於Z-Order數據,您可以在ZORDER通過條款。例如,通過性別運行:

火花sql“優化三角洲。' /tmp/δ/人-10米`ZORDER通過(性別)"

火花sql“優化默認。people10mZORDER通過(性別)'
圖書館SparkRsparkR.session()sql“優化三角洲。' /tmp/δ/人-10米`ZORDER通過(性別)"

圖書館SparkRsparkR.session()sql“優化違約。people10mZORDER通過(性別)"
火花sql“優化三角洲。' /tmp/δ/人-10米`ZORDER通過(性別)"

火花sql“優化違約。people10mZORDER通過(性別)"
優化δ' /tmp/δ/-10ZORDER通過性別

優化默認的people10mZORDER通過性別

獲取運行時可用的全套選項優化,請參閱壓實(裝箱)

清理快照

Delta Lake為讀取提供快照隔離,這意味著它可以安全運行優化即使其他用戶或作業正在查詢表。不過,最終應該清理舊快照。可以通過運行真空命令:

火花sql“真空default.people10m”
圖書館SparkRsparkR.session()sql“真空default.people10m”
火花sql“真空default.people10m”
真空默認的people10m

控件可以控製最新保留快照的年齡保留< N >小時選擇:

火花sql“真空違約。people10m保留24小時'
圖書館SparkRsparkR.session()sql“真空違約。people10m保留24小時"
火花sql“真空違約。people10m保留24小時"
真空默認的people10m保留24小時

有關使用真空有效地,看刪除Delta表不再引用的文件