異步跟蹤進展是什麼?
預覽
這個特性是在公共預覽。
異步進展跟蹤允許結構化流管道並行檢查點進程異步和實際數據處理在micro-batch,減少延遲與維護offsetLog
和commitLog
。
請注意
異步進展跟蹤不工作Trigger.once
或Trigger.availableNow
觸發器。試圖啟用這個特性與這些觸發結果查詢失敗。
異步跟蹤工作進展如何減少延遲?
結構化流依賴持久化和管理補償作為查詢處理進度指標。抵消管理操作直接影響處理延遲,因為沒有數據處理可能發生,直到完成這些操作。異步跟蹤進展使得結構化流管道檢查點過程不受這些的影響抵消管理操作。
當你應該配置檢查點頻率?
用戶可以配置進程檢查點的頻率。的默認設置檢查點頻率為大多數查詢提供良好的吞吐量。配置頻率有助於場景抵消管理操作發生在更高的速度比他們可以處理,這創造了一個不斷增加的抵消管理操作。阻止這一日益嚴重的積壓,數據處理是阻止或減緩,基本上恢複處理行為消除異步跟蹤進展所帶來的好處。
請注意
故障恢複時間檢查點間隔時間增加而增加。在失敗的情況下,管道必須再加工前成功的檢查點之前的所有數據。用戶可以認為這之間的權衡更低的延遲在正常處理和恢複時間的失敗。
與異步相關進展跟蹤是什麼配置的?
選項 |
價值 |
默認的 |
描述 |
---|---|---|---|
asyncProgressTrackingEnabled |
真/假 |
假 |
啟用或禁用異步跟蹤進展 |
asyncProgressTrackingCheckpointIntervalMs |
毫秒 |
1000年 |
我們承諾的時間間隔偏移和完成提交 |
用戶可以啟用異步跟蹤進展如何?
用戶可以使用代碼啟用該特性類似於下麵的代碼:
瓦爾流=火花。readStream。格式(“卡夫卡”)。選項(“kafka.bootstrap.servers”,“host1:端口1,host2:端口2”)。選項(“訂閱”,“在”)。負載()瓦爾查詢=流。writeStream。格式(“卡夫卡”)。選項(“主題”,“出去”)。選項(“checkpointLocation”,“/ tmp /檢查站”)。選項(“asyncProgressTrackingEnabled”,“真正的”)。開始()
關掉異步跟蹤進展
啟用異步跟蹤進展時,框架不每批檢查點進程。為了解決這個問題,在你禁用異步進展跟蹤、過程至少兩個micro-batches使用以下設置:
.option (“asyncProgressTrackingEnabled”,“真正的”)
.option (“asyncProgressTrackingCheckpointIntervalMs”,0)
停止查詢後至少兩個micro-batches完成處理。現在您可以安全地禁用異步查詢進度跟蹤和重新啟動。
如果你有禁用異步跟蹤進展沒有完成這一步,您可能會遇到以下錯誤:
java。朗。IllegalStateException:批處理x並不存在
在司機日誌,您可能會看到下麵的錯誤:
的抵消日誌為批處理x並不存在,需要重新啟動查詢從最新一批x抵消日誌。請確保有兩個後續抵消日誌可用的最新一批通過手動刪除抵消文件(s)。也請確保提交日誌的最新一批等於或比最新一批一批的抵消日誌。
指令後,在本節中禁用異步進展跟蹤允許您處理這些錯誤和修複您的流媒體工作負載。