優化結構化流應用程序中的增量接收器

在foreachBatch運行時,通過在batchId上使用mod值來優化Delta接收器。

寫的mathan.pillai

最後發布日期:2022年5月10日

您正在使用Delta表作為結構化流應用程序的接收器,並且您希望優化Delta表以使查詢更快。

如果結構化流應用程序的觸發間隔非常頻繁,那麼它可能無法在每個微批處理中創建足夠的文件來進行壓縮。

autoOptimize操作壓縮為128 MB的文件。顯式優化操作將Delta Lake文件壓縮為1 GB的文件。

如果每個微批處理中沒有足夠數量的合格文件,則應該定期優化Delta表文件。

使用foreachBatch使用mod值

在結構化流應用程序中定期優化增量表接收器的最簡單方法之一是使用foreachBatch微批上的mod值batchId

假設您有一個從Delta表創建的流DataFrame。你使用foreachBatch當將流數據幀寫入Delta接收器時。

foreachBatch的mod值batchId使用,因此優化每10個微批次後運行一次操作zorder每101個微批次後運行一次操作。

%scala val df = spark. readstream .format("delta").table("") df. writestream .format("delta"). foreachbatch {(batchDF: DataFrame, batchId: Long) => batchDF.persist() if(batchId % 10 == 0){火花。sql("optimize ")} if(batchId % 101 == 0){spark. sql("optimize ")}sql("optimize  zorder by (< zander -column-name>)")} batchDF.write.format("delta").mode("append").saveAsTable("")}.outputMode("update") .start()

您可以根據您的結構化流應用程序修改mod值。