您正在使用Delta表作為結構化流應用程序的接收器,並且您希望優化Delta表以使查詢更快。
如果結構化流應用程序的觸發間隔非常頻繁,那麼它可能無法在每個微批處理中創建足夠的文件來進行壓縮。
的autoOptimize操作壓縮為128 MB的文件。顯式優化操作將Delta Lake文件壓縮為1 GB的文件。
如果每個微批處理中沒有足夠數量的合格文件,則應該定期優化Delta表文件。
使用foreachBatch使用mod值
在結構化流應用程序中定期優化增量表接收器的最簡單方法之一是使用foreachBatch微批上的mod值batchId.
假設您有一個從Delta表創建的流DataFrame。你使用foreachBatch當將流數據幀寫入Delta接收器時。
在foreachBatch的mod值batchId使用,因此優化每10個微批次後運行一次操作zorder每101個微批次後運行一次操作。
%scala val df = spark. readstream .format("delta").table("") df. writestream .format("delta"). foreachbatch {(batchDF: DataFrame, batchId: Long) => batchDF.persist() if(batchId % 10 == 0){火花。sql("optimize ")} if(batchId % 101 == 0){spark. sql("optimize ")}sql("optimize zorder by (< zander -column-name>)")} batchDF.write.format("delta").mode("append").saveAsTable(" ")}.outputMode("update") .start()
您可以根據您的結構化流應用程序修改mod值。