最佳實踐:Delta Lake結構化流媒體與Amazon Kinesis

本文描述了在Delta Lake和Apache Spark結構化流媒體中使用Kinesis作為流媒體源的最佳實踐。

亞馬遜動態數據流(KDS)是一個大規模可擴展和持久的實時數據流服務。KDS每秒從成千上萬的來源(如網站點擊流、數據庫事件流、金融交易、社交媒體feed、IT日誌和位置跟蹤事件)中持續捕獲千兆字節的數據。

KDS是AWS上流媒體數據服務的熱門選擇,因為它易於使用和無服務器設置。Kinesis數據流由單個吞吐量單元(稱為碎片)組成,並根據碎片小時和PUT有效負載單元進行計費。每個碎片的攝取容量估計為1000條記錄/秒,或1MB/秒,輸出速率為2MB/秒。

在KDS中收集數據後,您可以使用Apache Spark Structured Streaming與Delta Lake的深度集成,用於日誌分析、點擊流分析和實時指標等應用程序。您可以連續處理數據並將其存儲到Delta表中。下麵的圖表描述了這些用例的典型架構:

Kinesis Delta架構圖

Databricks動態結構化流媒體源

Databricks運行時包含一個開箱即用的程序運動的來源.這是一個用於KDS的專有連接器,在開放源碼中沒有。這個連接器不是基於Kinesis客戶端庫(KCL)。Kinesis源架構如圖所示:

運動源架構圖

關鍵的技術考慮和最佳實踐

本節包括在Delta Lake中使用Kinesis的最佳實踐和故障排除信息。

優化預取

Kinesis source通過後台線程運行Spark job,定期預取Kinesis數據並緩存到Spark executers的內存中。流查詢在每個預取步驟完成後處理緩存的數據,並使數據可用於處理。預取步驟會顯著影響觀察到的端到端延遲和吞吐量。您可以通過本節介紹的選項控製性能。

的默認設置shardsPerTask配置參數為5。然而,在規模上,這可能需要非常多的CPU核,因此將其設置為10個可能是一個很好的起點。然後,根據流工作負載和數據量的複雜性,您可以根據集群的Ganglia指標(CPU、內存、網絡等)調整這個值。例如,cpu綁定的集群可能需要一個較小的值,但需要更多的核來補償。

為了優化最小的查詢延遲和最大的資源使用,使用以下計算:

總計數量CPU集群(在所有執行人)>=總計數量運動碎片/shardsPerTask

在這個表中描述了用於確定從Kinesis每次預取讀取數據量的參數。

選項

價值

默認的

描述

maxRecordsPerFetch

整數

10000

每條要取的記錄數getRecordsAPI調用。

shardFetchInterval

持續時間字符串(2m = 2分鍾)

1

在更新碎片列表之前需要等待多長時間(這是係統知道流已經調整大小的方式)。

minFetchPeriod

持續時間字符串

400毫秒

連續讀取嚐試之間的等待時間。這個設置有助於避免運動節流。200ms是最小值,因為Kinesis的服務限製是5次/秒。

maxFetchRate

小數

1.0

每個分片最大預取速率,單位MB/秒。這個速率限製和避免運動節流。Kinesis允許的最大速率是2.0 MB/秒。

maxFetchDuration

持續時間字符串

十年代

在將預取的新數據用於處理之前,需要多長時間進行緩衝

fetchBufferSize

字節字符串

20 gb

為下一個觸發器緩衝多少數據。這是一個軟限製,因為它被用作停止條件,所以可能會緩衝更多的數據。

shardsPerTask

整數

5

每個任務並行預取多少個碎片。

避免過多的速率限製錯誤導致的減速

每當遇到速率限製錯誤時,連接器從Kinesis讀取的數據量就會減少一半,並將該事件記錄在日誌中:“打極限。睡覺5秒。”

當流被捕獲時,通常會看到這些錯誤,但在捕獲之後,您應該不會再看到這些錯誤了。如果您這樣做,您可能需要從Kinesis方麵進行調優(通過增加容量)或調整預取選項

過多的數據導致寫入磁盤

如果你在你的動態流中有一個突然的峰值,分配的緩衝區容量可能會被填滿,緩衝區沒有被清空足夠快的新數據添加。

在這種情況下,Spark會將數據塊從緩衝區溢出到磁盤,從而降低處理速度,影響流的性能。這個事件出現在日誌中,消息如下:

。/ log4j.txt: 879546:20/03/0217:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on10.0.208.13:43458電流大小:88.4 MB,原始大小:00 B

要解決這個問題,可以嚐試增加集群內存容量(增加更多節點或增加每個節點的內存),或者調整配置參數fetchBufferSize

開啟S3 VPC終端

為保證S3的流量通過AWS網絡路由,建議開啟S3 VPC的端點。

掛起S3寫任務

掛起任務會導致流批處理持續時間較長,這可能導致流無法跟上輸入。在這種情況下,Databricks建議啟用Spark推測。為了確保不會過於頻繁地終止任務,請仔細調優此設置的分位數和乘數。一個好的起點是設定spark.speculation.multiplier3.而且spark.speculation.quantile0.95

使用RocksDB管理狀態時,由於S3寫入速度較慢而導致的延遲問題

在流查詢中維護有狀態操作時,一個常見的場景是大型垃圾收集暫停,這反過來會引入延遲並導致延長批處理執行時間。這通常發生在維護數百萬處於狀態的密鑰時。在這種情況下,與其在JVM內存中維護狀態,不如考慮使用RocksDB作為本機內存或磁盤上的狀態存儲。狀態更改會自動傳播到結構化流檢查點。然而,當RocksDB將這些檢查點寫入S3時,由於潛在的S3節流,您可能會觀察到延遲。盡量減少spark.sql.shuffle.partitions(默認200)來最小化寫入的文件數量。您也可以嚐試調優多部分上傳閾值(spark.hadoop.fs.s3a.multipart.size,默認1048576000字節)來減少S3並發寫的數量。

監視流媒體應用程序

要監視流應用程序,Databricks建議使用Spark的流查詢監聽器實現。

可觀察指標被命名為任意的聚合函數,可以在查詢(DataFrame)上定義。一旦DataFrame的執行到達一個完成點(即完成批處理查詢或到達流epoch),就會觸發一個命名事件,其中包含自最後一個完成點以來處理的數據的度量。

您可以通過將偵聽器附加到Spark會話來觀察這些指標。監聽器依賴於執行模式:

  • 批處理模式:使用QueryExecutionListener

    QueryExecutionListener在查詢完成時調用。訪問指標QueryExecution.observedMetrics地圖。

  • 流,或者micro-batch:使用StreamingQueryListener

    StreamingQueryListener當流查詢完成一個紀元時調用。訪問指標StreamingQueryProgress.observedMetrics地圖。數據ricks不支持連續執行流。

例如:

//觀察流數據集中的行數(rc)和錯誤行數(erc)瓦爾observed_dsds觀察“my_event”點燃1))。作為“鋼筋混凝土”),“錯誤”).作為“倫理委員會”))observed_dswriteStream格式“…”).開始()//使用監聽器監視指標火花addListenerStreamingQueryListener()覆蓋defonQueryProgress事件QueryProgressEvent):單位事件進步observedMetrics得到“my_event”).foreach= >//當錯誤數超過5%時觸發瓦爾num_rows木屐) (“鋼筋混凝土”瓦爾num_error_rows木屐) (“倫理委員會”瓦爾num_error_rowstoDouble/num_rows如果>0.05/ /觸發警報})

您還可以通過UI監視指標。如果您正在使用Databricks Runtime 7.0或以上版本,使用流選項卡在火花UI

刪除並重新創建流

如果刪除然後重新創建流,則必須使用新的檢查點位置和目錄。

重新切分

結構化流支持重新分片。在這種情況下,增加碎片的數量就足夠了。您不需要切換流或創建臨時流來分流流量。

了解更多

亞馬遜運動