跳轉到主要內容
工程的博客

事件時間聚合和水印在Apache火花的結構化流

@磚可伸縮的第4部分數據
通過如來佛Das

2017年5月8日 工程的博客

分享這篇文章

這是第四篇由多部分組成的係列如何你可以執行複雜的流分析使用Apache火花。


連續應用程序通常需要實時決策實時聚合statistics-such健康和讀數從物聯網設備或檢測異常行為。在這個博客中,我們將探索如何表達容易流聚合結構化流,以及自然遲了,和無序的數據處理。

流媒體聚合

結構化流允許用戶來表達相同的流媒體查詢作為一個批處理查詢,和火花的SQL引擎incrementalizes流數據的查詢和執行。例如,假設您有一個流DataFrame從物聯網設備與信號強度的事件,你要計算每個設備的運行平均信號強度,然後你會寫以下Python代碼:

# DataFrame w/模式[eventTime:時間戳的deviceId:字符串,信號:長整型數字]eventsDF=avgSignalDF=eventsDF.groupBy(“的deviceId”)。avg(“信號”)

這段代碼也一樣如果eventsDF DataFrame靜態數據。然而,在這種情況下,平均會持續更新新事件的到來。你選擇不同的輸出模式編寫更新後的平均水平等外部係統的文件係統和數據庫。此外,你還可以實現自定義聚合使用引發的用戶定義的聚合函數(UDAFs)

聚合在Windows事件時間

在許多情況下,而不是運行聚合在整個流,你想要在桶的數據聚合時間窗(每5分鍾或每小時)。我們在之前的例子中,它是深刻的,看看平均信號強度在最後5分鍾的情況下,如果設備已經開始表現出反常地。同時,這個5分鍾窗口應該基於時間戳嵌入數據(又名。事件時間),而不是時間。(又名正在處理處理時間)。

早些時候引發流DStream API使它很難表達這樣的事件時間windows的API是專門為處理時間(也就是說,windows的時間數據到達火花)。在結構化流,表達這類windows事件時間僅僅是執行一個特殊的分組使用窗口()函數。例如,數量超過5分鍾暴跌(重疊)windows事件中的eventTime列如下。

pyspark.sql。功能導入*windowedAvgSignalDF=\eventsDF \.groupBy (窗口(“eventTime”、“5分鍾”))\()

在上麵的查詢中,每個記錄是分配給一個5分鍾暴跌窗口如下圖所示。

事件時間5分鍾暴跌windows的映射

每個窗口是一個組的運行數量計算。您還可以定義重疊窗口通過指定窗口長度和滑動時間間隔。例如:

pyspark.sql。功能導入*windowedAvgSignalDF=\eventsDF \.groupBy (窗口(“eventTime”,“十分鍾”,“5分鍾”))\()

在上麵的查詢中,每一個記錄將被分配給多個重疊窗口如下圖所示。

映射的事件時間重疊的窗口長度10分鍾和滑動時間間隔5分鍾

這個分組策略自動處理後期和無序的數據——已故的事件隻會更新老窗口組而不是最新的。這是一個端到端的插圖的分組由一個查詢的deviceId和重疊的窗口。下麵的插圖顯示了查詢的最終結果如何改變新的數據處理後5分鍾觸發當你分組的的deviceId和滑動窗口(為簡便起見,省略了“信號”字段)。

windowedCountsDF=\eventsDF \.groupBy (“的deviceId”,窗口(“eventTime”,“十分鍾”,“5分鍾”))\()

後期數據處理在窗口分組聚合

注意後期,無序的記錄(上午,dev2)更新舊窗口的計數。

有狀態增量執行

在執行任何流聚合查詢,火花的SQL引擎內部維護中間聚合容錯狀態。這個狀態是結構化的鍵-值對,關鍵是,和值是中間聚合。版本,這些對存儲在內存中的鍵值“商店”的火花執行人檢查點使用寫前登錄的HDFS-compatible文件係統(配置檢查點位置)。在每一個觸發器,讀取和更新狀態存儲,和所有更新保存到寫日誌。在失敗的情況下,正確的版本的狀態恢複檢查點信息,以及查詢所得點失敗了。可複製的來源,和冪等下沉、結構化流確保隻有一次擔保有狀態的流處理。

容錯、僅一次狀態在結構化流流處理

這種容錯狀態管理自然需要一些處理開銷。保持這些開銷有界可接受的範圍內,狀態數據的大小不應無限增長。然而,隨著滑動窗口,窗口/組的數量將無限期地增長,所以可以大小的狀態(組)的數量成正比。結合國家大小,我們必須放棄舊總量不會更新,例如七天的平均水平。我們實現這一目標使用水印

水印極限狀態,而後期處理數據

如前所述,後期數據的到來會導致較早的windows更新。這個複雜的過程定義哪些舊總量不會更新,因此可從國家存儲限製國家的大小。在Apache火花2.1中,我們已經介紹了水印,使自動刪除舊的狀態數據。

水印是一個感人的事件時間閾值,落後的最大事件時間被處理過的數據查詢。落後的差距定義多久我們將等待數據到達。通過了解的數據將為給定組,我們可以限製國家的總金額,我們需要保持一個查詢。例如,假設配置的最大遲到10分鍾。這意味著遲到10分鍾的事件將被允許聚合。如果最大觀測事件時間12:33,那麼所有的未來事件事件時間比12:23將視為“太遲了”,放棄了。此外,windows 12:23以上的所有國家將被清除。你可以設置這個參數根據應用程序的需求——這個參數值的增大使得數據到達以後,但增加的成本大小,也就是說,內存使用,反之亦然。

前麵的例子但水印。

windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分鍾”)\.groupBy (“的deviceId”,窗口(“eventTime”,“十分鍾”,“5分鍾”))\()

執行該查詢時,火花SQL將自動跟蹤的最大觀測值eventTime列,更新水印和清晰的舊狀態。這是所示。

水印在窗口分組聚合

注意到這兩個事件之間的處理時間12:20和需要。水印用於區分末和“太遲了”事件並相應地治療。

結論

簡而言之,我討論了結構化流的窗口策略來處理關鍵流聚合:windows事件時間和後期和無序的數據。使用這個窗口策略允許結構化流引擎來實現水印,後期數據可以被丟棄。由於這個設計,我們可以管理state-store的大小。

在即將到來的2.2版本的Apache火花,我們添加了更高級的狀態流DataFrames /數據流處理操作。有關更多信息,請繼續關注本博客係列。如果你想了解更多關於結構化流,閱讀本係列之前的文章。

嚐試結構化流在Apache 2.0火花,今天試著磚

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map