pyspark.sql.streaming.DataStreamWriter.foreach
DataStreamWriter。 foreach ( f:聯盟[可調用的[[pyspark.sql.types。行,沒有],SupportsProcess] )→DataStreamWriterf。這通常是用來查詢的輸出流寫入任意的存儲係統。可以以兩種方式指定的處理邏輯。

  1. 一個函數這需要一行作為輸入。

    這是一個簡單的方式來表達你的處理邏輯。請注意,這並不允許您刪除處理失敗時生成的數據導致一些輸入數據的後處理。這就要求您指定的處理邏輯在未來。

  2. 一個對象與一個過程方法和可選的開放關閉方法。

    對象可以有以下方法。

    • 打開(partition_idepoch_id):可選化處理的方法

      (例如,打開一個連接,啟動一個事務,等等)。此外,您可以使用partition_idepoch_id刪除處理再生數據(稍後討論)。

    • 過程(行):可選的方法處理每個

    • 關上(錯誤):可選方法總結和清理(例如,

      緊密聯係,提交事務等)後,所有行處理。

    所使用的對象將引發以以下方式。

    • 這個對象的一個副本負責產生的所有數據

      在一個查詢單任務。換句話說,一個實例負責處理一個分區的數據在一個分布式的方式生成。

    • 這個對象必須是可序列化的,因為每個任務將新鮮

      serialized-deserialized提供對象的副本。因此,強烈建議任何寫作初始化數據(例如,打開一個連接或啟動一個事務)是後完成的打開(…)方法被稱為,這意味著任務準備生成數據。

    • 的生命周期方法如下。

      為每個分區partition_id:

      …每一批/時代的流數據epoch_id:

      ……。方法打開(partitionIdepochId)被稱為。

      ……。如果打開(…)返回true,分區和為每一行

      批/時代,方法過程(行)被稱為。

      ……。方法關上(errorOrNull)被稱為時遇到以下錯誤(如果有的話)

      行處理。

    重要的幾點注意事項:

    • partitionIdepochId可以用來刪除處理生成的數據什麼時候

      失敗導致一些輸入數據的後處理。這取決於查詢的執行模式。如果流查詢被執行在micro-batch模式,然後每個分區代表一個獨特的元組(partition_id epoch_id)是保證有相同的數據。因此,(partition_id epoch_id)可以用來刪除處理和/或以事務的提交數據,實現隻有一次擔保。然而,如果流查詢被執行在連續模式,那麼這個保證不持有,因此不應該被用於重複數據刪除。

    • close ()(如果存在的話)將被調用的方法open ()方法存在和

      成功返回(不考慮返回值),除了中間如果Python崩潰。

筆記

這個API是不斷發展的。

例子

> > >#打印每一行用一個函數> > >defprint_row():打印()> > >作家=自衛隊writeStreamforeach(print_row)> > >#打印每一行與流程()方法使用一個對象> > >RowPrinter:def開放(自我,partition_id,epoch_id):打印(“打開% d,% d%(partition_id,epoch_id))返回真正的def過程(自我,):打印()def關閉(自我,錯誤):打印(“封閉與錯誤:% s%str(錯誤))> > >作家=自衛隊writeStreamforeach(RowPrinter())