三角洲湖快速入門
Delta Lake快速入門提供了使用Delta Lake的基本知識的概述。快速入門介紹了如何將數據加載到Delta表中、修改表、讀取表、顯示表曆史和優化表。
要了解本文中描述的一些功能(以及更多功能)的演示,請觀看這個YouTube視頻(9分鍾)。
您可以運行本文中的示例Python、R、Scala和SQL代碼筆記本附在數據庫上集群.您還可以在查詢與…有關SQL端點在磚的SQL.
有關演示這些功能的現有Databricks筆記本,請參見介紹性的筆記本.
創建一個表
要創建Delta表,可以使用現有的Apache Spark SQL代碼並更改寫入格式拚花
,csv
,json
,等等,到δ
.
對於所有文件類型,使用相應的輸入格式(例如,拚花
,csv
,json
,等等),然後以Delta格式寫出數據。在這個代碼示例中,輸入文件已經是Delta格式的,並且位於樣例數據集(databicks -dataset).該代碼將數據保存為Delta格式數據庫文件係統(DBFS)在指定的位置save_path
.
定義輸入輸出格式和路徑以及表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path=“/ tmp /δ/ people-10m”table_name=“default.people10m”#從數據源加載數據。人=火花\.讀\.格式(read_format)\.負載(load_path)#將數據寫入目標。人.寫\.格式(write_format)\.保存(save_path)#創建表。火花.sql(“創建表”+table_name+"使用增量定位"+save_path+“”)
圖書館(SparkR)sparkR.session()定義輸入輸出格式和路徑以及表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”save_path=“/ tmp /δ/ people-10m /”table_name=“default.people10m”#從數據源加載數據。人=read.df(load_path,源=read_format)#將數據寫入目標。write.df(人,源=write_format,路徑=save_path)#創建表。sql(粘貼(“創建表”,table_name,"使用增量定位",save_path,“”,9月=""))
//定義輸入輸出格式和路徑以及表名。瓦爾read_format=“δ”瓦爾write_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾table_name=“default.people10m”//從數據源加載數據。瓦爾人=火花.讀.格式(read_format).負載(load_path)//將數據寫入目標。人.寫.格式(write_format).保存(save_path)//創建表。火花.sql(“創建表”+table_name+"使用增量定位"+save_path+“”)
——LOCATION的路徑必須已經存在——並且必須是Delta格式。創建表格默認的.people10m使用δ位置“/ tmp /δ/ people-10m”
以上操作將創建一個新的非托管表通過使用從數據中推斷出的模式。對於非托管表,可以控製數據的位置。Databricks跟蹤表的名稱及其位置。有關創建Delta表時可用選項的信息,請參見創建一個表而且寫入表.
如果源文件是Parquet格式的,則可以使用轉換為語句就地轉換文件。如果對應的表是非托管的,則在轉換後該表仍然是非托管的:
轉換來δ拚花.' /tmp/δ/人-10米`
創造一個新的管理表,你可以使用創建表語句指定表名,然後可以將數據加載到表中。或者你可以用saveAsTable
方法與Python, R或Scala。例如:
的表=“people10m”sourceType=“δ”loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”人=火花\.讀\.格式(sourceType)\.負載(loadPath)人.寫\.格式(sourceType)\.saveAsTable(的表)顯示(火花.sql(" select * from "+的表))
圖書館(SparkR)sparkR.session()的表=“people10m”sourceType=“δ”loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”人=read.df(路徑=loadPath,源=sourceType)saveAsTable(df=人,源=sourceType,的表=的表)顯示(sql(粘貼(" select * from ",的表,9月="")))
瓦爾的表=“people10m”瓦爾sourceType=“δ”瓦爾loadPath=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾人=火花.讀.格式(sourceType).負載(loadPath)人.寫.格式(sourceType).saveAsTable(的表)顯示(火花.sql(" select * from "+的表))
創建表格people10m使用δ作為選擇*從δ.' /磚-數據集/學習-火花-v2/人/人-10米.δ`;選擇*從people10m;
對於托管表,Databricks確定數據的位置。要獲取位置,可以使用描述的細節語句,例如:
顯示(火花.sql(“描述細節,people10m”))
顯示(sql(“描述細節people10m”))
顯示(火花.sql(“描述細節people10m”))
描述細節people10m;
對數據進行分區
要加快包含涉及分區列的謂詞的查詢,可以對數據進行分區。下麵的代碼示例類似於創建一個表,但本例對數據進行了分區。
定義輸入輸出格式和路徑以及表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by=“性別”save_path=“/ tmp /δ/ people-10m”table_name=“default.people10m”#從數據源加載數據。人=火花\.讀\.格式(read_format)\.負載(load_path)#將數據寫入目標。人.寫\.partitionBy(partition_by)\.格式(write_format)\.保存(save_path)#創建表。火花.sql(“創建表”+table_name+"使用增量定位"+save_path+“”)
如果您已經運行Python代碼示例創建一個表,你必須先刪除現有的表和保存的數據:
定義表名和輸出路徑。table_name=“default.people10m”save_path=“/ tmp /δ/ people-10m”#刪除表。火花.sql(“摔桌子”+table_name)#刪除保存的數據。dbutils.fs.rm(save_path,真正的)
圖書館(SparkR)sparkR.session()定義輸入輸出格式和路徑以及表名。read_format=“δ”write_format=“δ”load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”partition_by=“性別”save_path=“/ tmp /δ/ people-10m /”table_name=“default.people10m”#從數據源加載數據。人=read.df(load_path,源=read_format)#將數據寫入目標。write.df(人,源=write_format,partitionBy=partition_by,路徑=save_path)#創建表。sql(粘貼(“創建表”,table_name,"使用增量定位",save_path,“”,9月=""))
如果您已經運行了R代碼示例創建一個表,你必須先刪除現有的表和保存的數據:
圖書館(SparkR)sparkR.session()定義表名和輸出路徑。table_name=“default.people10m”save_path=“/ tmp /δ/ people-10m”#刪除表。sql(粘貼(“摔桌子”,table_name,9月=""))#刪除保存的數據。dbutils.fs.rm(save_path,真正的)
//定義輸入輸出格式和路徑以及表名。瓦爾read_format=“δ”瓦爾write_format=“δ”瓦爾load_path=“/ databricks-datasets / learning-spark-v2 /人/ people-10m.delta”瓦爾partition_by=“性別”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾table_name=“default.people10m”//從數據源加載數據。瓦爾人=火花.讀.格式(read_format).負載(load_path)//將數據寫入目標。人.寫.partitionBy(partition_by).格式(write_format).保存(save_path)//創建表。火花.sql(“創建表”+table_name+"使用增量定位"+save_path+“”)
如果您已經運行了Scala代碼示例創建一個表,你必須先刪除現有的表和保存的數據:
//定義表名和輸出路徑。瓦爾table_name=“default.people10m”瓦爾save_path=“/ tmp /δ/ people-10m”//刪除表。火花.sql(“摔桌子”+table_name)//刪除已保存的數據。dbutils.fs.rm(save_path,真正的)
要在使用SQL創建Delta表時對數據進行分區,請指定分區通過
列。
——LOCATION中的路徑必須已經存在——並且必須是Delta格式。創建表格默認的.people10m(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ分區通過(性別)位置“/ tmp /δ/ people-10m”
中運行SQL代碼示例創建一個表,你必須先刪除現有的表:
下降表格默認的.people10m
修改表格
Delta Lake支持一組豐富的操作來修改表。
流寫入表
可以使用結構化流將數據寫入Delta表。Delta Lake事務日誌保證隻進行一次處理,即使有其他流或對表並發運行的批處理查詢。默認情況下,流以追加模式運行,它向表中添加新記錄。
下麵的代碼示例啟動結構化流。中指定的DBFS位置json_read_path
,掃描上傳到該位置的JSON文件。當結構化流注意到一個文件上傳時,它試圖將數據寫入指定的DBFS位置save_path
中指定的模式read_schema
.結構化流繼續監視上傳的文件,直到代碼停止。結構化流使用指定的DBFS位置checkpoint_path
幫助確保上傳的文件隻計算一次。
#定義模式和輸入、檢查點和輸出路徑。read_schema=("id int, "+“firstName字符串,”+" middlelename string, "+"lastName string, "+"性別字符串,"+"birthDate時間戳,"+"ssn string, "+“工資int”)json_read_path=' / FileStore / streaming-uploads / people-10m 'checkpoint_path=“/ tmp /δ/ people-10m /檢查站”save_path=“/ tmp /δ/ people-10m”people_stream=(火花.readStream.模式(read_schema).選項(“maxFilesPerTrigger”,1).選項(“多行”,真正的).格式(“json”).負載(json_read_path))(people_stream.writeStream.格式(“δ”).outputMode(“添加”).選項(“checkpointLocation”,checkpoint_path).開始(save_path))
圖書館(SparkR)sparkR.session()#定義模式和輸入、檢查點和輸出路徑。read_schema="id int, firstName string, middlelename string, lastName string,性別string,出生日期時間戳,ssn string,工資int"json_read_path=“/ FileStore / streaming-uploads / people-10m”checkpoint_path=“/ tmp /δ/ people-10m /檢查站”save_path=“/ tmp /δ/ people-10m”people_stream=read.stream(“json”,路徑=json_read_path,模式=read_schema,多行=真正的,maxFilesPerTrigger=1)write.stream(people_stream,路徑=save_path,模式=“添加”,checkpointLocation=checkpoint_path)
//定義模式和輸入、檢查點和輸出路徑。瓦爾read_schema=("id int, "+“firstName字符串,”+" middlelename string, "+"lastName string, "+"性別字符串,"+"birthDate時間戳,"+"ssn string, "+“工資int”)瓦爾json_read_path=“/ FileStore / streaming-uploads / people-10m”瓦爾checkpoint_path=“/ tmp /δ/ people-10m /檢查站”瓦爾save_path=“/ tmp /δ/ people-10m”瓦爾people_stream=(火花.readStream.模式(read_schema).選項(“maxFilesPerTrigger”,1).選項(“多行”,真正的).格式(“json”).負載(json_read_path))people_stream.writeStream.格式(“δ”).outputMode(“添加”).選項(“checkpointLocation”,checkpoint_path).開始(save_path)
為了測試這種行為,這裏有一個符合要求的JSON文件,您可以將其上傳到json_read_path
,然後查詢位置save_path
查看結構化流寫入的數據。
[{“id”:10000021,“firstName”:“喬”,“middleName”:“亞曆山大”,“姓”:“史密斯”,“性別”:“M”,“生日”:188712000,“ssn”:“123-45-6789”,“工資”:50000},{“id”:10000022,“firstName”:“瑪麗”,“middleName”:“簡”,“姓”:“母鹿”,“性別”:“F”,“生日”:“1968 - 10 - 27 t04:00:00.000 + 000”,“ssn”:“234-56-7890”,“工資”:75500}]
有關Delta Lake與結構化流集成的更多信息,請參見表流讀取和寫入而且生產中的結構化流.參見結構化流媒體編程指南在Apache Spark網站上。
批量插入
方法將一組更新和插入合並到現有的Delta表中合並成聲明。例如,下麵的語句從源表獲取數據,並將其合並到目標Delta表中。當兩個表中都有匹配的行時,Delta Lake使用給定的表達式更新數據列。當沒有匹配的行時,Delta Lake添加一個新行。這個操作被稱為插入.
合並成默認的.people10m使用默認的.people10m_upload在默認的.people10m.id=默認的.people10m_upload.id當匹配然後更新集*當不匹配然後插入*
如果你指定*
,這將更新或插入目標表中的所有列。這假定源表與目標表中的列相同,否則查詢將拋出分析錯誤。
操作時,必須為表中的每一列指定一個值插入
操作(例如,當現有數據集中沒有匹配的行時)。但是,您不需要更新所有的值。
要測試上麵的例子,創建如下的源表:
創建表格默認的.people10m_upload(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ
為了測試當匹配
子句,用以下行填充源表,然後運行前麵的操作合並成
聲明。因為兩個表都有匹配的行在
子句,則更新目標表的匹配行。
插入成默認的.people10m_upload值(9999998,“比利”,“湯米·”,“Luppitt”,“米”,1992 - 09 - 17 t04:00:00.000 + 0000的,“953-38-9452”,55250),(9999999,“伊萊亞斯”,“西裏爾”,“利百特”,“米”,1984 - 05 - 22 t04:00:00.000 + 0000的,“906-51-2137”,48500),(10000000,“約書亞”,‘底盤’,“Broggio”,“米”,1968 - 07 - 22 t04:00:00.000 + 0000的,“988-61-6247”,90000)
要查看結果,請查詢該表。
選擇*從默認的.people10m在哪裏id之間的9999998和10000000排序通過idASC
為了測試當不匹配
子句,用以下行填充源表,然後運行前麵的操作合並成
聲明。因為目標表沒有以下行,所以將這些行添加到目標表中。
插入成默認的.people10m_upload值(20000001,“約翰。”,”,“母鹿”,“米”,1978 - 01 - 14 - t04:00:00.000 + 000的,“345-67-8901”,55500),(20000002,“瑪麗”,”,“史密斯”,“F”,1982 - 10 - 29 t01:00:00.000 + 000,“456-78-9012”,98250),(20000003,“簡”,”,“母鹿”,“F”,1981 - 06 - 25 - t04:00:00.000 + 000的,“567-89-0123”,89900)
要查看結果,請查詢該表。
選擇*從默認的.people10m在哪裏id之間的20000001和20000003排序通過idASC
要在Python、R或Scala中運行上述SQL語句,請將該語句作為字符串參數傳遞給spark.sql
函數在Python或Scala或sql
函數在R中。
讀表格
在本節中:
通過指定DBFS上的路徑(“/ tmp /δ/ people-10m”
)或表名(“default.people10m”
):
人=火花.讀.格式(“δ”).負載(“/ tmp /δ/ people-10m”)顯示(人)
或
人=火花.表格(“default.people10m”)顯示(人)
圖書館(SparkR)sparkR.session()人=read.df(路徑=“/ tmp /δ/ people-10m”,源=“δ”)顯示(人)
或
圖書館(SparkR)sparkR.session()人=tableToDF(“default.people10m”)顯示(人)
瓦爾人=火花.讀.格式(“δ”).負載(“/ tmp /δ/ people-10m”)顯示(人)
或
瓦爾人=火花.表格(“default.people10m”)顯示(人)
選擇*從δ.' /tmp/δ/人-10米`
或
選擇*從默認的.people10m
查詢表的較早版本(時間旅行)
Delta Lake時間旅行允許您查詢Delta表的舊快照。
中指定版本或時間戳以查詢表的舊版本選擇
聲明。例如,要從上麵的曆史記錄中查詢版本0,使用:
火花.sql('從默認選擇*。people10m版本作為的0')
或
火花.sql("SELECT * FROM default. "people10m時間戳作為的“2019-01-29 00:37:58”")
圖書館(SparkR)sparkR.session()sql("SELECT * FROM default. "people10m版本作為的0")
或
圖書館(SparkR)sparkR.session()sql("SELECT * FROM default. "people10m時間戳作為的“2019-01-29 00:37:58”")
火花.sql("SELECT * FROM default. "people10m版本作為的0")
或
火花.sql("SELECT * FROM default. "people10m時間戳作為的“2019-01-29 00:37:58”")
選擇*從默認的.people10m版本作為的0
或
選擇*從默認的.people10m時間戳作為的“2019-01-29 00:37:58”
對於時間戳,隻接受日期或時間戳字符串,例如,“2019-01-01”
而且“2019 - 01 - 01 - 00:00:00.000Z”
.
請注意
因為版本1是在時間戳“2019-01-2900:38:10”
,要查詢版本0,可以使用範圍內的任何時間戳“2019-01-2900:37:58”
來“2019-01-2900:38:09”
包容性。
DataFrameReader選項允許你從固定在表的特定版本的Delta表中創建一個DataFrame,例如在Python中:
df1=火花.讀.格式(“δ”).選項(“timestampAsOf”,“2019-01-01”).負載(“/ tmp /δ/ people-10m”)顯示(df1)
或
df2=火花.讀.格式(“δ”).選項(“versionAsOf”,2).負載(“/ tmp /δ/ people-10m”)顯示(df2)
詳情請參見查詢表的舊快照(時間旅行).
優化表
對表執行多次更改後,可能會有很多小文件。要提高讀取查詢的速度,可以使用優化
把小文件折疊成大文件:
火花.sql(“優化delta. / tmp /δ/ people-10m”)
或
火花.sql(“優化default.people10m”)
圖書館(SparkR)sparkR.session()sql(“優化delta. / tmp /δ/ people-10m”)
或
圖書館(SparkR)sparkR.session()sql(“優化default.people10m”)
火花.sql(“優化delta. / tmp /δ/ people-10m”)
或
火花.sql(“優化default.people10m”)
優化δ.' /tmp/δ/人-10米`
或
優化默認的.people10m
按列的z軸順序
為了進一步提高讀取性能,可以通過z - ordered在同一組文件中共同定位相關信息。Delta Lake數據跳過算法自動使用這種共局部性,以大幅減少需要讀取的數據量。對於Z-Order數據,您可以在ZORDER通過
條款。例如,通過性別
運行:
火花.sql(“優化三角洲。' /tmp/δ/人-10米`ZORDER通過(性別)")
或
火花.sql(“優化默認。people10mZORDER通過(性別)')
圖書館(SparkR)sparkR.session()sql(“優化三角洲。' /tmp/δ/人-10米`ZORDER通過(性別)")
或
圖書館(SparkR)sparkR.session()sql(“優化違約。people10mZORDER通過(性別)")
火花.sql(“優化三角洲。' /tmp/δ/人-10米`ZORDER通過(性別)")
或
火花.sql(“優化違約。people10mZORDER通過(性別)")
優化δ.' /tmp/δ/人-10米`ZORDER通過(性別)
或
優化默認的.people10mZORDER通過(性別)
獲取運行時可用的全套選項優化
,請參閱壓實(裝箱).
清理快照
Delta Lake為讀取提供快照隔離,這意味著它可以安全運行優化
即使其他用戶或作業正在查詢表。不過,最終應該清理舊快照。可以通過運行真空
命令:
火花.sql(“真空default.people10m”)
圖書館(SparkR)sparkR.session()sql(“真空default.people10m”)
火花.sql(“真空default.people10m”)
真空默認的.people10m
控件可以控製最新保留快照的年齡保留< N >小時
選擇:
火花.sql(“真空違約。people10m保留24小時')
圖書館(SparkR)sparkR.session()sql(“真空違約。people10m保留24小時")
火花.sql(“真空違約。people10m保留24小時")
真空默認的.people10m保留24小時
有關使用真空
有效地,看刪除Delta表不再引用的文件.