表批處理讀寫
Delta Lake支持Apache Spark DataFrame讀寫api提供的大部分選項,用於對表進行批量讀寫。
有關Delta Lake SQL命令的信息,請參見
磚運行時7。x,上圖:三角洲湖語句
Databricks Runtime 5.5 LTS和6.x:Databricks Runtime 5.5 LTS和6.x的SQL參考
請注意
下麵的一些代碼示例使用兩級名稱空間表示法,包括模式(也稱為數據庫)和表或視圖(例如,default.people10m
).來使用這些例子統一目錄,將兩級命名空間替換為Unity Catalog三級命名空間表示法,包括目錄、模式和表或視圖(例如,main.default.people10m
).有關更多信息,請參見在Unity Catalog中使用三級命名空間表示法.
創建一個表
Delta Lake支持創建在metastore中定義的兩種表-表和由path定義的表。
可以通過以下方式創建表。
SQL DDL命令:您可以使用Apache Spark中支持的標準SQL DDL命令(例如:
創建表格
而且取代表格
)來創建增量表。創建表格如果不存在默認的.people10m(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ創建或取代表格默認的.people10m(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ
請注意
在Databricks Runtime 8.0及以上版本中,Delta Lake是默認格式,你不需要它
使用δ
.請注意
在Databricks Runtime 7.0及以上版本中,為了避免AWS S3上的最終一致性問題,Databricks建議使用
創建或取代
語法,而不是下降表格
緊隨其後的是一個創建表格
.在Databricks Runtime 7.0及以上版本中,SQL還支持在一個路徑上創建表,而不需要在Hive metastore中創建條目。
——用path創建或替換表創建或取代表格δ.' /tmp/δ/people10m`(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ
DataFrameWriter
API:如果你想同時創建一個表,並從Spark DataFrames或Datasets中插入數據,你可以使用SparkDataFrameWriter
(Scala或Java而且Python).#使用DataFrame的模式在metastore創建表並寫入數據df.寫.格式(“δ”).saveAsTable(“default.people10m”)使用DataFrame的模式創建或替換分區表,並向其寫入/覆蓋數據df.寫.格式(“δ”).模式(“覆蓋”).保存(“/ tmp /δ/ people10m”)
//使用DataFrame的schema在metastore創建表並寫入數據df.寫.格式(“δ”).saveAsTable(“default.people10m”)//使用DataFrame的模式創建帶有path的表,並向其寫入數據df.寫.格式(“δ”).模式(“覆蓋”).保存(“/ tmp /δ/ people10m”)
在Databricks Runtime 8.0及以上版本中,Delta Lake是默認格式,你不需要指定
使用δ
,格式(“δ”)
,或使用(“δ”)
.在Databricks Runtime 7.0及以上版本中,你也可以使用Spark創建Delta表
DataFrameWriterV2
API。
DeltaTableBuilder
API:你也可以使用DeltaTableBuilder
Delta Lake中的API來創建表。與DataFrameWriter API相比,該API更容易指定附加信息,如列注釋、表屬性和生成的列.預覽
這個特性在公共預覽.
請注意
該特性在Databricks Runtime 8.3及以上版本上可用。
#在metastore創建表DeltaTable.createIfNotExists(火花)\.的表(“default.people10m”)\.addColumn(“id”,“INT”)\.addColumn(“firstName”,“字符串”)\.addColumn(“middleName”,“字符串”)\.addColumn(“姓”,“字符串”,評論=“姓”)\.addColumn(“性別”,“字符串”)\.addColumn(“生日”,“時間戳”)\.addColumn(“ssn”,“字符串”)\.addColumn(“工資”,“INT”)\.執行()#創建或替換表路徑和添加屬性DeltaTable.createOrReplace(火花)\.addColumn(“id”,“INT”)\.addColumn(“firstName”,“字符串”)\.addColumn(“middleName”,“字符串”)\.addColumn(“姓”,“字符串”,評論=“姓”)\.addColumn(“性別”,“字符串”)\.addColumn(“生日”,“時間戳”)\.addColumn(“ssn”,“字符串”)\.addColumn(“工資”,“INT”)\.財產(“描述”,“人員數據表”)\.位置(“/ tmp /δ/ people10m”)\.執行()
//在metastore上創建表DeltaTable.createOrReplace(火花).的表(“default.people10m”).addColumn(“id”,“INT”).addColumn(“firstName”,“字符串”).addColumn(“middleName”,“字符串”).addColumn(DeltaTable.columnBuilder(“姓”).數據類型(“字符串”).評論(“姓”).構建()).addColumn(“姓”,“字符串”,評論=“姓”).addColumn(“性別”,“字符串”).addColumn(“生日”,“時間戳”).addColumn(“ssn”,“字符串”).addColumn(“工資”,“INT”).執行()//創建或替換table為path並添加屬性DeltaTable.createOrReplace(火花).addColumn(“id”,“INT”).addColumn(“firstName”,“字符串”).addColumn(“middleName”,“字符串”).addColumn(DeltaTable.columnBuilder(“姓”).數據類型(“字符串”).評論(“姓”).構建()).addColumn(“姓”,“字符串”,評論=“姓”).addColumn(“性別”,“字符串”).addColumn(“生日”,“時間戳”).addColumn(“ssn”,“字符串”).addColumn(“工資”,“INT”).財產(“描述”,“人員數據表”).位置(“/ tmp /δ/ people10m”).執行()
看到API文檔獲取詳細信息。
另請參閱創建一個表.
對數據進行分區
您可以對數據進行分區,以加快包含涉及分區列的謂詞的查詢或DML。要在創建增量表時對數據進行分區,請按列指定分區。下麵的示例按性別進行分區。
——在metastore中創建表創建表格默認的.people10m(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,ssn字符串,工資INT)使用δ分區通過(性別)
df.寫.格式(“δ”).partitionBy(“性別”).saveAsTable(“default.people10m”)DeltaTable.創建(火花)\.的表(“default.people10m”)\.addColumn(“id”,“INT”)\.addColumn(“firstName”,“字符串”)\.addColumn(“middleName”,“字符串”)\.addColumn(“姓”,“字符串”,評論=“姓”)\.addColumn(“性別”,“字符串”)\.addColumn(“生日”,“時間戳”)\.addColumn(“ssn”,“字符串”)\.addColumn(“工資”,“INT”)\.partitionedBy(“性別”)\.執行()
df.寫.格式(“δ”).partitionBy(“性別”).saveAsTable(“default.people10m”)DeltaTable.createOrReplace(火花).的表(“default.people10m”).addColumn(“id”,“INT”).addColumn(“firstName”,“字符串”).addColumn(“middleName”,“字符串”).addColumn(DeltaTable.columnBuilder(“姓”).數據類型(“字符串”).評論(“姓”).構建()).addColumn(“姓”,“字符串”,評論=“姓”).addColumn(“性別”,“字符串”).addColumn(“生日”,“時間戳”).addColumn(“ssn”,“字符串”).addColumn(“工資”,“INT”).partitionedBy(“性別”).執行()
要確定一個表是否包含一個特定的分區,可以使用該語句選擇COUNT (*)>0從<表名稱>在哪裏<劃分字段>=<值>
.如果分區存在,真正的
返回。例如:
選擇數(*)>0作為`分區存在`從默認的.people10m在哪裏性別=“M”
顯示(火花.sql("SELECT COUNT(*) > 0 AS ' Partition exists ' FROM default. "people10m WHERE gender = 'M'))
顯示(火花.sql("SELECT COUNT(*) > 0 AS ' Partition exists ' FROM default. "people10m WHERE gender = 'M'))
控製數據的位置
對於在metastore中定義的表,您可以選擇指定位置
作為一個路徑。使用指定創建的表位置
被認為是不受轉移灶控製的。與未指定路徑的托管表不同,非托管表的文件不會被刪除下降
桌子上。
當您運行創建表格
與一個位置
那已經包含使用Delta Lake存儲的數據,Delta Lake的功能如下:
如果您指定隻有表名和位置,例如:
創建表格默認的.people10m使用δ位置“/ tmp /δ/ people10m”
metastore中的表自動繼承現有數據的模式、分區和表屬性。這個功能可以用來“導入”數據到metastore。
如果您指定任何配置(模式、分區或表屬性),Delta Lake驗證規範是否與現有數據的配置完全匹配。
重要的
如果指定的配置不存在完全如果匹配數據的配置,Delta Lake會拋出一個異常來描述差異。
請注意
亞穩態不是Delta表最新信息的真實來源。事實上,metastore中的表定義可能不包含所有的元數據,比如模式和屬性。它包含表的位置,該位置上的表事務日誌是真相的來源。如果您從一個不知道這種特定於增量的定製的係統查詢metastore,您可能會看到不完整或陳舊的表信息。
使用生成的列
預覽
這個特性在公共預覽.
請注意
該特性在Databricks Runtime 8.3及以上版本上可用。
Delta Lake支持生成列,這是一種特殊類型的列,其值是根據用戶指定的函數自動生成的,而不是Delta表中的其他列。當寫入具有生成列的表且沒有顯式地為它們提供值時,Delta Lake會自動計算這些值。例如,您可以從時間戳列自動生成一個日期列(用於按日期對表進行分區);任何對表的寫入都隻需要指定時間戳列的數據。但是,如果顯式地為它們提供值,則這些值必須滿足約束(<值>< = ><代表達式>)是真正的
否則寫操作將失敗並報錯。
重要的
用生成的列創建的表具有比默認更高的表寫入協議版本。看到表協議版本為了理解表協議版本控製以及表協議版本的更高版本意味著什麼。
下麵的例子展示了如何使用生成的列創建一個表:
創建表格默認的.people10m(idINT,firstName字符串,middleName字符串,姓字符串,性別字符串,生日時間戳,dateOfBirth日期生成的總是作為(投(生日作為日期)),ssn字符串,工資INT)使用δ分區通過(性別)
DeltaTable.創建(火花)\.的表(“default.people10m”)\.addColumn(“id”,“INT”)\.addColumn(“firstName”,“字符串”)\.addColumn(“middleName”,“字符串”)\.addColumn(“姓”,“字符串”,評論=“姓”)\.addColumn(“性別”,“字符串”)\.addColumn(“生日”,“時間戳”)\.addColumn(“dateOfBirth”,DateType(),generatedAlwaysAs=“鑄(生日日期)”)\.addColumn(“ssn”,“字符串”)\.addColumn(“工資”,“INT”)\.partitionedBy(“性別”)\.執行()
DeltaTable.創建(火花).的表(“default.people10m”).addColumn(“id”,“INT”).addColumn(“firstName”,“字符串”).addColumn(“middleName”,“字符串”).addColumn(DeltaTable.columnBuilder(“姓”).數據類型(“字符串”).評論(“姓”).構建()).addColumn(“姓”,“字符串”,評論=“姓”).addColumn(“性別”,“字符串”).addColumn(“生日”,“時間戳”).addColumn(DeltaTable.columnBuilder(“dateOfBirth”).數據類型(DateType).generatedAlwaysAs(“鑄(dateOfBirth日期)”).構建()).addColumn(“ssn”,“字符串”).addColumn(“工資”,“INT”).partitionedBy(“性別”).執行()
生成的列將像普通列一樣存儲。也就是說,它們占用存儲空間。
以下限製適用於生成的列:
生成表達式可以使用Spark中的任何SQL函數,當給定相同的參數值時,總是返回相同的結果,除了以下類型的函數:
用戶自定義函數。
聚合函數。
窗口函數。
函數返回多行。
對於Databricks Runtime 9.1及以上版本,
合並
操作支持設置時生成的列spark.databricks.delta.schema.autoMerge.enabled
為true。
在支持Photon的Databricks Runtime 8.4和以上版本中,Delta Lake可以為一個查詢生成分區過濾器,隻要一個分區列是由以下表達式之一定義的:
鑄造(坳作為日期)
而這種類型上校
是時間戳
.(col)
而這種類型上校
是時間戳
.定義的兩個分區列
年(col),月(col)
而這種類型上校
是時間戳
.定義的三個分區列
年(col),月(col),天(col)
而這種類型上校
是時間戳
.定義的四個分區列
年(col),月(col),天(col),小時(col)
而這種類型上校
是時間戳
.SUBSTRING(坳,pos機,蘭)
而這種類型上校
是字符串
DATE_FORMAT(坳,格式)
而這種類型上校
是時間戳
.
如果分區列由上述表達式之一定義,並且查詢使用生成表達式的基礎基列過濾數據,那麼Delta Lake會查看基列和生成的列之間的關係,並在可能的情況下基於生成的分區列填充分區過濾器。例如,給定下表:
創建表格事件(eventId長整型數字,數據字符串,eventType字符串,eventTime時間戳,eventDate日期生成的總是作為(投(eventTime作為日期)))使用δ分區通過(eventType,eventDate)
如果你運行以下查詢:
選擇*從事件在哪裏eventTime> =“2020-10-01”就是<=“2020-10-01 12:00:00”
Delta Lake自動生成一個分區篩選器,因此前麵的查詢隻讀取分區中的數據日期= 2020-10-01
即使沒有指定分區篩選器。
再舉一個例子,請看下表:
創建表格事件(eventId長整型數字,數據字符串,eventType字符串,eventTime時間戳,一年INT生成的總是作為(一年(eventTime)),月INT生成的總是作為(月(eventTime)),一天INT生成的總是作為(一天(eventTime)))使用δ分區通過(eventType,一年,月,一天)
如果你運行以下查詢:
選擇*從事件在哪裏eventTime> =“2020-10-01”就是<=“2020-10-01 12:00:00”
Delta Lake自動生成一個分區篩選器,因此前麵的查詢隻讀取分區中的數據年= 2020 /月= 10 /天= 1
即使沒有指定分區篩選器。
你可以使用解釋子句並檢查提供的計劃,以查看Delta Lake是否自動生成任何分區過濾器。
在列名中使用特殊字符
默認情況下,特殊字符,如空格和任意字符, {} () \ n \ t =
在表列名中不支持。若要在表的列名中包含這些特殊字符,請啟用列映射.
默認表屬性
在SparkSession中設置的Delta Lake配置將覆蓋默認配置表屬性用於在會話中創建的新Delta Lake表。SparkSession中使用的前綴與表屬性中使用的配置不同。
三角洲湖相依 |
SparkSession相依 |
---|---|
|
|
例如,設置delta.appendOnly=真正的
屬性為會話中創建的所有新Delta Lake表設置如下:
集火花.磚.δ.屬性.違約.appendOnly=真正的
要修改現有表的表屬性,請使用設置TBLPROPERTIES.
讀一個表
通過指定表名或路徑,可以將Delta表作為DataFrame加載:
選擇*從默認的.people10m——metastore中的查詢表選擇*從δ.' /tmp/δ/people10m`——按路徑查詢表
火花.表格(“default.people10m”)# metastore中的查詢表火花.讀.格式(“δ”).負載(“/ tmp /δ/ people10m”)#按路徑查詢表
火花.表格(“default.people10m”)//查看metastore中的查詢表火花.讀.格式(“δ”).負載(“/ tmp /δ/ people10m”)//按路徑創建表進口io.δ.值得一提的._火花.讀.δ(“/ tmp /δ/ people10m”)
返回的DataFrame為任何查詢自動讀取表的最近快照;你永遠不需要逃跑刷新表格
.當查詢中有可應用的謂詞時,Delta Lake自動使用分區和統計數據讀取最小數量的數據。
查詢表的舊快照(時間旅行)
Delta Lake時間旅行允許查詢Delta表的舊快照。時間旅行有很多用例,包括:
重新創建分析、報告或輸出(例如,機器學習模型的輸出)。這對於調試或審計很有用,特別是在受管製的行業中。
編寫複雜的時態查詢。
修正數據中的錯誤。
為快速更改表的一組查詢提供快照隔離。
本節描述查詢舊版本表、數據保留問題的支持方法,並提供示例。
語法
本節展示如何查詢Delta表的舊版本。
在本節中:
SQL作為的
語法
選擇*從table_name時間戳作為的timestamp_expression選擇*從table_name版本作為的版本
在哪裏
timestamp_expression
可以是以下任何一個:2018 - 10 - 18 t22:15:12.013z
,即可以轉換為時間戳的字符串鑄造(' 2018-10-1813:36:32c '作為時間戳)
“2018-10-18”
,即日期字符串在Databricks Runtime 6.6及以上版本中:
current_timestamp ()-時間間隔12小時
date_sub(當前日期(),1)
被轉換為時間戳的或可以轉換為時間戳的任何其他表達式
版本
的輸出是否可以得到一個長值描述曆史table_spec
.
既不timestamp_expression
也不版本
子查詢。
例子
選擇*從默認的.people10m時間戳作為的2018 - 10 - 18 t22:15:12.013z選擇*從δ.' /tmp/δ/people10m`版本作為的123
DataFrameReader選項
DataFrameReader選項允許您從固定到表的特定版本的Delta表創建DataFrame。
df1=火花.讀.格式(“δ”).選項(“timestampAsOf”,timestamp_string).負載(“/ tmp /δ/ people10m”)df2=火花.讀.格式(“δ”).選項(“versionAsOf”,版本).負載(“/ tmp /δ/ people10m”)
為timestamp_string
,隻接受日期或時間戳字符串。例如,“2019-01-01”
而且“2019 - 01 - 01 t00:00:00.000z”
.
一種常見的模式是在執行Databricks作業期間使用Delta表的最新狀態來更新下遊應用程序。
由於Delta表會自動更新,所以如果底層數據更新,從Delta表加載的DataFrame可能會在調用之間返回不同的結果。通過使用時間旅行,你可以跨調用修複DataFrame返回的數據:
latest_version=火花.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta. ' /tmp/delta/people10m ')").收集()df=火花.讀.格式(“δ”).選項(“versionAsOf”,latest_version[0] [0]).負載(“/ tmp /δ/ people10m”)
@
語法
您可能有一個參數化的管道,其中管道的輸入路徑是作業的參數。在作業執行之後,您可能希望在將來的某個時候重現輸出。在這種情況下,您可以使用@
指定時間戳或版本的語法。時間戳必須為inyyyyMMddHHmmssSSS
格式。之後可以指定版本@
通過將一個v
的版本。例如,查詢版本號123
為表people10m
,指定people10m@v123
.
選擇*從默認的.people10m@20190101000000000選擇*從默認的.people10m@v123
火花.讀.格式(“δ”).負載(“/ tmp /δ/ people10m@20190101000000000”)#表號2019-01-01 00:00:00.000火花.讀.格式(“δ”).負載(“/ tmp /δ/ people10m@v123”)#表在123版本
例子
修複對用戶表的意外刪除
111
:
插入成my_table選擇*從my_table時間戳作為的date_sub(當前日期(),1)在哪裏用戶標識=111
修複對表的意外錯誤更新:
合並成my_table目標使用my_table時間戳作為的date_sub(當前日期(),1)源在源.用戶標識=目標.用戶標識當匹配然後更新集*
查詢最近一周新增客戶數。beplay体育app下载地址
選擇數(截然不同的用戶標識)-(選擇數(截然不同的用戶標識)從my_table時間戳作為的date_sub(當前日期(),7))
數據保留
要穿越到以前的版本,你必須保留這兩個該版本的日誌和數據文件。
支持Delta表的數據文件是從來沒有自動刪除;隻有運行時才會刪除數據文件真空.真空
不刪除增量日誌文件;日誌文件在寫入檢查點後會自動清理。
默認情況下,您可以時間旅行到一個Delta表,最多30天,除非您有:
運行
真空
放在你的Delta桌上。使用以下方法更改數據或日誌文件的保留期表屬性:
delta.logRetentionDuration=“間隔< >間隔”
:控製表的曆史記錄保存多長時間。默認值是時間間隔30.天
.每次寫入檢查點時,Databricks自動清除比保留時間間隔更早的日誌條目。如果將此配置設置為足夠大的值,將保留許多日誌條目。這不會影響性能,因為對日誌的操作時間是恒定的。曆史上的操作是並行的,但隨著日誌大小的增加,成本會變得越來越高。
delta.deletedFileRetentionDuration=“間隔< >間隔”
:控製文件被刪除的時間在成為候選人之前真空
.默認值是時間間隔7天
.以訪問30天的曆史數據,即使您運行
真空
在Delta桌上,準備好delta.deletedFileRetentionDuration=“間隔30.天”
.這個設置可能會導致存儲成本上升。
寫入表
附加
要以原子方式向現有Delta表添加新數據,請使用附加
模式:
插入成默認的.people10m選擇*從更多人
df.寫.格式(“δ”).模式(“添加”).保存(“/ tmp /δ/ people10m”)df.寫.格式(“δ”).模式(“添加”).saveAsTable(“default.people10m”)
df.寫.格式(“δ”).模式(“添加”).保存(“/ tmp /δ/ people10m”)df.寫.格式(“δ”).模式(“添加”).saveAsTable(“default.people10m”)進口io.δ.值得一提的._df.寫.模式(“添加”).δ(“/ tmp /δ/ people10m”)
覆蓋
要以原子方式替換表中的所有數據,請使用覆蓋
模式:
插入覆蓋表格默認的.people10m選擇*從更多人
df.寫.格式(“δ”).模式(“覆蓋”).保存(“/ tmp /δ/ people10m”)df.寫.格式(“δ”).模式(“覆蓋”).saveAsTable(“default.people10m”)
df.寫.格式(“δ”).模式(“覆蓋”).保存(“/ tmp /δ/ people10m”)df.寫.格式(“δ”).模式(“覆蓋”).saveAsTable(“default.people10m”)進口io.δ.值得一提的._df.寫.模式(“覆蓋”).δ(“/ tmp /δ/ people10m”)
使用DataFrames,您還可以選擇性地隻覆蓋與任意表達式匹配的數據。此特性在Databricks Runtime 9.1 LTS及以上。下麵的命令自動替換目標表中1月份的事件,目標表分區為start_date
,有了數據df
:
df.寫\.格式(“δ”)\.模式(“覆蓋”)\.選項(“replaceWhere”,"start_date >= '2017-01-01' AND end_date <= '2017-01-31'")\.保存(“/ tmp /δ/事件”)
df.寫.格式(“δ”).模式(“覆蓋”).選項(“replaceWhere”,"start_date >= '2017-01-01' AND end_date <= '2017-01-31'").保存(“/ tmp /δ/事件”)
這個示例代碼將數據寫入其中df
,驗證它是否與謂詞匹配,並執行原子替換。如果希望寫出與謂詞不完全匹配的數據,那麼要替換目標表中匹配的行,可以通過設置禁用約束檢查spark.databricks.delta.replaceWhere.constraintCheck.enabled
假:
火花.相依.集(“spark.databricks.delta.replaceWhere.constraintCheck.enabled”,假)
火花.相依.集(“spark.databricks.delta.replaceWhere.constraintCheck.enabled”,假)
在Databricks Runtime 9.0及以下版本中,replaceWhere
僅在分區列上覆蓋與謂詞匹配的數據。下麵的命令自動替換目標表中的一月月份,目標表分區為日期
,有了數據df
:
df.寫\.格式(“δ”)\.模式(“覆蓋”)\.選項(“replaceWhere”,"birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")\.保存(“/ tmp /δ/ people10m”)
df.寫.格式(“δ”).模式(“覆蓋”).選項(“replaceWhere”,"birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'").保存(“/ tmp /δ/ people10m”)
在Databricks Runtime 9.1及以上版本中,如果你想恢複到以前的行為,你可以禁用spark.databricks.delta.replaceWhere.dataColumns.enabled
國旗:
火花.相依.集(“spark.databricks.delta.replaceWhere.dataColumns.enabled”,假)
火花.相依.集(“spark.databricks.delta.replaceWhere.dataColumns.enabled”,假)
動態分區覆蓋
預覽
這個特性在公共預覽.
Databricks Runtime 11.1及以上版本支持動態分區表的分區覆蓋模式。
在動態分區覆蓋模式下,我們覆蓋每個邏輯分區中的所有現有數據,寫入操作將為這些分區提交新數據。寫入不包含數據的任何現有邏輯分區將保持不變。該模式僅適用於在寫數據時采用覆蓋模式:“任意一種”插入覆蓋
或DataFrame寫入df.write.mode(“覆蓋”)
.
通過配置Spark會話配置來配置動態分區覆蓋方式spark.sql.sources.partitionOverwriteMode
來動態
.也可以通過設置DataFrameWriter
選項partitionOverwriteMode
來動態
.如果存在,查詢特定選項將覆蓋會話配置中定義的模式。的默認值partitionOverwriteMode
是靜態
.
集火花.sql.來源.partitionOverwriteMode=動態;插入覆蓋表格默認的.people10m選擇*從更多人;
df.寫\.格式(“δ”)\.模式(“覆蓋”)\.選項(“partitionOverwriteMode”,“動態”)\.saveAsTable(“default.people10m”)
df.寫.格式(“δ”).模式(“覆蓋”).選項(“partitionOverwriteMode”,“動態”).saveAsTable(“default.people10m”)
請注意
動態分區覆蓋與該選項衝突replaceWhere
分區表。
如果在Spark會話配置中啟用了動態分區覆蓋,並且
replaceWhere
是作為DataFrameWriter
選項,則Delta Lake將根據replaceWhere
表達式(查詢特定選項覆蓋會話配置)。您將收到一個錯誤,如果
DataFrameWriter
選項有動態分區覆蓋和replaceWhere
啟用。
重要的
驗證使用動態分區覆蓋寫入的數據隻觸及預期的分區。錯誤分區中的一行可能會導致無意中覆蓋整個分區。我們建議使用replaceWhere
指定要覆蓋的數據。
如果分區被意外覆蓋,您可以使用在Spark會話中找到上次提交的版本撤消更改。
有關Delta Lake對更新表的支持,請參見表刪除、更新和合並.
限製寫入文件的行
您可以使用SQL會話配置spark.sql.files.maxRecordsPerFile
來指定Delta Lake表寫入單個文件的最大記錄數。指定0或負值表示沒有限製。
在Databricks Runtime 10.5及以上版本中,你也可以使用DataFrameWriter選項maxRecordsPerFile
當使用DataFrame api寫入Delta Lake表時。當maxRecordsPerFile
SQL會話配置的值spark.sql.files.maxRecordsPerFile
將被忽略。
df.寫.格式(“δ”)\.模式(“添加”)\.選項(“maxRecordsPerFile”,“10000”)\.保存(“/ tmp /δ/ people10m”)
df.寫.格式(“δ”).模式(“添加”).選項(“maxRecordsPerFile”,“10000”).保存(“/ tmp /δ/ people10m”)
冪等寫道
有時,一個向Delta表寫入數據的作業會因為各種原因(例如,作業遇到了故障)重新啟動。失敗的作業在終止之前可能已經將數據寫入Delta表,也可能沒有。在將數據寫入增量表的情況下,重新啟動的作業將相同的數據寫入增量表,從而產生重複的數據。
為了解決這個問題,增量表支持以下功能DataFrameWriter
使寫入等冪的選項:
txnAppId
:一個唯一的字符串,您可以傳遞給每個DataFrame
寫。例如,這可以是作業的名稱。txnVersion
:作為事務版本的單調遞增的數字。對於寫入增量表的數據,這個數字需要是唯一的。例如,這可以是第一次嚐試查詢時的epoch秒。對同一作業的任何後續重啟都需要具有相同的值txnVersion
.
對於被攝取到Delta表和Delta表中的每個新數據,上述選項組合必須是唯一的txnVersion
需要高於上次攝取到Delta表中的數據。例如:
最後一次成功寫入的數據包含選項值為
dailyETL: 23423
(txnAppId: txnVersion
).下一次寫數據時應該有
txnAppId=dailyETL
而且txnVersion
至少23424
(比上次寫入的數據多一個txnVersion
).使用。寫入數據的任何嚐試
txnAppId=dailyETL
而且txnVersion
作為23422
或更少被忽略,因為txnVersion
比上次記錄的少嗎txnVersion
在表中。嚐試用。寫入數據
txnAppId: txnVersion
作為anotherETL: 23424
是否成功寫入數據到包含不同數據的表txnAppId
與最後攝取的數據中相同的選項值進行比較。
警告
這個解決方案假設在作業的多次重試中寫入Delta表的數據是相同的。如果Delta表中的一個寫嚐試成功了,但由於某些下遊失敗,出現了具有相同txn選項但數據不同的第二次寫嚐試,那麼第二次寫嚐試將被忽略。這可能會導致意想不到的結果。
設置用戶自定義提交元數據
您可以使用DataFrameWriter選項在這些操作的提交中指定用戶定義的字符串作為元數據userMetadata
或SparkSession配置spark.databricks.delta.commitInfo.userMetadata
.如果指定了這兩個選項,則該選項優先。中的用戶定義元數據是可讀的曆史操作。
集火花.磚.δ.commitInfo.userMetadata=覆蓋-為-修複-不正確的-數據插入覆蓋默認的.people10m選擇*從更多人
df.寫.格式(“δ”)\.模式(“覆蓋”)\.選項(“userMetadata”,“overwritten-for-fixing-incorrect-data”)\.保存(“/ tmp /δ/ people10m”)
df.寫.格式(“δ”).模式(“覆蓋”).選項(“userMetadata”,“overwritten-for-fixing-incorrect-data”).保存(“/ tmp /δ/ people10m”)
模式驗證
Delta Lake自動驗證正在寫入的DataFrame的模式與表的模式是否兼容。Delta Lake使用以下規則來確定從DataFrame寫入表是否兼容:
所有DataFrame列必須存在於目標表中。如果DataFrame中的列不在表中,則會引發異常。表中存在但DataFrame中沒有的列被設為空。
DataFrame列數據類型必須與目標表中的列數據類型匹配。如果它們不匹配,就會引發異常。
DataFrame列名不能隻按大小寫不同。這意味著不能在同一個表中定義“Foo”和“Foo”等列。雖然可以在區分大小寫或不區分大小寫(默認)的模式下使用Spark,但Parquet在存儲和返回列信息時是區分大小寫的。Delta Lake是保留大小寫的,但在存儲模式時不敏感,並且有這個限製以避免潛在的錯誤、數據損壞或丟失問題。
Delta Lake支持DDL顯式添加新列,並支持自動更新模式。
如果指定其他選項,例如partitionBy
,結合附加模式,Delta Lake將驗證它們是否匹配,並為任何不匹配拋出錯誤。當partitionBy
不存在的情況下,自動追加現有數據的分區。
請注意
在Databricks Runtime 7.0及以上版本中,插入
語法提供模式強製並支持模式演化。如果列的數據類型無法安全地轉換為Delta Lake表的數據類型,則會引發運行時異常。如果模式演化啟用時,新列可以作為模式的最後一列(或嵌套列)存在,以便模式發展。
有關Delta Lake加強和發展模式的更多信息,請觀看YouTube視頻(55分鍾)。
更新表模式
Delta Lake允許您更新表的模式。支持以下類型的更改:
添加新列(在任意位置)
重新安排現有的列
重命名現有列
可以使用DDL顯式地進行這些更改,也可以使用DML隱式地進行這些更改。
重要的
更新Delta表模式時,從該表讀取數據的流將終止。如果要繼續流,必須重新啟動它。
推薦的方法請參見數據裏克上結構化流媒體應用的生產注意事項.
明確更新模式
可以使用以下DDL顯式地更改表的模式。
添加列
改變表格table_name添加列(col_namedata_type[評論col_comment][第一個|後colA_name],…)
缺省情況下,為nullability真正的
.
要向嵌套字段添加列,請使用:
改變表格table_name添加列(col_name.nested_col_namedata_type[評論col_comment][第一個|後colA_name],…)
更改列注釋或排序
改變表格table_name改變[列]col_namecol_namedata_type[評論col_comment][第一個|後colA_name]
要更改嵌套字段中的列,請使用:
改變表格table_name改變[列]col_name.nested_col_namenested_col_namedata_type[評論col_comment][第一個|後colA_name]
重命名列
預覽
這個特性在公共預覽.
請注意
該特性在Databricks Runtime 10.2及以上版本中可用。
若要重命名列而不重寫列的任何現有數據,必須為表啟用列映射。看到三角洲列映射.
重命名列:
改變表格table_name重命名列old_col_name來new_col_name
重命名嵌套字段:
改變表格table_name重命名列col_name.old_nested_field來new_nested_field
看到三角洲列映射.
刪除列
預覽
這個特性在公共預覽.
請注意
該特性在Databricks Runtime 11.0及以上版本中可用。
要將列作為僅包含元數據的操作刪除而不重寫任何數據文件,必須為表啟用列映射。看到三角洲列映射.
刪除一列:
改變表格table_name下降列col_name
刪除多個列:
改變表格table_name下降列(col_name_1,col_name_2)
自動模式更新
Delta Lake可以作為DML事務的一部分自動更新表的模式(追加或覆蓋),並使模式與正在寫入的數據兼容。
替換表模式
默認情況下,覆蓋表中的數據不會覆蓋模式。當重寫表時使用模式(“覆蓋”)
沒有replaceWhere
,您可能仍然希望覆蓋正在寫入的數據的模式。可以通過設置。替換表的模式和分區overwriteSchema
選項真正的
:
df.寫.選項(“overwriteSchema”,“真正的”)
表意見
Delta Lake支持在Delta表上創建視圖,就像使用數據源表一樣。
這些視圖與訪問控製表以允許列級和行級安全性。
使用視圖進行操作時的核心挑戰是解析模式。如果更改Delta表模式,則必須重新創建派生視圖,以處理模式中添加的任何內容。例如,如果向Delta表添加新列,則必須確保該列在基表之上構建的適當視圖中可用。
表屬性
您可以使用將自己的元數據存儲為表屬性TBLPROPERTIES
在創建
而且改變
.然後,您可以顯示
元數據。例如:
改變表格默認的.people10m集TBLPROPERTIES(“部門”=“會計”,“delta.appendOnly”=“真正的”);顯示表的屬性。顯示TBLPROPERTIES默認的.people10m;隻顯示'department'表屬性。顯示TBLPROPERTIES默認的.people10m(“部門”);
TBLPROPERTIES
作為增量表元數據的一部分存儲。不能定義newTBLPROPERTIES
在一個創建
語句,如果Delta表已經存在於給定位置。
此外,為了調整行為和性能,Delta Lake支持某些Delta表屬性:
塊在Delta表中刪除和更新:
delta.appendOnly = true
.配置時間旅行保留屬性:
delta.logRetentionDuration = < interval-string >
而且delta.deletedFileRetentionDuration = < interval-string >
.有關詳細信息,請參見數據保留.配置統計的列數:
delta.dataSkippingNumIndexedCols = n
.此屬性向編寫器指示隻收集第一個的統計信息n
表中的列。此外,數據跳過代碼會忽略此列索引以外的任何列的統計信息。此屬性僅對寫入的新數據有效。
隨機化文件前綴以避免S3元數據中的熱點:
delta.randomizeFilePrefixes = true
.對於需要大量(每秒數千個請求)快速讀寫操作的表,我們強烈建議將一個S3 bucket專用於一個表(將表定位在bucket的根),並啟用隨機的文件前綴以獲得最佳體驗。
請注意
修改增量表屬性是一個寫操作,會與其他操作發生衝突並發寫操作,導致他們失敗。我們建議隻有在表上沒有並發寫操作時才修改表屬性。
您還可以設置三角洲。
-prefix - properties,在第一次使用Spark配置提交Delta表時使用。例如,用屬性初始化一個Delta表delta.appendOnly = true
,設置Spark配置spark.databricks.delta.properties.defaults.appendOnly
來真正的
.例如:
火花.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
火花.相依.集(“spark.databricks.delta.properties.defaults.appendOnly”,“真正的”)
火花.相依.集(“spark.databricks.delta.properties.defaults.appendOnly”,“真正的”)
看到也增量表屬性引用.
表元數據
Delta Lake具有豐富的功能來探索表元數據。
它支持顯示(分區|列)
而且描述表格
.看到
它還提供了以下獨特的命令:
描述曆史
提供來源信息,包括操作、用戶等,以及每次寫入表的操作指標。表曆史保留30天。有關詳細信息,請參見檢索增量表曆史.
的使用Data選項卡瀏覽和創建表提供了Delta表詳細表信息和曆史的可視化視圖。除了表模式和示例數據之外,還可以單擊曆史選項來查看顯示的表曆史記錄描述曆史
.
配置存儲憑證
Delta Lake使用Hadoop FileSystem api訪問存儲係統。存儲係統的信用通常可以通過Hadoop配置進行設置。Delta Lake提供了多種設置Hadoop配置的方法,類似於Apache Spark。
火花配置
當在集群中啟動Spark應用程序時,可以通過如下方式設置Spark配置spark.hadoop。*
傳遞您的自定義Hadoop配置。例如,設置spark.hadoop.a.b.c
是否將該值作為Hadoop配置傳遞a.b.c
, Delta Lake將使用它來訪問Hadoop文件係統api。
看到__為更多的細節。
SQL會話配置
Spark SQL將通過當前所有的SQL會話配置Delta Lake將使用它們來訪問Hadoop文件係統api。例如,集a.b.c = x.y.z
會告訴三角洲湖通的價值嗎x.y.z
作為Hadoop配置a.b.c
, Delta Lake將使用它來訪問Hadoop文件係統api。
DataFrame選項
除了通過Spark(集群)配置或SQL會話配置設置Hadoop文件係統配置外,Delta還支持從DataFrameReader
而且DataFrameWriter
選項(即以fs。
前綴)時,可以使用DataFrameReader.load(路徑)
或DataFrameWriter.save(路徑)
.
請注意
該特性在Databricks Runtime 10.1及以上版本中可用。
例如,你可以通過DataFrame選項傳遞你的存儲憑據:
df1=火花.讀.格式(“δ”)\.選項(“fs.s3a.access.key”,“< access-key-1 >”)\.選項(“fs.s3a.secret.key”,“< secret-key-1 >”)\.讀(“…”)df2=火花.讀.格式(“δ”)\.選項(“fs.s3a.access.key”,“< access-key-1 >”)\.選項(“fs.s3a.secret.key”,“< secret-key-2 >”)\.讀(“…”)df1.聯盟(df2).寫.格式(“δ”)\.模式(“覆蓋”)\.選項(“fs.s3a.access.key”,“< access-key-3 >”)\.選項(“fs.s3a.secret.key”,“< secret-key-3 >”)\.保存(“…”)
瓦爾df1=火花.讀.格式(“δ”).選項(“fs.s3a.access.key”,“< access-key-1 >”).選項(“fs.s3a.secret.key”,“< secret-key-1 >”).讀(“…”)瓦爾df2=火花.讀.格式(“δ”).選項(“fs.s3a.access.key”,“< access-key-2 >”).選項(“fs.s3a.secret.key”,“< secret-key-2 >”).讀(“…”)df1.聯盟(df2).寫.格式(“δ”).模式(“覆蓋”).選項(“fs.s3a.access.key”,“< access-key-3 >”).選項(“fs.s3a.secret.key”,“< secret-key-3 >”).保存(“…”)
可以在中找到用於存儲的Hadoop文件係統配置的詳細信息數據源.