在Auto Loader中配置模式推斷和進化
您可以配置Auto Loader來自動檢測加載數據的模式,允許您在不顯式聲明數據模式的情況下初始化表,並在引入新列時改進表模式。這消除了隨著時間的推移手動跟蹤和應用模式更改的需要。
Auto Loader還可以“挽救”JSON blob列中意外的數據(例如,不同的數據類型),您可以選擇稍後使用半結構化數據訪問api.
模式推斷和進化支持以下格式:
文件格式 |
支持版本 |
---|---|
|
Databricks Runtime 8.2及以上版本 |
|
Databricks Runtime 8.3及以上版本 |
|
Databricks運行時10.2及以上 |
|
Databricks運行時11.1及以上版本 |
|
不支持的 |
|
不適用(固定模式) |
|
不適用(固定模式) |
模式推斷和進化的語法
指定選項的目標目錄cloudFiles.schemaLocation
啟用模式推斷和進化。屬性指定的相同目錄checkpointLocation
.如果你使用Delta活動表, Databricks自動管理模式位置和其他檢查點信息。
請注意
如果將多個源數據位置加載到目標表中,則每個Auto Loader攝取工作負載都需要一個單獨的流檢查點。
下麵的示例使用拚花
為cloudFiles.format
.使用csv
,avro
,或json
對於其他文件源。對於每種格式的默認行為,讀寫的所有其他設置保持不變。
(火花.readStream.格式(“cloudFiles”).選項(“cloudFiles.format”,“鋪”)#模式位置目錄跟蹤數據模式隨時間的變化.選項(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”).負載(“< path_to_source_data >”).writeStream.選項(“checkpointLocation”,“< path_to_checkpoint >”).開始(“< path_to_target”))
火花.readStream.格式(“cloudFiles”).選項(“cloudFiles.format”,“鋪”)//模式位置目錄跟蹤你的數據模式.選項(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”).負載(“< path_to_source_data >”).writeStream.選項(“checkpointLocation”,“< path_to_checkpoint >”).開始(“< path_to_target”)
Auto Loader模式推斷如何工作?
為了在第一次讀取數據時推斷模式,Auto Loader對它發現的前50 GB或1000個文件進行采樣,以先超過限製為準。Auto Loader將模式信息存儲在一個目錄中_schemas
在配置cloudfFiles.schemaLocation
隨著時間的推移跟蹤輸入數據的模式更改。
請注意
要更改使用的示例的大小,您可以設置SQL配置:
火花.磚.cloudFiles.schemaInference.sampleSize.numBytes
例如,字節字符串10 gb
)
而且
火花.磚.cloudFiles.schemaInference.sampleSize.numFiles
(整數)
默認情況下,Auto Loader模式推斷試圖避免由於類型不匹配而導致的模式演變問題。對於不編碼數據類型的格式(JSON和CSV),自動加載器將所有列推斷為字符串(包括JSON文件中的嵌套字段)。對於具有類型化模式的格式(Parquet和Avro), Auto Loader對文件的子集進行采樣,並合並單個文件的模式。這種行為總結在下表中:
文件格式 |
默認推斷數據類型 |
---|---|
|
字符串 |
|
字符串 |
|
Avro模式中編碼的類型 |
|
Parquet模式中編碼的類型 |
Apache Spark dataframerader使用不同的行為進行模式推斷,根據示例數據為JSON和CSV源中的列選擇數據類型。要使用Auto Loader啟用此行為,請設置該選項cloudFiles.inferColumnTypes
來真正的
.
請注意
當推斷CSV數據的模式時,自動加載器假設文件包含頭文件。如果您的CSV文件不包含標題,請提供該選項.option(“頭”,“假”)
.此外,Auto Loader合並示例中所有文件的模式,以生成一個全局模式。Auto Loader可以根據文件頭讀取每個文件,並正確解析CSV。
請注意
當一個列在兩個Parquet文件中具有不同的數據類型時,Auto Loader將嚐試向上的
從一種到另一種。如果無法向上轉換,則數據推斷失敗。示例見下表:
1型 |
2型 |
向上的類型 |
---|---|---|
|
|
|
|
|
|
|
|
推理失敗 |
在推理時合並數據類型之後,包含未選擇類型記錄的文件將加載到獲救數據列,因為數據類型不同於推斷的模式。
Auto Loader模式演化如何工作?
Auto Loader在處理數據時檢測添加的新列。當自動加載器檢測到一個新列時,流以UnknownFieldException
.在流拋出此錯誤之前,Auto Loader會對最新的微批數據執行模式推斷,並通過將新列合並到模式的末尾,使用最新的模式更新模式位置。現有列的數據類型保持不變。
Databricks建議配置自動加載流工作流在這種模式更改後自動重新啟動。
Auto Loader支持在選項中設置的模式演變的以下模式cloudFiles.schemaEvolutionMode
:
模式 |
讀取新列時的行為 |
---|---|
|
流失敗。新列被添加到模式中。現有列不演進數據類型。 |
|
模式永遠不會演進,流也不會因為模式更改而失敗。中記錄所有新列獲救數據列. |
|
流失敗。除非更新了提供的模式,或者刪除了有問題的數據文件,否則流不會重新啟動。 |
|
不進化模式,忽略新列,並且不保存數據,除非 |
分區如何與自動加載器工作?
如果數據以Hive風格分區布局,Auto Loader嚐試從數據的底層目錄結構推斷分區列。例如,文件路徑base_path /事件= = 2021-04-01 / f0.json點擊/日期
的推論結果日期
而且事件
作為分區列。如果底層目錄結構包含衝突的Hive分區或不包含Hive風格的分區,則忽略分區列。
二進製文件(binaryFile
),文本
文件格式具有固定的數據模式,但支持分區列推斷。Databricks推薦設置cloudFiles.schemaLocation
對於這些文件格式。這避免了任何潛在的錯誤或信息丟失,並防止每次自動加載程序開始時分區列的推斷。
分區列不考慮模式演變。如果你有一個初始目錄結構base_path /事件= = 2021-04-01 / f0.json點擊/日期
,然後開始接收新文件base_path /事件=點擊/日期= 1 / f1.json = 2021-04-01 /小時
, Auto Loader忽略小時列。若要捕獲新分區列的信息,請設置cloudFiles.partitionColumns
來事件、日期、小時
.
請注意
的選項cloudFiles.partitionColumns
獲取以逗號分隔的列名列表。僅為存在的列鍵=值
目錄結構中的對將被解析。
獲救的數據列是什麼?
當Auto Loader推斷出模式時,一個已保存的數據列將自動添加到您的模式為_rescued_data
.您可以重命名列,或者在提供模式的情況下通過設置該選項將其包括在內rescuedDataColumn
.
獲救的數據列確保與模式不匹配的列被獲救,而不是被刪除。獲救的數據列包含由於以下原因未被解析的任何數據:
模式中缺少該列。
類型不匹配。
不匹配。
獲救的數據列包含一個JSON,其中包含獲救的列和記錄的源文件路徑。
請注意
JSON和CSV解析器在解析記錄時支持三種模式:寬容的
,DROPMALFORMED
,FAILFAST
.當與rescuedDataColumn
,數據類型不匹配不會導致記錄被刪除DROPMALFORMED
模式或拋出錯誤FAILFAST
模式。隻有損壞的記錄才會被丟棄或拋出錯誤,例如不完整或格式錯誤的JSON或CSV。如果你使用badRecordsPath
,在解析JSON或CSV時,數據類型不匹配不被視為壞記錄rescuedDataColumn
.僅存儲不完整和格式不正確的JSON或CSV記錄badRecordsPath
.
改變區分大小寫的行為
除非啟用區分大小寫,否則列美國廣播公司
,美國廣播公司
,美國廣播公司
出於模式推斷的目的,將它們視為同一列。所選擇的情況是任意的,取決於采樣數據。你可以使用模式提示來執行應該使用哪種情況。一旦做出選擇並推斷出模式,Auto Loader就不會考慮與模式不一致的套管變體。
當獲救數據列啟用時,以模式大小寫以外的大小寫命名的字段將加載到_rescued_data
列。通過設置該選項來改變這種行為readerCaseSensitive
為false,在這種情況下,Auto Loader以不區分大小寫的方式讀取數據。
用模式提示覆蓋模式推斷
您可以使用模式提示在推斷的模式上強製使用您所知道和期望的模式信息。當您知道一個列是特定的數據類型,或者如果您想選擇更通用的數據類型(例如,a雙
而不是整數
),您可以使用SQL模式規範語法為列數據類型提供任意數量的提示作為字符串,例如:
.選項(“cloudFiles.schemaHints”,"標簽映射<字符串,字符串>,版本int")
請參閱有關數據類型獲取所支持的數據類型列表。
如果某個列不在流的開始處,還可以使用模式提示將該列添加到推斷的模式中。
下麵是一個推斷模式的示例,用於查看帶有模式提示的行為。
推斷模式:
|——date: string |——quantity: int |——user_info: struct | |——id: string | |——name: string | |——dob: string |——purchase_options: struct | |——delivery_address: string
通過指定以下模式提示:
.選項(“cloudFiles.schemaHints”,date日期,user_info。MAP, time TIMESTAMP" )
你會得到:
|——date: string -> date |——quantity: int |——user_info: struct | |——id: string | |——name: string | b|——dob: string -> date |——purchase_options: struct -> map |——time: timestamp
請注意
數組和映射模式提示支持Databricks Runtime 9.1 LTS及以上。
下麵是一個具有複雜數據類型的推斷模式示例,以查看使用模式提示的行為。
推斷模式:
|——products: array |——locations: array |——users: array | |——users。元素:struct | | |——id: string | | |——name: string | | b|——dob: string |——ids: map |——names: map |——prices: map |——折扣:map | |——折扣。鍵:struct | | |——id: string | |——折扣。值:字符串|——描述:map<字符串,struct> | |——描述。關鍵字:字符串| |—描述。值:struct | | |——內容:int
通過指定以下模式提示:
.選項(“cloudFiles.schemaHints”,"products ARRAY, location . "元素字符串,users.element.id INT, id MAP,名稱。關鍵INT,價格。key.id INT, description .value.content STRING" )
你會得到:
|——products: array -> array |——locations: array -> array |——users: array | |——users。元素:struct | b| |——id: string -> int | | |——name: string | | |——dob: string |——ids: map -> map |——names: map -> map |——prices: map -> map |——折扣:map | |——折扣。關鍵字:struct | | |——id: string -> int | |——折扣。值:字符串|——描述:map<字符串,struct> | |——描述。關鍵字:字符串| |—描述。取值:struct | | |——content: int ->字符串
請注意
模式提示僅在以下情況下使用不為自動加載器提供一個模式。您可以使用模式提示是否cloudFiles.inferColumnTypes
啟用或禁用。