優化與.trigger流交易

使用.trigger定義存儲更新間隔。更高的價值降低了存儲事務的數量。

寫的chetan.kardekar

去年發表在:2022年10月26日

結構化的流媒體應用程序運行時使用雲存儲桶(ADLS Gen2 S3,等等)很容易造成過度交易你訪問的存儲桶。

沒有指定一個.trigger選項在流代碼中一個常見的原因是大量的存儲事務。當一個.trigger沒有指定選項,存儲經常可以調查。發生這種情況後立即在默認情況下每個micro-batch完成。

默認行為是在官方的描述Apache文檔觸發火花,“如果沒有顯式地指定觸發器設置,那麼在默認情況下,查詢將在micro-batch模式下,執行,盡快將生成micro-batches前麵micro-batch已完成處理”。

沒有這個樣例代碼.trigger選項定義。如果運行,它將導致過度存儲事務。

% python spark.readStream.format(“δ”).load (“< delta_table_path >”) .writeStream .format .outputMode(“δ”)(“追加”).option (“checkpointLocation”、“< checkpoint_path >”) .options (* * writeConfig) .start ()


你可以減少存儲事務通過設置。觸發選項.writeStream。設置.trigger處理時間幾秒鍾防止短輪詢。

指令

默認行為是檢查每10毫秒更新的源代碼。對於大多數用戶來說,一個更長的時間間隔源更新將對性能沒有明顯的影響,但交易成本大大降低。

例如,我們用一個5秒的處理時間。這比10 ms慢500倍。存儲調用相應減少。

設置一個5秒的處理時間需要添加.trigger (processingTime = 5秒).writeStream

例如,修改現有包括示例代碼.trigger5秒的處理時間隻需要添加一行。

% python spark.readStream.format(“δ”).load (“< delta_table_path >”) .writeStream .format(“δ”).trigger (processingTime = 5秒)#添加行代碼定義.trigger處理時間。.outputMode .option(“追加”)(“checkpointLocation”、“< checkpoint_path >”) .options (* * writeConfig) .start ()


你應該嚐試.trigger處理時間來確定一個值,為您的應用程序進行了優化。

這篇文章有用嗎?