問題
您有一個Apache Spark應用程序,它試圖從Apache Kafka源獲取消息,當它以kafkashaded.org.apache.kafka.clients.consumer.OffsetOutOfRangeException錯誤消息。
導致
您的Spark應用程序試圖從Kafka獲取過期的數據偏移量。
我們通常在以下兩種情況中看到:
場景1
Spark應用程序正在處理數據時被終止。當Spark應用程序重新啟動時,它會根據之前計算的數據偏移量嚐試獲取數據。如果任何數據偏移在Spark應用程序終止期間過期,就會發生此問題。
場景2
您的保留策略被設置為比處理批處理所需時間更短的時間。當批處理完成時,一些Kafka分區偏移量已經過期。為下一批計算偏移量,如果由於偏移量過期而導致檢查點元數據不匹配,則會出現此問題。
解決方案
場景1—選項1
重新啟動Spark應用程序前,請先刪除已有的檢查點。使用新獲取的偏移量的詳細信息創建一個新的檢查點偏移量。
這種方法的缺點是可能會丟失一些數據,因為Kafka中的偏移量已經過期。
場景1 -選項2
增加主題的Kafka保留策略,使其長於Spark應用程序脫機的時間。
這種解決方案不會丟失任何數據,因為在Spark應用程序重新啟動之前,沒有任何偏移量過期。
留存策略有兩種類型:
- 基於時間的留存—該策略定義了日誌段在被自動刪除前的保留時間。所有主題的默認基於時間的數據保留窗口為7天。你可以查看Kafka文檔log.retention.hours,log.retention.minutes,log.retention.ms更多信息。
- 基於規模的用戶留存—這種類型的策略定義了在日誌中為每個主題分區保留的數據量。這個限製是每個分區。默認情況下,該值是無限的。你可以查看Kafka文檔log.retention.bytes更多信息。
卡夫卡的回顧主題級配置有關如何設置每個主題覆蓋的詳細信息。
場景2—選項1
增加分區的保留策略。這與for的解的方法相同場景1 -選項2.
場景2—選項2
通過配置增加並行工作者的數量.option(“minPartitions”,< X >)為readStream.
的選項minPartitions定義從Kafka讀取的最小分區數。默認情況下,Spark在使用Kafka數據時使用Kafka主題分區到Spark分區的一對一映射。如果你設置了這個選項minPartitions到大於Kafka主題分區數量的值時,Spark將Kafka主題分區分離為更小的塊。
在數據傾斜、峰值負載和流落後的情況下,建議使用此選項。將這個值設置為大於默認值會導致在每個觸發器上初始化Kafka消費者。如果在連接到Kafka時使用SSL,這可能會影響性能。