問題
你有一個流的工作使用foreachBatch ()處理DataFrames。
% scala streamingDF.writeStream.outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:長)= > batchDF.write.format .mode(“鋪”)(“覆蓋”).save (output_directory)} .start ()
檢查點文件被創建,但並沒有被刪除。
您可以驗證問題,方法是導航到根目錄並查看/ local_disk0 / tmp /文件夾中。檢查點文件保留在文件夾中。
導致
命令foreachBatch ()用於支持DataFrame操作通常不支持流媒體DataFrames。通過使用foreachBatch ()可以將這些操作應用到每個micro-batch。這需要一個檢查站目錄跟蹤流的更新。
如果你沒有指定一個自定義的檢查點位置,默認的檢查點在創建目錄/ local_disk0 / tmp /。
磚使用檢查點目錄來確保正確的和一致的進展信息。關閉流時,故意或偶然,檢查點目錄允許磚重啟和接究竟在什麼地方。
如果流被取消關閉流從筆記本,磚的工作試圖清理力所能及的檢查點目錄。如果流以其他方式終止,或者終止工作,檢查點目錄沒有清理幹淨。
這是設計。
解決方案
你應該手動指定檢查點的目錄checkpointLocation選擇。
% scala streamingDF.writeStream.option (“checkpointLocation”、“< checkpoint-path >”) .outputMode(“追加”)。foreachBatch {(batchDF: DataFrame batchId:長)= > batchDF.write.format .mode(“鋪”)(“覆蓋”).save (output_directory)} .start ()