檢查點文件不刪除當使用foreachBatch ()

學習如何防止foreachBatch()從使用大量的存儲檢查點文件。

寫的亞當Pavlacka

去年發表在:2022年5月19日

問題

你有一個流的工作使用foreachBatch ()處理DataFrames。

% scala streamingDF.writeStream.outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:長)= > batchDF.write.format .mode(“鋪”)(“覆蓋”).save (output_directory)} .start ()

檢查點文件被創建,但並沒有被刪除。

您可以驗證問題,方法是導航到根目錄並查看/ local_disk0 / tmp /文件夾中。檢查點文件保留在文件夾中。

導致

命令foreachBatch ()用於支持DataFrame操作通常不支持流媒體DataFrames。通過使用foreachBatch ()可以將這些操作應用到每個micro-batch。這需要一個檢查站目錄跟蹤流的更新。

如果你沒有指定一個自定義的檢查點位置,默認的檢查點在創建目錄/ local_disk0 / tmp /

磚使用檢查點目錄來確保正確的和一致的進展信息。關閉流時,故意或偶然,檢查點目錄允許磚重啟和接究竟在什麼地方。

如果流被取消關閉流從筆記本,磚的工作試圖清理力所能及的檢查點目錄。如果流以其他方式終止,或者終止工作,檢查點目錄沒有清理幹淨。

這是設計。

解決方案

你應該手動指定檢查點的目錄checkpointLocation選擇。

% scala streamingDF.writeStream.option (“checkpointLocation”、“< checkpoint-path >”) .outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:長)= > batchDF.write.format .mode(“鋪”)(“覆蓋”).save (output_directory)} .start ()


這篇文章有用嗎?