異步狀態檢查點結構化流

請注意

在磚運行時10.3及以上。

狀態流查詢瓶頸狀態更新,使異步狀態檢查點可以降低端到端延遲不犧牲任何容錯擔保,但與未成年人的成本更高的啟動延遲。

結構化流默認使用同步檢查點。每個micro-batch確保批處理的所有狀態更新備份在雲存儲(稱為“檢查點位置”)開始前下一批。如果有狀態流查詢失敗,所有micro-batches除了最後micro-batch檢查點。重新啟動,隻有最後一批需要重新運行。快速恢複與同步檢查點付出更高的成本為每個micro-batch延遲。

流檢查點狀態模式

異步狀態檢查點試圖執行異步執行檢查點,以便micro-batch不必等待檢查點完成。換句話說,接下來micro-batch就可以開始計算之前的micro-batch已經完成。然而,內部抵消元數據(也保存在檢查點位置)跟蹤狀態是否為micro-batch檢查點已經完成。對查詢重啟,多個micro-batch可能需要重新執行,最後micro-batch的計算是不完整的,以及一個micro-batch檢查點之前的狀態是不完整的。得到相同的容錯擔保(即隻有一次擔保與冪等沉)的同步檢查點。

確定結構化流負載受益於異步檢查點

以下是流媒體工作特點,可能受益於異步狀態檢查點。

  • 工作有一個或多個有狀態操作(如聚合,flatMapGroupsWithState,mapGroupsWithStatestream-stream連接)

  • 檢查點狀態延遲的主要貢獻者之一總體執行批處理延遲。這些信息可以在找到StreamingQueryProgress事件。這些事件被發現在log4j日誌引發司機。下麵是一個示例流查詢的進步和如何找到狀態檢查點影響整個批處理執行延遲。

    • {“id”:“2 e3495a2-de2c-4a6a-9a8e-f6d4c4796f19”,“runId”:“e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe”,“…”,“batchId”:0,“durationMs”:{“…”,“triggerExecution”:547730年,“…”},“stateOperators”:({“…”,“commitTimeMs”:3186626,“numShufflePartitions”:64年,“…”})}
    • 檢查點狀態延遲分析以上查詢事件進展

      • 批處理時間(durationMs.triggerDuration)是547秒左右。

      • 狀態存儲提交延遲(stateOperations [0] .commitTimeMs)是3186秒左右。提交延遲聚合在任務包含存儲狀態。在這種情況下,有64個這樣的任務(stateOperators [0] .numShufflePartitions)。

      • 每個任務包含國家運營商平均需要50秒(3186/64)檢查站。這是一個額外的延遲,導致了批處理時間。假設所有64個任務並行運行,檢查點一步貢獻了約9%(50秒/ 547秒)的批處理時間。最大並發任務時更高比例小於64。

啟用異步狀態檢查點

設置以下配置流媒體的工作。異步檢查點需要一個狀態存儲實現支持異步提交。目前隻有基於RocksDB狀態存儲實現支持它。

火花相依(“spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled”,“真正的”)火花相依(“spark.sql.streaming.stateStore.providerClass”,“com.databricks.sql.streaming.state.RocksDBStateStoreProvider”)

限製和要求異步檢查點

請注意

計算伸縮擴展限製了集群大小結構化流工作負載。磚建議使用三角洲表與增強的自動定量直播工作負載。看到增強的自動定量是多少?

  • 任何故障在異步檢查站在任何一個或多個存儲查詢失敗。在同步檢查點模式下,執行檢查點的一部分任務,引發多次重試任務之前失敗的查詢。這種機製不存在與異步狀態檢查點。然而,使用磚工作重試,這種失敗可以自動重試。

  • 異步檢查點時效果最好狀態存儲位置不改變micro-batch之間執行。集群調整,結合異步狀態檢查點,可能不適合,因為狀態存儲實例可能會re-distributed隨著節點的增加或刪除集群調整事件的一部分。

  • 異步狀態隻支持檢查點RocksDB狀態存儲提供程序實現。默認的內存狀態存儲實現不支持它。