水印應用於控製數據處理的閾值

本文介紹了數字水印的基本概念,並提供建議使用共同的水印有狀態流操作。你必須申請水印有狀態的流操作,以避免無限擴大的數據量保持狀態,從而引入內存問題和增加在長期運行流操作期間處理延遲。

水印是什麼?

結構化流使用水印來控製閾值多長時間繼續處理更新對於一個給定的實體狀態。國有實體的常見例子包括:

  • 聚合在一個時間窗口。

  • 獨特的鍵聯接兩個流。

當你聲明一個水印,你指定一個時間戳字段和流DataFrame水印閾值。隨著新數據到來,國家經理跟蹤最近的時間戳在指定的領域和流程中的所有記錄遲到閾值。

下麵的示例10分鍾的水印閾值適用於窗口數:

pyspark.sql.functions進口窗口(dfwithWatermark(“event_time”,“十分鍾”)groupBy(窗口(“event_time”,“5分鍾”),“id”)())

在這個例子中:

  • event_time列是用來定義一個10分鍾的水印和5分鍾暴跌窗口。

  • 收集計數為每個id觀察每個重疊5分鍾窗口。

  • 為每個計數狀態信息維護到窗口10分鍾以上最新的觀察event_time

重要的

水印閾值保證記錄內到達指定的閾值處理根據定義的語義查詢。後麵到達記錄到達外指定的閾值可能仍然使用查詢處理指標,但這是沒有保證的。

水印處理時間和吞吐量影響如何?

水印與輸出模式來控製數據寫入時下沉。因為水印減少處理狀態信息的總量,有效利用水印是必不可少的有效的狀態流的吞吐量。

請注意

並不是所有的輸出模式都支持所有有狀態操作。

水印為窗口的聚合和輸出模式

下表的細節處理查詢與聚合時間戳水印定義:

輸出模式

行為

附加

行寫入目標表一旦水印閾值已經過去了。所有的寫操作都被推遲基於遲到閾值。舊的聚合狀態閾值已通過刪除一次。

更新

行寫入目標表的計算結果,可以更新和覆蓋新數據到來。舊的聚合狀態閾值已通過刪除一次。

完整的

聚合狀態並不下降。目標表與每個觸發器重寫。

stream-stream加入水印和輸出

多個流隻支持append模式之間的連接,和匹配的記錄都寫在每一批他們發現。內部連接,磚建議設置水印閾值在每個流數據來源。這允許狀態信息被丟棄老的記錄。沒有水印,結構化流試圖加入每個鍵加入每個觸發器的兩邊。

結構化流有特殊的語義支持外部連接。水印是強製性的外部連接,因為它表明當一個關鍵必須用null值後無可匹敵的。注意,盡管外部連接可以用於記錄記錄不匹配在數據處理過程中,因為隻加入寫入表附加操作,這沒有記錄丟失的數據,直到遲到閾值後已經過去了。

控製後期數據閾值與多個水印政策結構化流

當使用多個結構化流輸入,您可以設置多個水印來控製公差為後麵到達數據閾值。配置水印允許您控製狀態信息和影響延遲。

流媒體查詢可以有多個輸入流聯合或連接在一起。每一個輸入流可以有不同的閾值的數據需要容忍有狀態操作。指定這些閾值使用withWatermarks (“eventTime”,延遲)對每一個輸入流。下麵是一個示例查詢stream-stream連接

瓦爾inputStream1=/ /延遲1小時瓦爾inputStream2=/ /延遲2個小時inputStream1withWatermark(“eventTime1”,“1小時”)加入(inputStream2withWatermark(“eventTime2”,“兩小時”),joinCondition)

在運行查詢,結構化流單獨跟蹤最大事件時間出現在每一個輸入流,計算水印根據相應的延遲,並選擇一個全球水印用於有狀態操作。默認情況下,選擇最小作為全球水印,因為它確保沒有數據意外下降太晚如果別人背後的溪流瀑布之一(例如,一個流停止接收數據由於上遊失敗)。換句話說,全球水印在最慢的速度流和安全地移動查詢輸出相應延遲。

如果你想要更快的結果,您可以設置多個水印策略選擇最大值隨著全球水印通過設置SQL配置spark.sql.streaming.multipleWatermarkPolicy馬克斯(默認是最小值)。這讓全球水印以最快的速度流。然而,這種配置下降最慢的數據流。正因為如此,磚建議你謹慎使用此配置。

減少重複在水印

在磚運行時的13.1及以上,你可以刪除處理記錄在一個水印閾值使用一個惟一的標識符。

結構化流提供隻有一次處理擔保,但不會自動刪除處理記錄數據來源。您可以使用dropDuplicatesWithinWatermark刪除處理記錄在任何指定的字段,允許您刪除重複從流,即使某些領域有所不同(例如事件時間和到達時間)。

重複的記錄,在到達指定的水印是保證。這個保證是嚴格的隻有一個方向,和重複的記錄之外,到達指定的閾值也可能下降。你必須設置水印的延遲閾值超過最大時間戳差異刪除所有重複重複的事件。

您必須指定一個水印使用dropDuplicatesWithinWatermark方法,如以下示例:

streamingDf=火花readStream#刪除處理使用guid列與水印基於eventTime列(streamingDfwithWatermark(“eventTime”,“10個小時”)dropDuplicatesWithinWatermark(“guid”))
瓦爾streamingDf=火花readStream/ /列:guid, eventTime,…/ /刪除處理使用guid列與水印基於eventTime列streamingDfwithWatermark(“eventTime”,“10個小時”)dropDuplicatesWithinWatermark(“guid”)