使用foreachBatch編寫任意數據彙
結構化流api提供兩種方式查詢的輸出流寫入數據源沒有現有的流水槽:foreachBatch ()
和foreach ()
。
重用現有的批處理數據源foreachBatch ()
streamingDF.writeStream.foreachBatch (…)
允許您指定一個函數執行的輸出數據流的每個micro-batch查詢。它接受兩個參數:一個DataFrame或數據集的輸出數據micro-batch micro-batch的惟一ID。與foreachBatch
,您可以:
重用現有的批處理的數據源
對於許多存儲係統來說,可能沒有一個流水槽可用,但可能已經存在一個數據作家對於批處理查詢。使用foreachBatch ()
,您可以使用批處理數據作家每個micro-batch的輸出。下麵是一些例子:
許多其他的批處理數據源可以使用從foreachBatch ()
。
寫入多個位置
如果你想查詢的輸出流寫入多個位置,那麼您可以簡單地編寫多次輸出DataFrame /數據集。然而,每個試圖寫會導致輸出數據重新計算(包括可能的重讀輸入數據)。為了避免重新計算,你應該緩存輸出DataFrame /數據集,寫多個位置,然後uncache它。這是一個大綱。
streamingDF。writeStream。foreachBatch{(batchDF:DataFrame,batchId:長)= >batchDF。堅持()batchDF。寫。格式(…)。保存(…)/ /位置1batchDF。寫。格式(…)。保存(…)/ /位置2batchDF。unpersist()}
請注意
如果你運行多個引發工作batchDF
,輸入數據流查詢(通過報道StreamingQueryProgress
和可見的筆記本率圖)可能被報道為多個實際的速率生成的數據來源。這是由於輸入數據可能在多個讀多次引發每批工作。
申請額外DataFrame操作
許多DataFrame和數據集操作不支持流媒體DataFrames因為火花不支持生成增量計劃在這些情況下。使用foreachBatch ()
你可以把這些操作在每個micro-batch輸出。例如,您可以使用foreachBath ()
和SQL合並成
寫操作的輸出流聚合成三角洲表更新模式。看到更多的細節合並成。
重要的
foreachBatch ()
隻提供“至少一次”寫擔保。然而,您可以使用batchId
提供的函數作為方法刪除處理,得到一個僅一次保證的輸出。在這兩種情況下,你必須思考自己端到端語義。foreachBatch ()
不工作的嗎連續處理模式因為它從根本上依賴於micro-batch查詢的執行流。如果你寫在連續模式下,使用的數據foreach ()
代替。
一個空dataframe可以調用foreachBatch ()
和用戶代碼需要有彈性,以便正確操作。一個例子所示:
。foreachBatch((outputDf:DataFrame,報價:長)= >{/ /流程有效的數據幀如果(!outputDf。isEmpty){/ /業務邏輯}})。開始()
寫任何位置使用foreach ()
如果foreachBatch ()
不是一個選項(例如,您使用的是磚運行時低於4.2的,或不存在相應的批處理數據的作家),然後你可以表達你的定製作家邏輯使用嗎foreach ()
。具體來說,你可以表達寫作邏輯的數據分為三種方法:open ()
,過程()
,close ()
。
有關示例,請參見寫信給亞馬遜DynamoDB在Scala和Python使用foreach ()。
使用Scala或Java
在Scala或Java,您擴展類ForeachWriter:
datasetOfString。writeStream。foreach(新ForeachWriter(字符串]{def開放(partitionId:長,版本:長):布爾={/ /打開連接}def過程(記錄:字符串)={/ /寫字符串連接}def關閉(errorOrNull:Throwable):單位={/ /關閉連接}})。開始()
使用Python
在Python中,您可以調用foreach
在兩個方麵:在一個函數或一個對象。函數提供了一種簡單的方式來表達你的處理邏輯,但不允許您刪除處理失敗時生成的數據導致一些輸入數據的後處理。這種情況下你必須指定一個對象的處理邏輯。
的函數行作為輸入。
defprocessRow(行):/ /寫行來存儲查詢=streamingDF。writeStream。foreach(processRow)。開始()
的對象有一個
過程
方法和可選的開放
和關閉
方法:類ForeachWriter:def開放(自我,partition_id,epoch_id):/ /開放連接。這方法是可選在Python。def過程(自我,行):/ /寫行來連接。這方法是不可選在Python。def關閉(自我,錯誤):/ /關閉的連接。這方法是可選在Python。查詢=streamingDF。writeStream。foreach(ForeachWriter())。開始()
執行語義
流媒體查詢開始時,火花調用函數或對象的方法在以下方式:
這個對象的一個副本負責所有數據查詢中生成的一個任務。換句話說,一個實例負責處理一個分區的數據在一個分布式的方式生成。
這個對象必須是可序列化的,因為每個任務將得到一個新的serialized-deserialized提供對象的副本。因此,強烈建議任何寫作初始化數據(例如,打開一個連接或啟動一個事務)完成後調用
open ()
方法,它意味著任務準備生成數據。的生命周期方法如下:
為每個分區
partition_id
:對於每一批/時代的流數據
epoch_id
:方法
打開(partitionIdepochId)
被稱為。如果
打開(…)
返回true,分區和批處理中的每一行/時代,方法過程(行)
被稱為。方法
關上(錯誤)
被稱為時遇到以下錯誤(如果有的話)處理的行。的
close ()
方法(如果它存在的話)如果一個open ()
方法存在並返回成功(不管返回值),除非中間JVM或Python程序崩潰。
請注意
的partitionId
和epochId
在open ()
方法可以用來刪除處理失敗時生成的數據導致一些輸入數據的後處理。這取決於查詢的執行模式。如果流查詢被執行在micro-batch模式,然後每個分區代表一個獨特的元組(partition_idepoch_id)
保證相同的數據。因此,(partition_idepoch_id)
可以用來刪除處理和/或以事務的提交數據,實現隻有一次擔保。然而,如果流查詢被執行在連續模式,那麼這個保證不持有,因此不應該被用於重複數據刪除。