DataStreamWriter。
foreach
這是一個簡單的方式來表達你的處理邏輯。請注意,這並不允許您刪除處理失敗時生成的數據導致一些輸入數據的後處理。這就要求您指定的處理邏輯在未來。
過程
開放
關閉
對象可以有以下方法。
打開(partition_idepoch_id)
(例如,打開一個連接,啟動一個事務,等等)。此外,您可以使用partition_id和epoch_id刪除處理再生數據(稍後討論)。
過程(行):可選的方法處理每個行。
過程(行)
行
關上(錯誤)
緊密聯係,提交事務等)後,所有行處理。
所使用的對象將引發以以下方式。
在一個查詢單任務。換句話說,一個實例負責處理一個分區的數據在一個分布式的方式生成。
serialized-deserialized提供對象的副本。因此,強烈建議任何寫作初始化數據(例如,打開一個連接或啟動一個事務)是後完成的打開(…)方法被稱為,這意味著任務準備生成數據。
的生命周期方法如下。
為每個分區partition_id: …每一批/時代的流數據epoch_id: ……。方法打開(partitionIdepochId)被稱為。 ……。如果打開(…)返回true,分區和為每一行 批/時代,方法過程(行)被稱為。 ……。方法關上(errorOrNull)被稱為時遇到以下錯誤(如果有的話) 行處理。
為每個分區partition_id:
partition_id
…每一批/時代的流數據epoch_id:
epoch_id
……。方法打開(partitionIdepochId)被稱為。
打開(partitionIdepochId)
打開(…)
批/時代,方法過程(行)被稱為。
關上(errorOrNull)
行處理。
重要的幾點注意事項:
失敗導致一些輸入數據的後處理。這取決於查詢的執行模式。如果流查詢被執行在micro-batch模式,然後每個分區代表一個獨特的元組(partition_id epoch_id)是保證有相同的數據。因此,(partition_id epoch_id)可以用來刪除處理和/或以事務的提交數據,實現隻有一次擔保。然而,如果流查詢被執行在連續模式,那麼這個保證不持有,因此不應該被用於重複數據刪除。
close ()
成功返回(不考慮返回值),除了中間如果Python崩潰。
筆記
這個API是不斷發展的。
例子
> > >#打印每一行用一個函數> > >defprint_row(行):…打印(行)…> > >作家=自衛隊。writeStream。foreach(print_row)> > >#打印每一行與流程()方法使用一個對象> > >類RowPrinter:…def開放(自我,partition_id,epoch_id):…打印(“打開% d,% d”%(partition_id,epoch_id))…返回真正的…def過程(自我,行):…打印(行)…def關閉(自我,錯誤):…打印(“封閉與錯誤:% s”%str(錯誤))…> > >作家=自衛隊。writeStream。foreach(RowPrinter())
以前的
pyspark.sql.streaming.DataStreamReader.text
下一個
pyspark.sql.streaming.DataStreamWriter.foreachBatch