本機支持會話窗口的火花結構化流
2021年10月12日 在工程的博客
Apache火花™結構化流允許用戶進行聚合windows在事件時間。在Apache 3.2火花™,火花支持windows和滑動窗口。在即將到來的Apache火花3.2中,我們添加了“會話窗口”作為新的windows支持類型,適用於流媒體和批處理查詢
“會話窗口”是什麼?
暴跌windows是一係列的固定大小的重疊和連續的時間間隔。一個輸入隻能綁定到一個窗口。
滑動windows類似暴跌的“固定大小”,但windows可以重疊如果幻燈片的持續時間小於的持續時間窗口,在這種情況下,輸入可以綁定多個窗口。
會話窗口有不同特征相比前兩種類型。會話窗口有一個動態窗口長度的大小,取決於輸入。開始一個會話窗口,其中有一個輸入和擴展本身如果以下輸入已經收到在時間的差距。一個會話窗口關閉當沒有輸入收到後的差距持續時間內接收最新的輸入。這使您能夠組事件直到沒有新的事件特定時間段內的(不活躍)。
它的工作原理類似於網站上一個會話,會話超時——如果你登錄一個網站,沒有表現出任何活動持續時間,網站會提示您保持登錄狀態,迫使注銷如果你還不活動超時之後已經超過了。會話超時擴展當你展示活動。
應用此會話窗口:一個新的會話窗口中啟動一個新的事件時,如流媒體工作,在超時之後發生,事件將包括在相同的會話窗口。每個事件將會話超時,引入了一個不同的特點相比,其他時間窗口——會議的時間窗口不是靜態的,而暴跌和滑動窗口都有一個靜態時間。
如何實現一個查詢使用一個會話窗口?
以前,火花需要您利用flatMapGroupsWithState處理會話窗口。你被要求工藝自己的邏輯定義會話窗口和如何總輸入相同的會話。這帶來了一些缺點:
- 你不能利用內置的聚合函數(如數量,金額等,必須做他們自己。
- 工藝是重要的考慮各種輸出的邏輯模式和輸入的遲到。
- 在PySpark flatMapGroupsWithState不可用;因此,你需要工藝查詢通過Java / Scala。
現在,火花使用時間窗口提供了相同的用戶體驗。仍然是正確的,”這句話在結構化流,表達這種windows事件時間僅僅是執行一個特殊分組”。暴跌和滑動窗口,窗口的功能。會話窗口,一個新的函數session_window介紹。
例如,數量超過5分鍾暴跌(重疊)窗戶eventTime列後的事件可以被描述為。
#暴跌窗口windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分鍾”)\.groupBy(“的deviceId”,窗口(“eventTime”,“十分鍾”)\。數()#滑動窗口windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分鍾”)\.groupBy(“的deviceId”,窗口(“eventTime”,“十分鍾”,“5分鍾”))\。數()
您可以簡單地將函數“窗口”替換為“session_window”計算會話窗口以5分鍾的差距在eventTime列事件。
#會話窗口windowedCountsDF=\eventsDF \.withWatermark (“eventTime”、“十分鍾”)\session_window .groupBy(“的deviceId”(“eventTime”、“5分鍾”))\。數()
會話窗口與動態差距持續時間
除了會話窗口,跨會話持續時間相同的差距,有另一種類型的會話窗口,每個會話持續時間有不同的差距。我們稱之為“動態持續的差距。”
下麵的框線的時間表示每個事件持續時間的差距。有四個事件和他們的(事件時間、差距持續時間)對(上午、4分鍾),橙色(12:06 9分鍾),黃色(吸5分鍾),用綠色(12:15,5分鍾)。
上麵的框線表示實際的會話是由這些事件。您可以考慮每個事件作為一個個體會話,和會議有一個十字路口是合並成一個。正如你可能表明,會話的時間範圍是“聯盟”的時間範圍包括所有事件在會話中。注意會議的結束時間不再是最新的事件的持續時間+差距在會話中。
新功能“session_window”接收兩個參數,事件時間列和差距持續時間。
對於動態會話窗口,您可以提供一個“表達式”的“差距持續時間”參數“session_window”功能。表達式應該解析為一個區間,像“5分鍾”。自“差距持續時間”參數接收一個表達式,您還可以利用UDF。
例如,計算在會話窗口與基於eventType列的動態間隙時間可以描述如下。
#定義會話窗口有動態基於差距持續時間在eventTypesession_window expr=session_window(事件。時間戳,\當(events.eventType==類型1、5秒”)\。當(events.eventType==“type2”、“20秒”)\.otherwise(5分鍾)#集團的數據通過會話窗口和用戶標識,和計算出數量的每一個集團windowedCountsDF=事件\.withWatermark(“時間戳”,“十分鍾”)\.groupBy(事件。userID, session_window_expr) \。數()
會話窗口與FlatMapGroupsWithState的原生支持
“flatMapGroupsWithState”提供了更大的靈活性實現會話窗口,但它需要用戶編寫很多行代碼。例如,請參考sessionization的例子在Apache通過flatMapGroupsWithState火花實現會話窗口。注意,sessionization例子在Apache火花非常簡化,僅適用於處理時間& append模式對。處理事件的整體複雜性時間和各種輸出模式抽象了原生支持會話的窗口。
火花原生支持會話的窗口的設置一個目標覆蓋通用用例,因為它使火花來優化性能和狀態存儲使用。您可能還想利用flatMapGroupsWithState當你的業務用例需要複雜的會話窗口,例如,如果會話也應該結案了特定類型的事件無論靜止。
結論
我們已經介紹了會話窗口流聚合,也適用於批量查詢。學習如何使用新功能“session_window”,你可以利用你的知識流數據聚合的時間窗口,能夠處理會話窗口。您可以利用內置的聚合函數,以及自己的UDAFs會話窗口聚合查詢這也使SQL / PySpark用戶處理會話窗口,在PySpark flatMapGroupsWithState API不可用,不能表示成一個SQL語句。
還有更多的房間來改進準時窗口操作,你需要使用flatMapGroupsWithState API。我們正計劃研究自定義窗口業務在不久的將來。
如果你想嚐試即將到來的Apache 3.2火花磚10.0運行時,報名參加Community Edition磚或磚免費試用在幾分鍾內開始。使用火花3.2非常簡單,隻需選擇版本“10.0”時啟動集群。