問題
您正在運行一係列結構化流作業並寫入文件接收器。每運行10次,運行速度就會比前一次作業慢。
導致
文件接收器創建一個_spark_metadata目標路徑下的文件夾。這個元數據文件夾存儲關於每個批處理的信息,包括哪些文件是批處理的一部分。這是為文件接收流提供精確一次保證所必需的。默認情況下,在每10個批處理時,前9個批處理數據文件被壓縮為一個文件/ <目標文件夾> /數據/ _spark_metadata / 9.緊湊.
解決方案
有三種可能的解決方案。選擇一個最適合你的情況。
- 選項1:在生產環境中通過最小的代碼更改緩解問題,但保留較少的元數據。
- 選項2:如果可以切換到使用Delta表,則推薦使用。這是一個很好的長期解決方案。
- 選項3:如果管道不需要恰好一次語義,或者下遊可以處理重複,則推薦使用。
選項1:縮短元數據保留時間
默認情況下,元數據文件夾會隨著時間的推移而增大。為了緩解這個問題,您可以為輸出文件設置最大保留時間。超過保存期的文件將被自動排除,從而限製元數據文件夾中的文件數量。元數據文件夾中的文件更少,意味著壓縮所需的時間更少。
當你寫入流數據幀到你的文件接收器時設置保留期:
%python check_point = '' target_path = ' ' retention = ' ' #您可以以小時或天的字符串格式提供時間的值。例如,“12h”,“7d”等。默認情況下禁用df.writeStream.format('json').mode('append')。選項(“checkPointLocation”,check_point)。選項(“路徑”,目標路徑)。選項(“保留”,保留).start ()
選項2:使用Delta表作為接收器
Delta表不使用aspark_metadata文件夾,它們隻提供一次語義。
有關更多信息,請參閱關於將Delta表用作接收器的文檔(AWS|Azure|GCP).
選項3:使用foreachBatch
foreachBatch不創建spark_metadata文件夾時寫入水槽。