當我們進入2022年,我們想花點時間來回顧Databricks和Apache Spark™在流媒體方麵取得的巨大進步!在2021年,工程團隊和開源貢獻者在三個目標下取得了一些進展:
- 降低延遲並改進有狀態流處理
- 改進Databricks和Spark結構化流工作負載的可觀察性
- 改進資源分配和可伸縮性
最終,這些目標背後的動機是使更多的團隊能夠在Databricks和Spark上運行流工作負載,使客戶更容易在Databricks上操作關鍵任務的生產流應用程序,同時優化成本效益和資源使用。beplay体育app下载地址
目標# 1:降低延遲和改進有狀態處理
有兩個新的關鍵特性專門針對降低有狀態操作的延遲,以及對有狀態api的改進。第一種方法是針對大型有狀態操作的異步檢查點,它改進了傳統的同步和高延遲設計。
異步的檢查點
在此模型中,在下一個微批處理開始之前,將狀態更新寫入雲存儲檢查點位置。這樣做的好處是,如果有狀態流查詢失敗,我們可以使用來自最後一個成功完成的批處理的信息輕鬆地重新啟動查詢。在異步模型中,下一個微批處理不必等待狀態更新被寫入,從而提高了整個微批處理執行的端到端延遲。
您可以在即將發布的深入博客文章中了解有關此特性的更多信息,並在Databricks Runtime 10.3及更高版本中嚐試使用它。
任意有狀態運算符改進
在很大程度上之前的帖子,我們通過[flat]MapGroupsWithState引入了結構化流中的任意狀態處理。這些操作符提供了很大的靈活性,並支持聚合以外的更高級的有狀態操作。我們對這些操作符進行了改進:
- 允許初始狀態,避免需要重新處理所有流數據。
- 通過公開一個新的TestGroupState接口,允許用戶創建GroupState實例並訪問已設置的內部值,從而簡化狀態轉換函數的單元測試,從而實現更簡單的邏輯測試。
允許初始狀態
讓我們從以下flatMapGroupswithState操作符開始:
def flatMapGroupsWithState[S: Encoder, U: Encoder](outputMode: outputMode, timeoutConf: GroupStateTimeout, initialState: KeyValueGroupedDataset[K, S])(func: (K, Iterator[V], GroupState[S]) => Iterator[U])
這個自定義狀態函數維護已遇到水果的運行計數。
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {val count = state. getoption .map(_.count). getorelse (0L) + valList. map(0L))。大小的狀態。update(new RunningCount(count))迭代器((key, count. tostring))}
在這個例子中,我們通過為某些水果設置起始值來指定this操作符的初始狀態:
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(("apple", new RunningCount(1)), ("orange", new RunningCount(2)), ("mango", new RunningCount(5)),). tods () val fruitCountInitial = initialState。groupByKey(x => x._1). mapvalues (_._2) frustream .groupByKey(x => x) . flatmapgroupswithstate (Update, GroupStateTimeout. _1). groupByKey(x => x._1). mapvalues (_._2)NoTimeout fruitCountInitial) (fruitCountFunc)
更簡單的邏輯測試
您現在還可以使用TestGroupState API測試狀態更新。
進口org.apache.spark.sql.streaming。_ import org.apache.spark.api.java.可選test("flatMapGroupsWithState的狀態更新功能"){var prevState = TestGroupState。create[UserStatus](optionalState =可選。empty[UserStatus], timeoutConf = GroupStateTimeout。EventTimeTimeout, batchProcessingTimeMs = 1L, eventTimeWatermarkMs = Optional.of(1L), hasTimedOut = false) val userId: String =…val actions:迭代器[UserAction] =…斷言(!prevState. hasupdated) updateState(userId, actions, prevState)斷言(prevState. hasupdated)}
你可以找到這些,以及更多的例子磚的文檔.
會話窗口的本機支持
結構化流介紹的能力基於事件時間的Windows上的聚合使用翻轉或滑動窗口,兩者都是固定長度的窗口。在Spark 3.2中,我們引入了的概念會話窗口,允許動態窗口長度。這在曆史上需要使用flatMapGroupsWithState自定義狀態操作符。
一個使用動態間隙的例子:
#根據eventType session_window expr = session_window(events. window)定義具有動態間隔時間的會話窗口。時間戳,\ when(事件。eventType == "type1", "5 seconds") \ .when(事件。eventType == "type2", "20秒")\ . else("5分鍾"))#按會話窗口和userId分組數據,並計算每個組的計數windowedCountsDF = events \ . withwatermark ("timestamp", "10分鍾")\ . groupby (events. eventType == "type2", "20秒")\ . groupby (events. withwatermark ("timestamp", "10分鍾")userID, session_window_expr) \ .count()
目標#2:提高流工作負載的可觀察性
而StreamingQueryListenerAPI允許您異步監視SparkSession中的查詢,並為查詢狀態、進度和終止事件定義自定義回調函數,了解反壓力並推理微批處理中的瓶頸位置仍然具有挑戰性。從Databricks Runtime 8.1開始,StreamingQueryProgress對象報告數據源特定的背壓指標卡夫卡,運動,三角洲湖而且自動加載程序流源。
Kafka提供的指標示例:
{"sources": [{"description": "KafkaV2[Subscribe[topic]]]", "metrics": {"avgOffsetsBehindLatest": "4.0", "maxOffsetsBehindLatest": "4", "minOffsetsBehindLatest": "4", "estimatedTotalBytesBehindLatest": "80.0"},}]}
Databricks Runtime 8.3引入了實時度量來幫助理解數據庫的性能RocksDB狀態存儲並調試狀態操作的性能。這些也有幫助確定目標工作負載對於異步檢查點。
一個新的狀態存儲度量的例子:
{" id ": " 6774075 e - 8869 - 454 b - ad51 - 513 be86cfd43”、“runId”:“3 d08104d-d1d4-4d1a-b21e-0b2e1fb871c5”、“batchId”:7,“stateOperators”:[{“numRowsTotal”:20000000,“numRowsUpdated”:20000000,“memoryUsedBytes”:31005397,“numRowsDroppedByWatermark”:0,“customMetrics”:{“rocksdbBytesCopied”:141037747,“rocksdbCommitCheckpointLatency”:2,“rocksdbCommitCompactLatency”:22061年,“rocksdbCommitFileSyncLatencyMs”:1710年,“rocksdbCommitFlushLatency”:19032年,“rocksdbCommitPauseLatency”:0, "rocksdbCommitWriteBatchLatency": 56155, " rocksdbfilescopies ": 2, " rocksdbfilesreuse ": 0, "rocksdbGetCount": 40000000, "rocksdbGetLatency": 21834, "rocksdbPutCount": 1, "rocksdbPutLatency": 56155599000, "rocksdbReadBlockCacheHitCount": 1988, "rocksdbReadBlockCacheMissCount": 40341617, "rocksdbSstFileSize": 141037747, "rocksdbTotalBytesReadByCompaction": 336853375, "rocksdbTotalBytesReadByGet": 680000000, "rocksdbTotalBytesReadByGet": 680000000, "rocksdbTotalBytesReadThroughIterator":0, "rocksdbTotalBytesWrittenByCompaction": 141037747, "rocksdbTotalBytesWrittenByPut": 740000012, "rocksdbTotalCompactionLatencyMs": 21949695000, "rocksdbWriterStallLatencyMs": 0, "rocksdbZipFileBytesUncompressed": 7038}}], "sources": [{}], "sink": {}}
目標# 3:改進資源分配和可伸縮性
使用Delta Live表(DLT)流式自動縮放
在去年的數據+人工智能峰會上,我們宣布Delta活動表,這是一個允許您以聲明方式構建和編排數據管道的框架,並在很大程度上抽象了配置集群和節點類型的需求。我們更進一步,為流媒體管道引入了一種智能自動伸縮解決方案,改進了現有的流媒體管道優化的自動縮放.這些好處包括:
- 更好的集群利用率:
- 主動優雅停機:
新算法利用新的背壓指標來調整集群大小,以更好地處理流工作負載波動的場景,這最終會導致更好的集群利用率。
現有的自動伸縮解決方案僅在節點空閑時才會讓它們退役,而新的DLT自動縮放程序將在利用率低時主動關閉選定的節點,同時保證不會因關閉而導致任務失敗。
在撰寫本文時,此功能目前正在進行私人預覽.請聯係您的客戶團隊了解更多信息。
觸發器。AvailableNow
在結構化流中,觸發器允許用戶定義流查詢數據處理的時間。這些觸發類型可以是微批處理(默認)、固定間隔微批處理(觸發器。ProcessingTime(“
Databricks運行時10.1介紹了一種新型觸發器;觸發器。AvailableNow,類似於Trigger。但是提供了更好的可伸縮性。像Trigger Once一樣,所有可用的數據都將在查詢停止之前被處理,但是是多個批次而不是一個批次。支持Delta Lake和Auto Loader流媒體源。
例子:
火花。readStream .format("delta") .option("maxFilesPerTrigger", "1") .load(inputDir) .writeStream .trigger(Trigger.AvailableNow) .option("checkpointLocation", checkpointDir) .start()
總結
隨著我們進入2022年,我們將繼續加速結構化流媒體的創新,進一步提高性能,減少延遲,並實現新的令人興奮的功能。請繼續關注今年更多的信息!