表流讀取和寫入
三角洲湖深入結合火花結構化流通過readStream
和writeStream
。三角洲湖克服了許多限製通常與流媒體係統和相關文件,包括:
合並小文件產生的低延遲攝取
維護“隻有一次”的處理與不止一個流(或並發批處理作業)
有效地發現哪些文件是新當使用作為一個流的源文件
δ表來源
當你加載一個三角洲表作為流源和流查詢中使用它,查詢流程表中所有的數據以及任何新到達的數據流後開始。
你可以加載路徑和表作為一個流。
火花。readStream。格式(“δ”)。負載(“/ tmp /δ/事件”)進口io。δ。值得一提的。_火花。readStream。δ(“/ tmp /δ/事件”)
或
進口io。δ。值得一提的。_火花。readStream。格式(“δ”)。表(“事件”)
重要的
如果δ的表的模式改變流讀取後開始對表,查詢失敗。對於大多數模式變化,您可以重新啟動流解決模式不匹配並繼續進行處理。
你不能流從三角洲表啟用了列映射,經曆了非附加模式演化等重命名或刪除列。
限製輸入的速度
以下選項可以控製micro-batches:
maxFilesPerTrigger
:有多少新文件被認為是在每個micro-batch。默認值是1000。maxBytesPerTrigger
在每個micro-batch:多少數據被處理。這個選項設置一個“軟馬克斯”,這意味著一個批處理過程大約這個過程的數據量,可能超過極限為了使流查詢前進情況下的最小輸入單位超過這個極限。這不是默認設置。
如果你使用maxBytesPerTrigger
結合maxFilesPerTrigger
,micro-batch過程數據,直到maxFilesPerTrigger
或maxBytesPerTrigger
達到極限。
請注意
當源表事務是清理由於logRetentionDuration
配置在處理流滯後,三角洲湖處理數據對應的最新交易曆史源表流但不失敗。這可能導致數據被刪除。
河三角洲湖變化數據捕獲(CDC)飼料
三角洲湖改變數據提要三角洲的記錄更改表,包括更新和刪除。啟用時,你可以從改變流數據飼料和編寫邏輯處理插入、更新和刪除操作到下遊的表中。盡管變化數據提要數據輸出三角洲表描述略有不同,這提供了一種解決方案宣傳下遊表的增量變化大獎章架構。
重要的
你不能流變化數據提要的δ表和列映射使經曆了非附加模式演化等重命名或刪除列。
忽略更新和刪除
結構化流不處理不是一個附加的輸入,將拋出一個異常如果發生任何修改在桌子上被用作源。有兩個主要的策略來處理變化,不能自動下遊傳播:
您可以刪除輸出從一開始就和檢查點和重啟流。
你可以設置這兩個選擇:
ignoreDeletes
:忽略事務刪除數據分區的邊界。skipChangeCommits
:忽略事務,刪除或修改現有的記錄。skipChangeCommits
包容ignoreDeletes
。
請注意
在磚運行時的12.1及以上,skipChangeCommits
不讚成之前的設置ignoreChanges
。在磚運行時的12.0和低,ignoreChanges
是唯一支持的選項。
的語義ignoreChanges
有很大的不同的skipChangeCommits
。與ignoreChanges
啟用,改寫源表中的數據文件後重新發出的數據更改操作等更新
,合並成
,刪除
(分區),或覆蓋
。不變行往往發出與新行,所以下遊消費者必須能夠處理重複。刪除不是下遊傳播。ignoreChanges
包容ignoreDeletes
。
skipChangeCommits
完全無視文件更改操作。數據文件中重寫的源表數據更改操作等更新
,合並成
,刪除
,覆蓋
完全被忽略。為了反映上遊源表的變化,你必須實現單獨的邏輯來傳播這些變化。
例子
例如,假設您有一個表user_events
與日期
,user_email
,行動
分區的列日期
。你流的user_events
你需要刪除數據表由於GDPR。
當你在分區邊界(即刪除在哪裏
分區列),文件已經分割的價值所以刪除掉這些文件的元數據。因此,如果你有刪除整個分區的數據,您可以使用以下:
火花。readStream。格式(“δ”)。選項(“ignoreDeletes”,“真正的”)。負載(“/ tmp /δ/ user_events”)
但是,如果你必須在多個分區刪除數據(在這個例子中,過濾user_email
),那麼您必須使用下麵的語法:
火花。readStream。格式(“δ”)。選項(“ignoreChanges”,“真正的”)。負載(“/ tmp /δ/ user_events”)
如果你更新user_email
與更新
聲明中包含的文件user_email
在的問題是重寫。當你使用ignoreChanges
,新記錄與所有其他不變向下遊傳播記錄在同一個文件中。你的邏輯應該能夠處理這些輸入重複的記錄。
指定初始位置
您可以使用以下選項指定的起點三角洲湖流源沒有處理整個表。
startingVersion
:從三角洲湖版本。所有表變化從這個版本(包容)將讀取流源。你可以獲取提交的版本版本
列的描述曆史命令的輸出。在磚運行時的7.4及以上,隻返回最新的變化,指定
最新的
。startingTimestamp
:從時間戳。所有表更改後承諾或時間戳(包容)將讀取流源。之一:時間戳字符串。例如,
“2019 - 01 - 01 t00:00:00.000z”
。一個日期字符串。例如,
“2019-01-01”
。
你不能同時設置兩個選項;你可以隻使用其中的一個。他們隻有當開始一個新的流媒體查詢生效。如果一個流媒體查詢已經開始和檢查點的進展記錄,這些選項將被忽略。
重要的
雖然您可以啟動流源從一個指定的版本或時間戳、流媒體來源的模式總是最新的三角洲表的模式。你必須確保沒有不兼容模式改變δ表後指定的版本或時間戳。否則,流源讀取數據時可能會返回不正確的結果,錯誤的模式。
沒有數據被刪除過程初始快照
請注意
這個特性可以在磚運行時11.1及以上。這個特性是在公共預覽。
使用增量表作為流源時,數據的查詢過程首先出現在桌子上。三角洲表在這個版本被稱為初始快照。默認情況下,三角洲表的數據文件處理基於文件最後修改。然而,最後修改時間並不一定代表記錄事件的時間順序。
在有狀態流查詢定義水印,處理文件的修改時間會導致記錄處理錯誤的訂單。這可能導致晚期事件的記錄刪除水印。
你可以避免數據下降問題通過支持下列選項:
withEventTimeOrder:初始快照是否應該處理事件的時間順序。
啟用事件時間順序後,初始快照數據的事件時間範圍分為桶。每個微一桶的批處理流程過濾數據的時間範圍內。maxFilesPerTrigger和maxBytesPerTrigger配置選項仍適用於控製microbatch大小,但隻在一個近似的方法由於處理的本質。
下圖顯示了這個過程:
明顯的這一特性的信息:
數據下降問題隻有當初始增量快照的狀態在默認順序流查詢處理。
你不能改變
withEventTimeOrder
一旦流查詢開始時初始快照還是正在處理。重新啟動,withEventTimeOrder
改變,你需要刪除檢查點。如果您正在運行一個啟用了withEventTimeOrder流查詢,你不能降級到DBR版本不支持這個功能,直到初始快照處理完成。如果你需要降級,你可以等待初始快照完成,查詢或刪除檢查點和重啟。
不支持這個功能在以下常見場景:
事件時間列是一個生成的列有non-projectionδ源和水印之間的轉換。
有一個水印,不止一個δ源在流查詢。
啟用事件時間順序後,三角洲初始快照處理的性能可能較慢。
每一批微掃描初始快照來過濾數據對應的事件時間範圍內。加快過濾操作,建議使用一個δ源列事件時間,以便數據不可以應用(檢查數據不與z順序索引三角洲湖在適當的時候)。另外,表分區沿著事件時間列可以進一步加快處理。您可以檢查火花UI,看看有多少δ特定微批處理文件掃描。
三角洲表作為一個水槽
您還可以使用結構化數據寫入三角洲表流。事務日誌可以讓三角洲湖保證隻有一次處理,即使還有其他流對表或批量查詢並發運行的情況。
請注意
三角洲湖真空
函數刪除所有文件不是由三角洲湖但跳過任何目錄開始_
。您可以安全地存儲檢查點與其他數據和元數據表使用一個目錄結構如δ< table_name > / _checkpoints
。
指標
請注意
在磚運行時8.1及以上。
你可以找到的字節數和文件數量有待處理流媒體查詢流程隨著numBytesOutstanding
和numFilesOutstanding
指標。額外的指標包括:
numNewListedFiles
:列出的三角洲湖文件數量來計算這批的積壓。backlogEndOffset
:表版本用來計算積壓。
如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據流查詢進展儀表板選項卡:
{“源”:({“描述”:“DeltaSource(文件/道路/ /源):“,“指標”:{“numBytesOutstanding”:“3456”,“numFilesOutstanding”:“8”},}]}
Append模式
默認情況下,流在附加模式下運行,向表添加新記錄。
您可以使用路徑的方法:
(事件。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/ _checkpoints /”)。開始(“/δ/事件”))
事件。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。開始(“/ tmp /δ/事件”)
或者是toTable
方法在火花3.1和更高版本(磚運行時的8.3及以上),如下所示。(在火花之前版本3.1(磚運行時8.2及以下),使用表
方法。)
(事件。writeStream。格式(“δ”)。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。toTable(“事件”))
事件。writeStream。outputMode(“添加”)。選項(“checkpointLocation”,“/ tmp /δ/事件/ _checkpoints /”)。toTable(“事件”)
完整的模式
您還可以使用結構化流與每一批替換整個表。用例的一個例子是使用聚合計算一個總結:
(火花。readStream。格式(“δ”)。負載(“/ tmp /δ/事件”)。groupBy(“customerId”)。數()。writeStream。格式(“δ”)。outputMode(“完整的”)。選項(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”)。開始(“/ tmp /δ/ eventsByCustomer”))
火花。readStream。格式(“δ”)。負載(“/ tmp /δ/事件”)。groupBy(“customerId”)。數()。writeStream。格式(“δ”)。outputMode(“完整的”)。選項(“checkpointLocation”,“/ tmp /δ/ eventsByCustomer / _checkpoints /”)。開始(“/ tmp /δ/ eventsByCustomer”)
前麵的例子不斷更新一個表,其中包含客戶總數量的事件。
更寬鬆的延遲需求的應用程序,你可以節省計算資源與一次性觸發器。使用這些更新總結聚合表在給定的時間表,隻處理新數據到達自上次更新。
冪等表中寫道foreachBatch
請注意
在磚運行時8.4及以上。
命令foreachBatch允許您指定一個函數執行後的輸出每個micro-batch流查詢任意轉換。這使得作為補充foreachBatch
函數可以寫micro-batch輸出到一個或多個目標三角洲表目的地。然而,foreachBatch
不會讓那些寫冪等作為寫嚐試缺乏信息是否重新執行批處理。例如,重新運行失敗的批處理可能導致重複數據寫道。
為了解決這個問題,δ表支持以下DataFrameWriter
的選項,使得冪等寫道:
txnAppId
:一個獨一無二的字符串,您可以通過在每個DataFrame
寫。例如,您可以使用StreamingQuery IDtxnAppId
。txnVersion
:一個單調遞增數字作為事務的版本。
δ表使用的組合txnAppId
和txnVersion
識別重複的寫和忽略它們。
如果一批寫中斷故障,運行批處理使用相同的應用程序和批處理ID,這將有助於正確運行時識別重複的寫和忽略它們。應用程序ID (txnAppId
)可以是任何用戶生成唯一的字符串,不需要流相關ID。
警告
如果你刪除流檢查點和重啟查詢新的關卡,你必須提供一個不同的appId
;否則,寫的重新啟動查詢將被忽略,因為它將包含相同的txnAppId
和批處理ID將從0開始。
相同的DataFrameWriter
選項可以用來實現在非冪等寫工作。有關詳細信息,使冪等寫在工作。
例子
app_id=…#一個獨一無二的字符串用作應用程序ID。defwriteToDeltaLakeTableIdempotent(batch_df,batch_id):batch_df。寫。格式(…)。選項(“txnVersion”,batch_id)。選項(“txnAppId”,app_id)。保存(…)#位置1batch_df。寫。格式(…)。選項(“txnVersion”,batch_id)。選項(“txnAppId”,app_id)。保存(…)#位置2
瓦爾appId=…/ /一個獨一無二的字符串用作應用程序ID。streamingDF。writeStream。foreachBatch{(batchDF:DataFrame,batchId:長)= >batchDF。寫。格式(…)。選項(“txnVersion”,batchId)。選項(“txnAppId”,appId)。保存(…)/ /位置1batchDF。寫。格式(…)。選項(“txnVersion”,batchId)。選項(“txnAppId”,appId)。保存(…)/ /位置2}
執行stream-static連接
你可以依賴於事務擔保和三角洲湖執行版本控製協議stream-static連接。stream-static加入加入最新的有效版本增量表(靜態數據)的數據流使用無狀態連接。
當磚過程數據的micro-batch stream-static加入,從靜態三角洲的最新有效版本數據表連接的記錄出現在當前micro-batch。因為連接是無狀態的,你不需要配置水印和可以處理結果與低延遲。靜態三角洲表中的數據使用的加入應該不常更改。
streamingDF=火花。readStream。表(“訂單”)staticDF=火花。讀。表(“beplay体育app下载地址顧客”)查詢=(streamingDF。加入(staticDF,streamingDF。customer_id= =staticDF。id,“內心”)。writeStream。選項(“checkpointLocation”,checkpoint_path)。表(“orders_with_customer_info”))
插入從流媒體查詢使用foreachBatch
您可以使用的組合合並
和foreachBatch
寫複雜的從流媒體查詢插入到三角洲表。看到使用foreachBatch編寫任意數據彙。
這種模式有許多應用程序,包括以下幾點:
寫流聚集在更新模式:這是更有效的比完整的模式。
數據庫更改流寫入一個增量表:合並查詢寫更改數據可以用在
foreachBatch
不斷應用流變化δ表。數據流寫入三角洲與重複數據刪除表:純插入合並查詢重複數據刪除可以用在
foreachBatch
不斷寫入數據(副本)三角洲表自動重複數據刪除。
請注意
確保你的
合並
聲明內foreachBatch
是冪等的重啟流查詢可以應用相同的一批數據上的操作很多次了。當
合並
被用在foreachBatch
,輸入數據流查詢(通過報道StreamingQueryProgress
和可見的筆記本率圖)可能被報道為多個實際的速率生成的數據來源。這是因為合並
讀取輸入數據多次導致輸入指標成倍增加。如果這是一個瓶頸,您可以緩存批DataFrame之前合並
然後uncache之後合並
。
下麵的例子演示了如何使用SQLforeachBatch
完成這項任務:
/ /函數upsert microBatchOutputDF使用合並到三角洲表defupsertToDelta(microBatchOutputDF:DataFrame,batchId:長){/ /設置dataframe視圖名稱microBatchOutputDF。createOrReplaceTempView(“更新”)/ /使用視圖名稱申請合並/ /注意:必須使用的SparkSession用於定義dataframe“更新”microBatchOutputDF。sparkSession。sql(”“”合並成骨料t使用更新的年代在年代。關鍵= t.key當匹配更新設置*當不匹配插入*”“”)}/ /寫的輸出流聚合查詢到三角洲表streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta_)。outputMode(“更新”)。開始()
#使用合並函數向三角洲upsert microBatchOutputDF表defupsertToDelta(microBatchOutputDF,batchId):#設置dataframe視圖名稱microBatchOutputDF。createOrReplaceTempView(“更新”)#使用視圖名稱申請合並#注意:必須使用的SparkSession用於定義dataframe“更新”#在磚運行時的10.5和下麵,你必須使用以下:# microBatchOutputDF._jdf.sparkSession () . sql (“””microBatchOutputDF。sparkSession。sql(”“”合並成骨料t使用更新的年代在年代。關鍵= t.key當匹配更新設置*當不匹配插入*”“”)#流聚合查詢的輸出寫入δ表(streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta)。outputMode(“更新”)。開始())
您還可以選擇使用三角洲湖api來執行流插入,如以下示例:
進口io。δ。表。*瓦爾deltaTable=DeltaTable。forPath(火花,“/數據/聚合物”)/ /函數upsert microBatchOutputDF使用合並到三角洲表defupsertToDelta(microBatchOutputDF:DataFrame,batchId:長){deltaTable。作為(“t”)。合並(microBatchOutputDF。作為(“s”),”年代。關鍵= t.key”)。whenMatched()。updateAll()。whenNotMatched()。insertAll()。執行()}/ /寫的輸出流聚合查詢到三角洲表streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta_)。outputMode(“更新”)。開始()
從delta.tables進口*deltaTable=DeltaTable。forPath(火花,“/數據/聚合物”)#使用合並函數向三角洲upsert microBatchOutputDF表defupsertToDelta(microBatchOutputDF,batchId):(deltaTable。別名(“t”)。合並(microBatchOutputDF。別名(“s”),”年代。關鍵= t.key”)。whenMatchedUpdateAll()。whenNotMatchedInsertAll()。執行())#流聚合查詢的輸出寫入δ表(streamingAggregatesDF。writeStream。格式(“δ”)。foreachBatch(upsertToDelta)。outputMode(“更新”)。開始())