如何啟動一個結構化流查詢從去年寫抵消嗎

學習如何重啟一個結構化流查詢上次寫的偏移量。

寫的亞當Pavlacka

去年發表在:2022年5月18日

場景

你有流,運行窗口的聚合查詢,讀來自Apache卡夫卡和寫文件附加模式。你想升級的應用程序並重新啟動查詢抵消等於最後寫抵消。你想丟棄所有狀態信息還沒有被寫入下沉,開始處理從最早的偏移導致廢棄狀態,並相應地修改目錄檢查站。

然而,如果您使用現有的檢查點升級應用程序代碼後,舊州和對象重用以前的應用程序版本,導致意想不到的輸出如閱讀舊來源或處理舊的應用程序代碼。

解決方案

Apache火花保持狀態在執行檢查點和二進製對象。因此您不能修改檢查點目錄。作為一種替代方法,複製和更新偏移量與輸入記錄並存儲在文件或數據庫中。讀它在下一次重新啟動的初始化和使用相同的值readStream。確保刪除檢查點目錄。

你可以通過使用異步api:當前偏移量

% scala spark.streams。addListener(新StreamingQueryListener(){覆蓋def onQueryStarted (queryStarted: QueryStartedEvent):單位= {println(“查詢開始:”+ queryStarted.id)}覆蓋def onQueryTerminated (queryTerminated: QueryTerminatedEvent):單位= {println(“查詢終止”+ queryTerminated.id)}覆蓋def onQueryProgress (queryProgress: QueryProgressEvent):單位= {println(“查詢取得進展”)println(“開始抵消:”+ queryProgress.progress.sources (0) .startOffset) println(“結束偏移量:”+ queryProgress.progress.sources (0) .endOffset) / /邏輯來保存這些補償}})

您可以使用readStream所寫的最新抵消過程如上所示:

% scala選項(startingOffsets“”、“”{“articleA”:{“0”: 23日,“1”:1},“articleB”: {“0”: 2}} " " ")

流媒體記錄的輸入模式:

根|——關鍵:二進製(nullable = true) |——價值:二進製(nullable = true) |——文章:字符串(nullable = true) |——分區:整數(nullable = true) |——抵消:長(可空= true) |——時間戳:時間戳(nullable = true) |——timestampType:整數(nullable = true)

同樣,你可以實現邏輯保存並更新抵消數據庫和讀它在下次重新啟動。

這篇文章有用嗎?