Kafka客戶端終止與offsetoutofraneexception

Kafka客戶端在嚐試獲取消息時被' offsetoutofraneexception '終止

寫的vikas.yadav

最後發布時間:2022年6月1日

問題

您有一個Apache Spark應用程序,它試圖從Apache Kafka源獲取消息,當它以kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException錯誤消息。

導致

您的Spark應用程序試圖從Kafka獲取過期的數據偏移量。

我們通常在以下兩種情況中看到:

場景1

Spark應用程序正在處理數據時被終止。當Spark應用程序重新啟動時,它會根據之前計算的數據偏移量嚐試獲取數據。如果任何數據偏移在Spark應用程序終止期間過期,就會發生此問題。

場景2

您的保留策略被設置為比處理批處理所需時間更短的時間。當批處理完成時,一些Kafka分區偏移量已經過期。為下一批計算偏移量,如果由於偏移量過期而導致檢查點元數據不匹配,則會出現此問題。

解決方案

場景1—選項1

重新啟動Spark應用程序前,請先刪除已有的檢查點。使用新獲取的偏移量的詳細信息創建一個新的檢查點偏移量。

這種方法的缺點是可能會丟失一些數據,因為Kafka中的偏移量已經過期。

場景1 -選項2

增加主題的Kafka保留策略,使其長於Spark應用程序脫機的時間。

這種解決方案不會丟失任何數據,因為在Spark應用程序重新啟動之前,沒有任何偏移量過期。

留存策略有兩種類型:

  • 基於時間的留存—該策略定義了日誌段在被自動刪除前的保留時間。所有主題的默認基於時間的數據保留窗口為7天。你可以查看Kafka文檔log.retention.hourslog.retention.minutes,log.retention.ms更多信息。
  • 基於規模的用戶留存—這種類型的策略定義了在日誌中為每個主題分區保留的數據量。這個限製是每個分區。默認情況下,該值是無限的。你可以查看Kafka文檔log.retention.bytes更多信息。
刪除

信息

如果設置了多個保留策略,限製越嚴格的策略控製越好。這可以在每個主題的基礎上被覆蓋。

卡夫卡的回顧主題級配置有關如何設置每個主題覆蓋的詳細信息。

場景2—選項1

增加分區的保留策略。這與for的解的方法相同場景1 -選項2

場景2—選項2

通過配置增加並行工作者的數量.option(“minPartitions”,< X >)readStream

的選項minPartitions定義從Kafka讀取的最小分區數。默認情況下,Spark在使用Kafka數據時使用Kafka主題分區到Spark分區的一對一映射。如果你設置了這個選項minPartitions到大於Kafka主題分區數量的值時,Spark將Kafka主題分區分離為更小的塊。

在數據傾斜、峰值負載和流落後的情況下,建議使用此選項。將這個值設置為大於默認值會導致在每個觸發器上初始化Kafka消費者。如果在連接到Kafka時使用SSL,這可能會影響性能。