卡夫卡與OffsetOutOfRangeException客戶終止

卡夫卡客戶終止與“OffsetOutOfRangeException”當試圖獲取信息

寫的vikas.yadav

去年發表在:2022年6月1日

問題

你有一個Apache火花應用程序試圖獲取信息從一個Apache卡夫卡源時終止了kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException錯誤消息。

導致

火花應用程序從卡夫卡試圖獲取過期數據偏移量。

我們通常看到在這兩個場景:

場景1

火花應用程序處理數據時終止。當火花應用程序重新啟動時,它試圖獲取數據基於先前計算數據偏移量。如果有數據的偏移量已經過期在火花應用程序終止時,這個問題會發生。

場景2

你保留政策將比時間更短的時間需要處理批處理。批處理完成的時候,一些卡夫卡分區補償已經過期。下一批的補償計算,如果有不匹配的檢查點元數據由於過期的偏移量,這個問題會發生。

解決方案

場景1 -選項1

刪除現有的檢查點之前重啟引發應用程序。創建一個新的檢查點抵消新獲取的細節抵消。

這種方法的缺點是,一些數據可能會錯過,因為卡夫卡的補償已經過期。

場景1 -選項2

增加了卡夫卡的保留政策的話題,較長時間的火花應用程序離線。

沒有數據是錯過了這個解決方案,因為沒有補償到期之前引發重新啟動應用程序。

有兩種類型的保留政策:

  • 基於時間的保留——這種類型的政策定義了日誌的時間保持段之前自動刪除。默認的基於時間的數據保留窗口所有話題都是七天。你可以檢查的卡夫卡文檔log.retention.hours,log.retention.minutes,log.retention.ms為更多的信息。
  • 基於尺寸保留——這種類型的策略定義的數據量為每個topic-partition保留日誌中。這個限製是每個分區。默認情況下該值是無限的。你可以檢查的卡夫卡文檔log.retention.bytes為更多的信息。
刪除

信息

如果多個保留策略設置更為嚴格的控製。這可以覆蓋在每個主題的基礎上。

卡夫卡的回顧主題級配置更多信息有關如何設置每個主題覆蓋。

場景2 -選項1

增加的保留政策分區。這是同樣的解決方案來完成場景1 -選項2

場景2 -選項2

增加並行工人通過配置的數量.option (“minPartitions”, < X >)readStream

的選項minPartitions定義了最小數量的分區從卡夫卡讀取。默認情況下,火花使用卡夫卡的一對一的映射主題分區引發分區時從卡夫卡消費數據。如果您設置選項minPartitions值大於你的卡夫卡主題分區的數量,火花把卡夫卡主題分區成更小的碎片。

這個選項是推薦的數據傾斜時,高峰負荷,如果你流是落後的。設置這個值大於默認初始化結果的卡夫卡消費者在每一個觸發器。這會影響性能時如果使用SSL連接到卡夫卡。