常見的數據加載模式
自動加載程序簡化了大量的公共數據攝入的任務。這個快速參考提供了幾個例子流行的模式。
過濾使用一滴目錄或文件模式
水珠模式可用於過濾目錄和文件時提供的路徑。
模式 |
描述 |
---|---|
|
匹配任何單個的字符 |
|
匹配零個或多個字符 |
|
匹配一個字符的字符集{a, b, c}。 |
|
匹配一個字符的字符範圍{…z}。 |
|
匹配一個字符不是字符集或距離{}。請注意, |
|
匹配字符串的字符串集{ab、cd}。 |
|
匹配字符串的字符串集{ab, cde, cfh}。 |
使用路徑
提供前綴模式,例如:
df=火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,<格式>)\。模式(模式)\。負載(“< base_path > / * /文件”)
瓦爾df=火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,<格式>)。模式(模式)。負載(“< base_path > / * /文件”)
重要的
您需要使用選擇pathGlobFilter
顯式地提供後綴模式。的路徑
隻提供一個前綴過濾器。
例如,如果您想隻解析png
一個目錄中的文件,其中包含文件與不同的後綴,你能做什麼:
df=火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“binaryFile”)\。選項(“pathGlobfilter”,“* . png”)\。負載(<base_path>)
瓦爾df=火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“binaryFile”)。選項(“pathGlobfilter”,“* . png”)。負載(<base_path>)
請注意
默認的globbing自動加載程序的行為是不同的比其他的默認行為引發文件來源。添加.option (“useStrictGlobber”,“真正的”)
你閱讀使用通配符匹配違約引發行為對文件來源。更多關於globbing見下表:
模式 |
文件路徑 |
默認的水珠 |
嚴格的水珠 |
---|---|---|---|
/ / * / c / |
/ a / b / c / file.txt |
是的 |
是的 |
/ / * / c / |
/ a / b / c / d / file.txt |
是的 |
是的 |
/ / * / c / |
/ a / b / x / y / c / file.txt |
是的 |
沒有 |
/ / * / c |
/ a / b / c_file.txt |
是的 |
是的 |
/ / * / c / |
/ a / b / c_file.txt |
是的 |
沒有 |
/ / * / c / |
/ / * /餅幹/ file.txt |
是的 |
沒有 |
使簡單的ETL
一個簡單的方法來讓你的數據到三角洲湖而不會丟失任何數據是使用以下模式和自動加載程序啟用模式推理。磚建議運行下麵的代碼在一個磚工作時自動重啟流源數據的模式變化。默認情況下,模式推斷為字符串類型,任何解析錯誤(應該沒有如果一切仍然作為一個字符串)將去_rescued_data
會失敗,任何新列流和發展模式。
火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“json”)\。選項(“cloudFiles.schemaLocation”,“< path_to_schema_location >”)\。負載(“< path_to_source_data >”)\。writeStream\。選項(“mergeSchema”,“真正的”)\。選項(“checkpointLocation”,“< path_to_checkpoint >”)\。開始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“json”)。選項(“cloudFiles.schemaLocation”,“< path_to_schema_location >”)。負載(“< path_to_source_data >”)。writeStream。選項(“mergeSchema”,“真正的”)。選項(“checkpointLocation”,“< path_to_checkpoint >”)。開始(“< path_to_target”)
防止數據丟失結構良好的數據
當你知道你的模式,但想知道每當你收到意想不到的數據,數據磚推薦使用rescuedDataColumn
。
火花。readStream。格式(“cloudFiles”)\。模式(expected_schema)\。選項(“cloudFiles.format”,“json”)\#將收集所有新領域以及在_rescued_data數據類型不匹配。選項(“cloudFiles.schemaEvolutionMode”,“營救”)\。負載(“< path_to_source_data >”)\。writeStream\。選項(“checkpointLocation”,“< path_to_checkpoint >”)\。開始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。模式(expected_schema)。選項(“cloudFiles.format”,“json”)/ /將收集所有新領域以及在_rescued_data數據類型不匹配。選項(“cloudFiles.schemaEvolutionMode”,“營救”)。負載(“< path_to_source_data >”)。writeStream。選項(“checkpointLocation”,“< path_to_checkpoint >”)。開始(“< path_to_target”)
如果你想讓你的流停止處理的新領域介紹不匹配你的模式,您可以添加:
。選項(“cloudFiles.schemaEvolutionMode”,“failOnNewColumns”)
支持靈活的半結構化數據管道
當你接收數據從一個供應商介紹新列他們提供的信息,你可能不知道什麼時候,或者你可能沒有足夠的帶寬來更新你的數據管道。您現在可以利用模式演化重啟流,讓汽車自動加載程序更新的模式。您還可以利用schemaHints
一些“無模式”的字段,供應商可以提供。
火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“json”)\#將確保標題列得到加工作為一個地圖。選項(“cloudFiles.schemaHints”,“頭map < string, string >, statusCode短”)\。負載(“/ api /請求”)\。writeStream\。選項(“mergeSchema”,“真正的”)\。選項(“checkpointLocation”,“< path_to_checkpoint >”)\。開始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“json”)/ /將確保標題列得到加工作為一個地圖。選項(“cloudFiles.schemaHints”,“頭map < string, string >, statusCode短”)。負載(“/ api /請求”)。writeStream。選項(“mergeSchema”,“真正的”)。選項(“checkpointLocation”,“< path_to_checkpoint >”)。開始(“< path_to_target”)
嵌套的JSON數據轉換
因為汽車裝載機推斷頂級JSON作為字符串列,你可以留下嵌套的JSON對象,需要進一步轉換。您可以使用半結構化數據訪問api進一步變換複雜JSON內容。
火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“json”)\#模式位置目錄跟蹤你的數據模式。選項(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”)\。負載(“< source_data_with_nested_json >”)\。selectExpr(“*”,“標簽:page.name”,#提取{“標簽”:{“頁麵”:{“名稱”:…}}}“標簽:page.id:: int”,#提取{“標簽”:{"頁麵":{" id ":…}}},int轉換“標簽:eventType”#提取{“標簽”:{“eventType”:…}})
火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“json”)/ /數據的模式位置目錄跟蹤模式。選項(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”)。負載(“< source_data_with_nested_json >”)。selectExpr(“*”,“標簽:page.name”,/ /提取{“標簽”:{“頁麵”:{“名稱”:…}}}“標簽:page.id:: int”,/ /提取{“標簽”:{"頁麵":{" id ":…}}},int轉換“標簽:eventType”/ /提取{“標簽”:{“eventType”:…}})
推斷出嵌套的JSON數據
當你有嵌套的數據,您可以使用cloudFiles.inferColumnTypes
可以推斷出你的數據和其他列的嵌套結構類型。
火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“json”)\#模式位置目錄跟蹤你的數據模式。選項(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”)\。選項(“cloudFiles.inferColumnTypes”,“真正的”)\。負載(“< source_data_with_nested_json >”)
火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“json”)/ /數據的模式位置目錄跟蹤模式。選項(“cloudFiles.schemaLocation”,“< path_to_checkpoint >”)。選項(“cloudFiles.inferColumnTypes”,“真正的”)。負載(“< source_data_with_nested_json >”)
沒有頭的負荷CSV文件
df=火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“csv”)\。選項(“rescuedDataColumn”,“_rescued_data”)\#確保你不會丟失數據。模式(<模式>)\#在這裏提供一個模式文件。負載(<路徑>)
瓦爾df=火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“csv”)。選項(“rescuedDataColumn”,“_rescued_data”)/ /確保你不會丟失數據。模式(<模式>)/ /在這裏提供一個模式文件。負載(<路徑>)
執行一個模式在CSV文件頭
df=火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“csv”)\。選項(“頭”,“真正的”)\。選項(“rescuedDataColumn”,“_rescued_data”)\#確保你不會丟失數據。模式(<模式>)\#在這裏提供一個模式文件。負載(<路徑>)
瓦爾df=火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“csv”)。選項(“頭”,“真正的”)。選項(“rescuedDataColumn”,“_rescued_data”)/ /確保你不會丟失數據。模式(<模式>)/ /在這裏提供一個模式文件。負載(<路徑>)
攝取圖像或二進製數據為毫升三角洲湖
一旦數據存儲在三角洲湖,您可以運行分布式推理的數據。看到使用熊貓UDF執行分布式推理。
火花。readStream。格式(“cloudFiles”)\。選項(“cloudFiles.format”,“binaryFile”)\。負載(“< path_to_source_data >”)\。writeStream\。選項(“checkpointLocation”,“< path_to_checkpoint >”)\。開始(“< path_to_target”)
火花。readStream。格式(“cloudFiles”)。選項(“cloudFiles.format”,“binaryFile”)。負載(“< path_to_source_data >”)。writeStream。選項(“checkpointLocation”,“< path_to_checkpoint >”)。開始(“< path_to_target”)