最佳實踐:Delta Lake結構化流媒體與Amazon Kinesis
本文描述了在Delta Lake和Apache Spark結構化流媒體中使用Kinesis作為流媒體源的最佳實踐。
亞馬遜動態數據流(KDS)是一個大規模可擴展和持久的實時數據流服務。KDS每秒從成千上萬的來源(如網站點擊流、數據庫事件流、金融交易、社交媒體feed、IT日誌和位置跟蹤事件)中持續捕獲千兆字節的數據。
KDS是AWS上流媒體數據服務的熱門選擇,因為它易於使用和無服務器設置。Kinesis數據流由單個吞吐量單元(稱為碎片)組成,並根據碎片小時和PUT有效負載單元進行計費。每個碎片的攝取容量估計為1000條記錄/秒,或1MB/秒,輸出速率為2MB/秒。
在KDS中收集數據後,您可以使用Apache Spark Structured Streaming與Delta Lake的深度集成,用於日誌分析、點擊流分析和實時指標等應用程序。您可以連續處理數據並將其存儲到Delta表中。下麵的圖表描述了這些用例的典型架構:
Databricks動態結構化流媒體源
Databricks運行時包含一個開箱即用的程序運動的來源.這是一個用於KDS的專有連接器,在開放源碼中沒有。這個連接器不是基於Kinesis客戶端庫(KCL)。Kinesis源架構如圖所示:
關鍵的技術考慮和最佳實踐
本節包括在Delta Lake中使用Kinesis的最佳實踐和故障排除信息。
優化預取
Kinesis source通過後台線程運行Spark job,定期預取Kinesis數據並緩存到Spark executers的內存中。流查詢在每個預取步驟完成後處理緩存的數據,並使數據可用於處理。預取步驟會顯著影響觀察到的端到端延遲和吞吐量。您可以通過本節介紹的選項控製性能。
的默認設置shardsPerTask
配置參數為5。然而,在規模上,這可能需要非常多的CPU核,因此將其設置為10個可能是一個很好的起點。然後,根據流工作負載和數據量的複雜性,您可以根據集群的Ganglia指標(CPU、內存、網絡等)調整這個值。例如,cpu綁定的集群可能需要一個較小的值,但需要更多的核來補償。
為了優化最小的查詢延遲和最大的資源使用,使用以下計算:
總計數量的CPU核在的集群(在所有執行人)
>=總計數量的運動碎片
/shardsPerTask
.
在這個表中描述了用於確定從Kinesis每次預取讀取數據量的參數。
選項 |
價值 |
默認的 |
描述 |
---|---|---|---|
|
整數 |
10000 |
每條要取的記錄數 |
|
持續時間字符串(2m = 2分鍾) |
1 |
在更新碎片列表之前需要等待多長時間(這是係統知道流已經調整大小的方式)。 |
|
持續時間字符串 |
400毫秒 |
連續讀取嚐試之間的等待時間。這個設置有助於避免運動節流。200ms是最小值,因為Kinesis的服務限製是5次/秒。 |
|
小數 |
1.0 |
每個分片最大預取速率,單位MB/秒。這個速率限製和避免運動節流。Kinesis允許的最大速率是2.0 MB/秒。 |
|
持續時間字符串 |
十年代 |
在將預取的新數據用於處理之前,需要多長時間進行緩衝 |
|
字節字符串 |
20 gb |
為下一個觸發器緩衝多少數據。這是一個軟限製,因為它被用作停止條件,所以可能會緩衝更多的數據。 |
|
整數 |
5 |
每個任務並行預取多少個碎片。 |
避免過多的速率限製錯誤導致的減速
每當遇到速率限製錯誤時,連接器從Kinesis讀取的數據量就會減少一半,並將該事件記錄在日誌中:“打率極限。睡覺為5秒。”
當流被捕獲時,通常會看到這些錯誤,但在捕獲之後,您應該不會再看到這些錯誤了。如果您這樣做,您可能需要從Kinesis方麵進行調優(通過增加容量)或調整預取選項.
過多的數據導致寫入磁盤
如果你在你的動態流中有一個突然的峰值,分配的緩衝區容量可能會被填滿,緩衝區沒有被清空足夠快的新數據添加。
在這種情況下,Spark會將數據塊從緩衝區溢出到磁盤,從而降低處理速度,影響流的性能。這個事件出現在日誌中,消息如下:
。/ log4j.txt: 879546:20/03/0217:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on10.0.208.13:43458(電流大小:88.4 MB,原始大小:00 B)
要解決這個問題,可以嚐試增加集群內存容量(增加更多節點或增加每個節點的內存),或者調整配置參數fetchBufferSize
.
掛起S3寫任務
掛起任務會導致流批處理持續時間較長,這可能導致流無法跟上輸入。在這種情況下,Databricks建議啟用Spark推測。為了確保不會過於頻繁地終止任務,請仔細調優此設置的分位數和乘數。一個好的起點是設定spark.speculation.multiplier
來3.
而且spark.speculation.quantile
來0.95
.
使用RocksDB管理狀態時,由於S3寫入速度較慢而導致的延遲問題
在流查詢中維護有狀態操作時,一個常見的場景是大型垃圾收集暫停,這反過來會引入延遲並導致延長批處理執行時間。這通常發生在維護數百萬處於狀態的密鑰時。在這種情況下,與其在JVM內存中維護狀態,不如考慮使用RocksDB作為本機內存或磁盤上的狀態存儲。狀態更改會自動傳播到結構化流檢查點。然而,當RocksDB將這些檢查點寫入S3時,由於潛在的S3節流,您可能會觀察到延遲。盡量減少spark.sql.shuffle.partitions
(默認200)來最小化寫入的文件數量。您也可以嚐試調優多部分上傳閾值(spark.hadoop.fs.s3a.multipart.size
,默認1048576000字節)來減少S3並發寫的數量。
監視流媒體應用程序
要監視流應用程序,Databricks建議使用Spark的流查詢監聽器實現。
可觀察指標被命名為任意的聚合函數,可以在查詢(DataFrame)上定義。一旦DataFrame的執行到達一個完成點(即完成批處理查詢或到達流epoch),就會觸發一個命名事件,其中包含自最後一個完成點以來處理的數據的度量。
您可以通過將偵聽器附加到Spark會話來觀察這些指標。監聽器依賴於執行模式:
批處理模式:使用
QueryExecutionListener
.QueryExecutionListener
在查詢完成時調用。訪問指標QueryExecution.observedMetrics
地圖。流,或者micro-batch:使用
StreamingQueryListener
.StreamingQueryListener
當流查詢完成一個紀元時調用。訪問指標StreamingQueryProgress.observedMetrics
地圖。數據ricks不支持連續執行流。
例如:
//觀察流數據集中的行數(rc)和錯誤行數(erc)瓦爾observed_ds=ds.觀察(“my_event”,數(點燃(1))。作為(“鋼筋混凝土”),數($“錯誤”).作為(“倫理委員會”))observed_ds.writeStream.格式(“…”).開始()//使用監聽器監視指標火花.流.addListener(新StreamingQueryListener(){覆蓋defonQueryProgress(事件:QueryProgressEvent):單位={事件.進步.observedMetrics.得到(“my_event”).foreach{行= >//當錯誤數超過5%時觸發瓦爾num_rows=行.木屐[長) (“鋼筋混凝土”)瓦爾num_error_rows=行.木屐[長) (“倫理委員會”)瓦爾比=num_error_rows.toDouble/num_rows如果(比>0.05){/ /觸發警報}}}})
您還可以通過UI監視指標。如果您正在使用Databricks Runtime 7.0或以上版本,使用流選項卡在火花UI.