在Apache火花的任意狀態處理結構化流gydF4y2Ba
2017年10月17日,gydF4y2Ba 在gydF4y2Ba工程的博客gydF4y2Ba
這係列的第七篇是關於如何執行複雜的gydF4y2Ba流分析gydF4y2Ba使用Apache火花和結構化流。gydF4y2Ba
介紹gydF4y2Ba
大多數數據流,盡管連續流,流內離散事件,每個事件發生時的時間戳。因此,這個想法的核心是“事件時間”gydF4y2Ba結構化流gydF4y2Baapi是為事件時間,並提供的功能來處理這些離散事件。gydF4y2Ba
事件時間基本知識和事件時間處理gydF4y2Ba充分覆蓋著gydF4y2Ba結構化文檔流gydF4y2Ba和我們的gydF4y2Ba流選的技術資產結構gydF4y2Ba。為簡便起見,我們不會支付他們。建立在概念開發(在規模和測試)在事件時間處理,如滑動窗口,翻滾的窗戶,和水印,這個博客將集中在兩個主題:gydF4y2Ba
- 如何處理重複的事件流gydF4y2Ba
- 如何處理任意或自定義狀態處理gydF4y2Ba
刪除重複的gydF4y2Ba
沒有重複的條目的流事件都是免費的。刪除重複的條目在record-at-a-time係統上的需要,而這通常是一個繁瑣的操作是有原因的。首先,你必須處理小型或大型批記錄時拋棄他們。第二,一些事件,因為網絡延遲高,無序或延遲到達,這可能會迫使你重申或重複這個過程。你如何解釋呢?gydF4y2Ba
結構化流,確保完全once-semantics,可以減少重複消息,他們基於任意鍵。刪除處理數據,火花將維持指定的鍵和確保副本,當遇到,被丟棄。gydF4y2Ba
正如其他狀態處理api在結構化流被宣布有界gydF4y2Ba水印為後期數據gydF4y2Ba語義,所以減少重複。沒有水印,可維護的狀態可以無限增長的流。gydF4y2Ba
API來指示結構化流掉重複的所有其他API一樣簡單到目前為止我們已經表明在我們的博客和文檔。使用該API,您可以聲明任意列的下降的副本的例子,user_id和時間戳。一個條目相同的時間戳和user_id被標記為複製和下降,但相同的入口有兩個不同的時間戳。gydF4y2Ba
讓我們看一個例子如何使用簡單的API副本。gydF4y2Ba
進口org.apache.spark.sql.functions.exprgydF4y2Ba
withEventTimegydF4y2Ba.withWatermark (“event_time”、“5秒”)gydF4y2Ba.dropDuplicates(“用戶”、“event_time”)gydF4y2Ba.groupBy(“用戶”)gydF4y2Ba。gydF4y2Ba數gydF4y2Ba()gydF4y2Ba.writeStreamgydF4y2Ba.queryName(“刪除處理”)gydF4y2Ba.format(“記憶”)gydF4y2Ba.outputMode(“完整的”)gydF4y2Ba。gydF4y2Ba開始gydF4y2Ba()gydF4y2Ba
從gydF4y2Bapyspark.sql。functions import expr
withEventTime \gydF4y2Ba.withWatermark (“event_time”、“5秒”)\gydF4y2Ba.dropDuplicates((“用戶”、“event_time”)) \gydF4y2Ba.groupBy \(“用戶”)gydF4y2Ba。gydF4y2Ba數gydF4y2Ba()\gydF4y2Ba.writeStream \gydF4y2Ba.queryName \ (“pydeduplicated”)gydF4y2Ba.format \“記憶”gydF4y2Ba.outputMode \(“完整的”)gydF4y2Ba。gydF4y2Ba開始gydF4y2Ba()gydF4y2Ba
在查詢的,如果你要發出一個SQL查詢,你會得到一個精確的結果,與所有副本。gydF4y2Ba
選擇gydF4y2Ba*gydF4y2Ba從gydF4y2Ba刪除處理gydF4y2Ba+gydF4y2Ba- - - - - + - - - +gydF4y2Ba|gydF4y2Ba用戶gydF4y2Ba|gydF4y2Ba數gydF4y2Ba|gydF4y2Ba+gydF4y2Ba- - - - - + - - - +gydF4y2Ba|gydF4y2Ba一個gydF4y2Ba|gydF4y2Ba8085年gydF4y2Ba|gydF4y2Ba|gydF4y2BabgydF4y2Ba|gydF4y2Ba9123年gydF4y2Ba|gydF4y2Ba|gydF4y2BacgydF4y2Ba|gydF4y2Ba7715年gydF4y2Ba|gydF4y2Ba|gydF4y2BaggydF4y2Ba|gydF4y2Ba9167年gydF4y2Ba|gydF4y2Ba|gydF4y2BahgydF4y2Ba|gydF4y2Ba7733年gydF4y2Ba|gydF4y2Ba|gydF4y2BaegydF4y2Ba|gydF4y2Ba9891年gydF4y2Ba|gydF4y2Ba|gydF4y2BafgydF4y2Ba|gydF4y2Ba9206年gydF4y2Ba|gydF4y2Ba|gydF4y2BadgydF4y2Ba|gydF4y2Ba8124年gydF4y2Ba|gydF4y2Ba|gydF4y2Ba我gydF4y2Ba|gydF4y2Ba9255年gydF4y2Ba|gydF4y2Ba+gydF4y2Ba- - - - - + - - - +gydF4y2Ba
接下來,我們將詳述如何實現一個定製的狀態處理使用兩個結構化流api。gydF4y2Ba
使用任意或自定義狀態處理gydF4y2Ba
並不是所有的事件時間基礎處理等於或簡單聚合一個特定的數據列在一個事件。其他事件更複雜;他們需要處理的行事件歸結為一組;他們隻有意義整體上加工時通過發射一個結果或多行結果,取決於你的用例。gydF4y2Ba
考慮這些用例,任意的或定製的有狀態的處理成為必要的:gydF4y2Ba
1。我們想要發出一個警報基於一組或類型的事件如果我們觀察到他們超過一個閾值gydF4y2Ba
2。我們要保持用戶會話,在明確的或不確定的時間和堅持這些會話後分析。gydF4y2Ba
所有上述場景需要定製加工。結構化流api提供了一組api來處理這些情況:gydF4y2BamapGroupsWithStategydF4y2Ba
和gydF4y2BaflatMapGroupsWithState。gydF4y2Ba
mapGroupsWithStatgydF4y2Ba
e可以操作組和輸出為每組隻有一個結果行,而gydF4y2BaflatMapGroupsWithStategydF4y2Ba
可以發出一行或多行每組的結果。gydF4y2Ba
超時和狀態gydF4y2Ba
有一點要注意的是,因為我們管理組的狀態基於用戶定義的概念,正如上麵表達的用例,水印的語義(到期或丟棄事件)可能並不總是適用。相反,我們自己必須指定一個合適的超時。超時之前規定我們應該等多久時間了一些中間狀態。gydF4y2Ba
超時可以根據處理時間gydF4y2Ba(GroupStateTimeout.ProcessingTimeTimeout)gydF4y2Ba
或事件的時間gydF4y2Ba(GroupStateTimeout.EventTimeTimeout)。gydF4y2Ba
當使用超時,您可以檢查超時處理之前,先通過檢查標誌的值gydF4y2Bastate.hasTimedOut。gydF4y2Ba
設置處理超時,使用gydF4y2BaGroupState.setTimeoutDuration (…)gydF4y2Ba
方法。這意味著超時保證將發生在下列條件:gydF4y2Ba
- 超時將永遠不會發生在時鍾之前已上漲gydF4y2BaX女士gydF4y2Ba中指定的方法gydF4y2Ba
- 超時將最終出現在查詢中有一個觸發器,之後gydF4y2BaX女士gydF4y2Ba
設置事件時間超時,使用gydF4y2BaGroupState.setTimeoutTimestamp (…)gydF4y2Ba
。隻有基於事件的超時時間必須指定水印。因此組中的所有事件比水印將過濾掉,超時會發生當水印擁有先進的超出了設定時間戳。gydF4y2Ba
當超時發生時,將調用函數中提供流媒體查詢參數:你保持狀態的關鍵;輸入迭代器行,一個古老的國家。的例子gydF4y2BamapGroupsWithStategydF4y2Ba
下麵定義了大量的功能使用類和對象。gydF4y2Ba
例子與mapGroupsWithStategydF4y2Ba
讓我們看一個簡單的例子,我們想找出(時間戳)當用戶執行他或她的第一個和最後一個活動在一個給定的數據集在一個流。在這種情況下,我們將小組(或映射)用戶密鑰和活動組合鍵。gydF4y2Ba
但首先,gydF4y2BamapGroupsWithStategydF4y2Ba
需要大量的功能類和對象:gydF4y2Ba
1。三個類定義:一個輸入定義,定義,和可選的輸出定義。gydF4y2Ba
2。一個更新函數基於一個關鍵,迭代器的事件,和前一個狀態。gydF4y2Ba
3所示。一個超時參數如上所述。gydF4y2Ba
讓我們定義輸入、輸出和狀態數據結構定義。gydF4y2Ba
情況下gydF4y2Ba類InputRow (gydF4y2Ba用戶gydF4y2Ba:字符串,gydF4y2Ba時間戳gydF4y2Ba:java.sql.Timestamp,活動:字符串)gydF4y2Ba情況下gydF4y2Ba類UserState (gydF4y2Ba用戶gydF4y2Ba:字符串,gydF4y2Bavar活動:字符串,gydF4y2BavargydF4y2Ba開始gydF4y2Ba:java.sql.Timestamp,gydF4y2BavargydF4y2Ba結束gydF4y2Ba:java.sql.Timestamp)gydF4y2Ba
基於給定的輸入行,我們定義我們的更新函數gydF4y2Ba
defgydF4y2BaupdateUserStateWithEventgydF4y2Ba(gydF4y2Ba狀態:UserState,gydF4y2Ba輸入gydF4y2Ba:InputRowgydF4y2Ba):gydF4y2BaUserState = {gydF4y2Ba/ /沒有時間戳,隻是忽略它gydF4y2Ba如果gydF4y2Ba(選項(gydF4y2Ba輸入gydF4y2Ba.timestamp .isEmpty) {gydF4y2Ba返回gydF4y2Ba狀態gydF4y2Ba}gydF4y2Ba/ /活動是否匹配gydF4y2Ba為gydF4y2Ba的gydF4y2Ba輸入gydF4y2Ba行gydF4y2Ba如果gydF4y2Ba(狀態。一個ctivity ==輸入gydF4y2Ba.activity) {gydF4y2Ba如果gydF4y2Ba(gydF4y2Ba輸入gydF4y2Ba.timestamp.after (state.end)) {gydF4y2Ba狀態。結束=gydF4y2Ba輸入gydF4y2Ba.timestampgydF4y2Ba}gydF4y2Ba如果gydF4y2Ba(gydF4y2Ba輸入gydF4y2Ba.timestamp.before (state.start)) {gydF4y2Ba狀態。開始=輸入gydF4y2Ba.timestampgydF4y2Ba}gydF4y2Ba}gydF4y2Ba其他的gydF4y2Ba{gydF4y2Ba/ /其他一些活動gydF4y2Ba如果gydF4y2Ba(gydF4y2Ba輸入gydF4y2Ba.timestamp.after (state.end)) {gydF4y2Ba狀態。開始=輸入gydF4y2Ba.timestampgydF4y2Ba狀態。結束=gydF4y2Ba輸入gydF4y2Ba.timestampgydF4y2Ba狀態。一個ctivity =輸入gydF4y2Ba.activitygydF4y2Ba}gydF4y2Ba}gydF4y2Ba/ /gydF4y2Ba返回gydF4y2Ba更新後的狀態gydF4y2Ba狀態gydF4y2Ba}gydF4y2Ba
最後,我們寫函數定義的狀態更新基於一個時代的行。gydF4y2Ba
進口gydF4y2Baorg.apache.spark.sql.streaming。{GroupStateTimeout, OutputMode, GroupState}def updateAcrossEvents(用戶:gydF4y2Ba字符串gydF4y2Ba,gydF4y2Ba輸入gydF4y2Ba:迭代器(InputRow),gydF4y2BaoldStategydF4y2Ba:GroupState [UserState]): UserState = {gydF4y2BavargydF4y2Ba狀態:UserState =gydF4y2Ba如果gydF4y2Ba(oldState.exists) oldState.getgydF4y2Ba其他的gydF4y2BaUserState(用戶、gydF4y2Ba”“gydF4y2Ba,gydF4y2Ba新gydF4y2Bajava.sql.Timestamp (6284160000000 l),gydF4y2Ba新gydF4y2Bajava.sql.Timestamp(6284160升)gydF4y2Ba)gydF4y2Ba/ /我們簡單地指定一個舊的日期,我們可以比較和gydF4y2Ba/ /立即更新基於值的數據gydF4y2Ba為gydF4y2Ba(輸入gydF4y2Ba
有了這些作品,我們現在可以在查詢中使用它們。正如上麵所討論的,我們必須指定超時的方法可以超時給定組的狀態,我們可以控製和政府應該做什麼當之後沒有收到更新超時。對於這個例子,我們將無限期地保持狀態。gydF4y2Ba
進口gydF4y2Baorg.apache.spark.sql.streaming.GroupStateTimeoutgydF4y2Ba
withEventTimegydF4y2Ba.selectExpr (gydF4y2Ba“用戶用戶”gydF4y2Ba,gydF4y2Ba“鑄(Creation_Time / 1000000000作為時間戳)作為時間戳”gydF4y2Ba,gydF4y2Ba“gt活動”gydF4y2Ba)gydF4y2Ba。as [InputRow]gydF4y2Ba/ /組由用戶的關鍵gydF4y2Ba.groupByKey (_.user)gydF4y2Ba.mapGroupsWithState (GroupStateTimeout.NoTimeout) (updateAcrossEvents)gydF4y2Ba.writeStreamgydF4y2Ba.queryName (gydF4y2Ba“events_per_window”gydF4y2Ba)gydF4y2Ba.format (gydF4y2Ba“記憶”gydF4y2Ba)gydF4y2Ba.outputMode (gydF4y2Ba“更新”gydF4y2Ba)gydF4y2Ba.start ()gydF4y2Ba
我們現在可以流的查詢結果:gydF4y2Ba
選擇gydF4y2Ba*gydF4y2Ba從gydF4y2Baevents_per_windowgydF4y2Ba訂單gydF4y2Ba通過gydF4y2Ba用戶gydF4y2Ba,gydF4y2Ba開始gydF4y2Ba
和我們的示例結果表明用戶活動的第一個和最後一個時間戳:gydF4y2Ba
+gydF4y2Ba- - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +gydF4y2Ba|gydF4y2Ba用戶gydF4y2Ba|gydF4y2Ba活動gydF4y2Ba|gydF4y2Ba開始gydF4y2Ba|gydF4y2Ba結束gydF4y2Ba|gydF4y2Ba+gydF4y2Ba- - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +gydF4y2Ba|gydF4y2Ba一個gydF4y2Ba|gydF4y2Ba自行車gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-23年gydF4y2Ba13gydF4y2Ba:gydF4y2Ba30.gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-23年gydF4y2Ba14gydF4y2Ba:gydF4y2Ba06gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba|gydF4y2Ba一個gydF4y2Ba|gydF4y2Ba自行車gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-23年gydF4y2Ba13gydF4y2Ba:gydF4y2Ba30.gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-23年gydF4y2Ba14gydF4y2Ba:gydF4y2Ba06gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba…gydF4y2Ba|gydF4y2BabgydF4y2Ba|gydF4y2Ba自行車gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-24年gydF4y2Ba14gydF4y2Ba:gydF4y2Ba01gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-24年gydF4y2Ba14gydF4y2Ba:gydF4y2Ba38gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba|gydF4y2BabgydF4y2Ba|gydF4y2Ba自行車gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-24年gydF4y2Ba14gydF4y2Ba:gydF4y2Ba01gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-24年gydF4y2Ba14gydF4y2Ba:gydF4y2Ba38gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba|gydF4y2BacgydF4y2Ba|gydF4y2Ba自行車gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-23年gydF4y2Ba12gydF4y2Ba:gydF4y2Ba40gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-23年gydF4y2Ba13gydF4y2Ba:gydF4y2Ba15gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba…gydF4y2Ba|gydF4y2BadgydF4y2Ba|gydF4y2Ba自行車gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-24年gydF4y2Ba13gydF4y2Ba:gydF4y2Ba07年gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba2015年gydF4y2Ba-02年gydF4y2Ba-24年gydF4y2Ba13gydF4y2Ba:gydF4y2Ba42gydF4y2Ba:…gydF4y2Ba|gydF4y2Ba+gydF4y2Ba- - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - +gydF4y2Ba
接下來是什麼gydF4y2Ba
在這個博客中,我們擴展兩個額外的功能和先進的流式分析的api。第一個允許刪除重複的水印有界。第二,您可以實現自定義狀態聚合,超越gydF4y2Ba事件時間基本gydF4y2Ba和gydF4y2Ba事件時間處理gydF4y2Ba。gydF4y2Ba
通過使用mapGroupsWithState api的一個例子,我們演示了如何實現定製狀態聚合的事件處理語義可以定義不僅通過超時也由用戶語義和業務邏輯。gydF4y2Ba
在本係列的下一篇博客中,我們將探索先進的方麵gydF4y2BaflatMapGroupsWithStategydF4y2Ba
用例,將會討論gydF4y2Ba引發歐盟峰會gydF4y2Ba,在都柏林gydF4y2Ba深潛水結構化流媒體會話gydF4y2Ba。gydF4y2Ba
閱讀更多gydF4y2Ba
在結構化流媒體開發和發布以來Apache 2.0火花,我們編製了一個全麵的技術資產的綱要,包括結構化係列博客。在這裏你可以閱讀相關的資產:gydF4y2Ba
試穿最新Apache火花的結構化流api磚”gydF4y2Ba數據Lakehouse平台Beplay体育安卓版本gydF4y2Ba。gydF4y2Ba