跳轉到主要內容
工程的博客

加速流查詢與異步狀態檢查點

2022年5月2日 工程的博客

分享這篇文章

背景/動機

有狀態的流媒體變得越來越盛行了利益相關者使日益複雜的需求更大量的數據。然而,權衡是有狀態操作的計算複雜度增加數據延遲,使它更具有可操作性。檢查點異步狀態,將持久化狀態的過程與常規micro-batch檢查點,提供了一種方法來減少處理延遲,同時保持結構化流的兩個特點:高吞吐量和可靠性。

在進入細節之前,最好提供一些背景和動機,為什麼我們開發這個流處理功能。行業共識關於流媒體性能的主要指標是絕對需要延遲一個管道處理單個記錄。然而,我們想提出一個更微妙的觀點評估整體性能:而不是考慮的端到端延時單個記錄,重要的是要看吞吐量和延遲的組合在一段時間內,以可靠的方式。這並不是說,某些操作用例不需要最低限度絕對延遲——這些都是有效的和重要的。然而,它是更好的用例分析和ETL處理200000條記錄/秒或記錄20米/分鍾?它總是依賴於用例,但我們相信,體積和成本效益是流管道的速度一樣重要。有基本效率之間的權衡和支持在流引擎的實現非常低的延遲,所以我們鼓勵我們的客戶通過行使確定增量成本價值邊際降低數據延遲。beplay体育app下载地址

結構化流micro-batch執行模型之間尋求平衡的高吞吐量、可靠性和數據延遲。

高吞吐量

考慮流從概念上講,所有傳入的數據被認為是無限的,無論體積和速度。將這一概念應用於結構化流,我們可以認為每個查詢生成一個無界的dataframe。下罩,Apache火花™分割的數據作為一個無限dataframe成更小的micro-batches也dataframes。這是很重要的,有兩個原因:

  • 它允許發動機應用相同的優化可用於批處理/特別查詢每個dataframes,最大限度地提高效率和吞吐量
  • 它給用戶相同的簡單的界麵和容錯批/特別查詢

可靠性

在可靠性方麵,結構化流寫出每個micro-batch檢查站後,追蹤進展的加工從數據源、中間狀態(聚合和連接),水槽和寫入數據。在發生故障或重新啟動,發動機使用這些信息來確保查詢隻處理數據完全一次。結構化流存儲這些檢查點在某種類型的持久存儲(例如,雲blob存儲),以確保正確查詢失敗後恢複。為有狀態查詢,檢查點包括寫出所涉及的所有鍵的狀態有狀態操作,確保查詢重啟用適當的值。

數據延遲

隨著數據量的增加,鑰匙的數量和大小的增加,維護的狀態進行狀態管理更加重要和費時。為了進一步減少數據延遲狀態查詢,我們已經開發了異步檢查點專門為各種鍵的狀態參與有狀態操作。通過將這種從正常的檢查點過程變成一個後台線程,我們允許查詢繼續下一個micro-batch更快,使數據提供給最終用戶,同時仍然保持可靠性。

它是如何工作的

一般來說,結構化流利用同步檢查點狀態,這意味著發動機寫出所有參與狀態鍵操作的當前狀態正常的一部分為每個micro-batch檢查點之前下一個。這種方法的好處是,如果流查詢失敗,應用程序可以迅速恢複的進展流和隻需要處理文檔從micro-batch失敗。快速恢複的代價是增加了正常micro-batch執行時間。

結構化流利用同步檢查點,寫出所有參與狀態鍵操作的當前狀態之前,下一個

異步狀態檢查點分離的檢查點狀態從正常micro-batch執行。啟用該功能後,結構化流不需要等待檢查點完成當前micro-batch之前下一個——它後立即開始。執行人發送異步提交的狀態回到了司機,他們都完成後,司機是micro-batch完全投入。截至目前,特性允許長達一個micro-batch等待檢查點完成。較低的數據延遲的權衡,失敗,查詢可能需要處理文檔兩個micro-batches給相同的容錯擔保:當前micro-batch進行計算和前micro-batch檢查點是在過程的狀態。

異步狀態檢查點分離的檢查點狀態從正常micro-batch執行. .

一個隱喻解釋這是塑造麵團放在一個麵包店。麵包師通常用雙手塑造一個麵團,慢,但是如果他們犯了錯,他們隻需要開始在一塊。有些麵包師決定形狀兩塊麵團,從而增加他們的吞吐量,但潛在的錯誤可能需要重建兩部分。在這個例子中,同步處理是使用兩隻手塑造一個麵團,異步處理是用兩隻手形狀單獨的塊。

查詢瓶頸狀態更新、異步檢查點狀態提供了一種低成本的方法來降低數據延遲不犧牲任何的可靠性。

識別候選查詢

我們想重申,異步狀態檢查點隻有助於某些工作負載:有狀態流狀態的檢查點提交延遲是一個重要的因素總體micro-batch執行延遲。

用戶如何識別優秀的候選人:

  • 有狀態操作:查詢包括有狀態操作,比如windows,聚合(平)mapGroupsWithState或stream-stream連接。
  • 州檢查點提交延遲:用戶可以從內部檢查指標StreamingQueryListener事件理解提交延遲的影響對整體micro-batch執行時間。log4j日誌的司機也包含相同的信息。

參見下麵的例子如何分析StreamingQueryListener活動適合查詢:

流媒體查詢取得了進展:{“id”:“2 e3495a2-de2c-4a6a-9a8e-f6d4c4796f19”,“runId”:“e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe”,“batchId”:0,“durationMs”:{“addBatch”:519387年,“triggerExecution”:547730年,},“stateOperators”:[{“commitTimeMs”:3186626,“numShufflePartitions”:64年,})}

有很多豐富的信息在上麵的示例中,但是用戶應該關注某些指標:

  • 批處理時間(durationMs.triggerExecution)是547秒左右
  • 聚合狀態存儲所有任務(提交時stateOperators [0] .commitTimeMs)是3186秒左右
  • 任務相關的狀態存儲(stateOperators [0] .numShufflePartitions)是64,這意味著平均每個任務包含狀態操作符添加50秒的掛鍾時間(3186秒/ 64任務)每一批。假設所有64個任務同時運行,提交步驟占9%左右(50秒/ 547秒)的批處理時間。如果並發任務的最大數量小於64年,可能會增加的百分比。例如,如果有32個並發任務,那麼它會占總執行時間的18%

啟用異步狀態檢查點

提供一個集群與磚運行時10.4或更新,並使用以下的火花配置:

spark.conf。(“spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled”,“真正的”)spark.conf。(“spark.sql.streaming.stateStore.providerClass”,“com.databricks.sql.streaming.state.RocksDBStateStoreProvider”)

幾項注意事項:

  • 異步狀態檢查點隻支持RocksDB-based狀態存儲
  • 任何失敗相關存儲異步狀態檢查點會導致查詢失敗後一個預定義的重試次數。這種行為不同於同步檢查點(執行的任務),火花有能力之前多次重試失敗的任務沒有一個查詢

通過測試的組合內部和客戶工作負載文件和消息總線資源,我們發現平均micro-batch持續時間可以提高25%的溪流大型國有大小與數以百萬計的條目。據坊間傳言,我們看到更大的改善micro-batch峰值持續時間(最長時間流過程micro-batch)。

結論

異步狀態檢查點並不是一個功能我們孤立地發展——它的未來係列我們發布的新特性,簡化狀態流查詢的操作和維護。我們持續進行大的投資流能力和高度關注方便我們的客戶提供更多的數據,更迅速,他們的最終用戶。beplay体育app下载地址請繼續關注!

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map