使用Kinesis連接器的流式作業失敗

由於HTTP客戶端沒有被終止,寫入Kinesis接收器的流作業會因內存不足錯誤而失敗。

寫的阿施施

最後發布日期:2022年5月19日

問題

你有一個流作業寫入Kinesis接收器,它失敗了,出現內存不足的錯誤消息。

java.lang.OutOfMemoryError: GC開銷限製超過
Java .lang. outofmemoryerror: Java堆空間。

症狀包括:

  • Ganglia顯示JVM內存使用量逐漸增加。
  • 微批量分析顯示輸入和處理速率是一致的,這意味著來源或處理過程沒有問題。
  • 堆轉儲顯示Java哈希映射在JVM堆上占據很大的空間,並隨著時間的推移而增加。

導致

出現此錯誤最常見的原因是關閉Kinesis客戶端,但沒有關閉HTTP客戶端。

例如,通常會為每個分區創建一個新的Kinesis客戶端。

%scala類KinesisSink extends ForeachWriter[SinkInput] {private var kinesisClient: kinesisClient = _ override def open(partitionId: Long, version: Long): Boolean = {val httpClient = ApacheHttpClient .builder() .build() kinesisClient = kinesisClient .builder() .region(region .of(region)) .httpClient(httpClient) .build() true}覆蓋def process(值:KinesisSinkInput): Unit ={//主進程在這裏。} override def close(errorOrNull: Throwable): Unit = {kinesisClient.close()}}

這個示例代碼正在調用kinesisClient.close ()但它並沒有在呼喚httpClient.close ()

這意味著正在創建HTTP客戶端,並使用資源打開TCP連接,但沒有被終止。

解決方案

確保在不再需要HTTP客戶端時關閉它們。

%scala override def close(errorOrNull: Throwable): Unit = {client.close() httpClient.close()}


這篇文章有用嗎?