亞馬遜運動

用於結構化流的Kinesis連接器包含在Databricks Runtime中。

使用Amazon Kinesis進行認證

對於Kinesis的身份驗證,我們使用Amazon的默認憑證提供者鏈默認情況下。我們建議使用一個可以訪問Kinesis的實例配置文件啟動Databricks集群。如果希望使用密鑰進行訪問,可以使用選項提供密鑰awsAccessKey而且awsSecretKey

你也可以承擔IAM角色使用roleArn選擇。您可以選擇使用指定外部IDroleExternalId和會話名roleSessionName.為了承擔角色,您可以使用承擔角色的權限啟動集群,或者通過提供訪問密鑰awsAccessKey而且awsSecretKey.對於跨帳戶身份驗證,我們建議使用roleArn以持有所承擔的角色,然後可以通過Databricks AWS帳戶承擔該角色。有關跨帳戶身份驗證的詳細信息,請參見使用IAM角色跨AWS帳戶授權訪問

請注意

運動源要求ListShardsGetRecords,GetShardIterator權限。如果你遇到亞馬遜:訪問否認異常時,檢查您的用戶或配置文件是否具有這些權限。看到通過IAM控製對Amazon Kinesis數據流資源的訪問欲知詳情。

模式

記錄的模式是:

類型

partitionKey

字符串

數據

二進製

字符串

shardId

字符串

sequenceNumber

字符串

approximateArrivalTimestamp

時間戳

使用數據幀操作(鑄造(“字符串”), udfs)來顯式反序列化數據列。

快速入門

讓我們從一個簡單的例子開始:WordCount。下麵的筆記本演示了如何使用Kinesis的結構化流運行WordCount。

Kinesis WordCount與結構化流筆記本

在新標簽頁打開筆記本

配置

警告

由於Kinesis執行的速率限製和Kinesis API的限製,執行一次觸發器(Trigger.Once ())不被Kinesis支持。

下麵是用於指定要讀取的數據的最重要配置。

選項

價值

默認的

描述

streamName

以逗號分隔的流名列表。

無(必需參數)

要訂閱的流名稱。

地區

要指定的流的區域。

局部分辨區域

定義流的區域。

端點

Kinesis數據流的區域。

局部分辨區域

Kinesis數據流的區域端點。

initialPosition

Latest、trim_horizon、最早(trim_horizon的別名)、at_timestamp

最新的

從流中的哪裏開始讀取。

對於生產方麵的考慮,請回顧關鍵技術考慮因素和最佳實踐

在某個時間點開始閱讀

請注意

此特性在Databricks Runtime 7.3 LTS及以上版本上可用。

要在某個時間點開始閱讀,可以使用at_timestamp的值。initialPosition選擇。您可以將值指定為JSON字符串,例如{“at_timestamp”:“06/25/202010:23:45PDT "}.流查詢將讀取給定時間戳(包括)時或之後的所有更改。它使用Java默認格式解析時間戳。你可以通過在JSON字符串中提供一個額外的字段顯式地指定格式,例如:

火花readStream格式“運動”選項“streamName”kinesisStreamName選項“地區”kinesisRegion選項“initialPosition”'{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH: MM:ss ZZZ"}'選項“awsAccessKey”awsAccessKeyId選項“awsSecretKey”awsSecretKey負載()

此外,還有用於控製從Kinesis讀取的吞吐量和延遲的配置。Kinesis源在後台線程中運行Spark作業,定期預取Kinesis數據並將其緩存到Spark執行器的內存中。隻有在每個預取步驟完成後,流查詢才會處理緩存的數據,並使數據可用於處理。因此,這個預取步驟決定了很多觀察到的端到端延遲和吞吐量。您可以通過以下選項控製性能。

選項

價值

默認的

描述

maxRecordsPerFetch

一個正整數。

10000年

每個API請求要讀取多少記錄到Kinesis。返回的記錄數量實際上可能更高,這取決於是否使用Kinesis Producer Library將子記錄聚合為單個記錄。

maxFetchRate

正十進製,表示數據速率,單位為MB/s。

1.0(最大= 2.0)

每個分片預取數據的速度。這是為了限製取回的速率,避免運動節流。2.0 MB/s是Kinesis允許的最大速率。

minFetchPeriod

例如,一個持續時間字符串,1一秒鍾。

400ms (min = 200ms)

連續預取間隔時間。這是為了限製取回的頻率並避免Kinesis節流。200ms是最小值,因為kineesis最多允許5次/秒的抓取。

maxFetchDuration

例如,一個持續時間字符串,1米1分鍾。

十年代

在使預取的新數據可用於處理之前緩衝多長時間。

fetchBufferSize

例如,一個字節字符串,2 gb10 mb

20 gb

為下一次觸發器緩衝多少數據。該值用作停止條件,而不是嚴格的上限,因此可能會緩存比指定的值更多的數據。

shardsPerTask

一個正整數。

5

每個Spark任務並行預取多少個Kinesis分片。在理想的情況下集群> =運動碎片/shardsPerTask對於最小的查詢延遲和最大的資源使用。

shardFetchInterval

例如,一個持續時間字符串,2米2分鍾。

1

多久輪詢一次Kinesis進行重分片。

awsAccessKey

字符串

沒有違約。

AWS訪問密鑰。

awsSecretKey

字符串

沒有違約。

與接入密鑰對應的AWS秘密訪問密鑰。

roleArn

字符串

沒有違約。

訪問Kinesis時要承擔的角色的Amazon資源名(ARN)。

roleExternalId

字符串

沒有違約。

可選值,可在將訪問權限委托給AWS帳戶時使用。看到如何使用外部ID

roleSessionName

字符串

沒有違約。

假設角色會話的標識符,當不同主體或出於不同原因假設相同角色時,該標識符唯一標識會話。

coalesceThresholdBlockSize

一個正整數。

10000000年

自動合並發生的閾值。如果平均塊大小小於此值,則預取的塊將合並到coalesceBinSize

coalesceBinSize

一個正整數。

128000000年

合並後的大致塊大小。

請注意

選項的默認值已經被選擇,這樣兩個讀取器(Spark或其他)可以同時使用Kinesis流而不會達到Kinesis速率限製。如果你有更多的消費者,你必須相應地調整選項。例如,你可能不得不減少maxFetchRate,並增加minFetchPeriod

以下是針對特定用例的一些建議配置。

從Kinesis到S3的ETL

當您對長期存儲執行ETL時,您希望使用少量的大文件。在這種情況下,您可能需要設置一個較大的流觸發間隔,例如5-10分鍾。此外,你可能想要增加你的薪水maxFetchDuration這樣你就可以緩衝在處理過程中會被寫出來的大塊,並增加fetchBufferSize這樣你就不會在觸發器之間過早停止抓取,並開始在你的流中落後。

低延遲監控和警報

當您有一個警報用例時,您希望更低的延遲。要做到這一點:

  • 確保Kinesis流隻有一個消費者(也就是說,隻有你的流查詢,沒有其他人),這樣我們就可以優化你唯一的流查詢,以便在不遇到Kinesis速率限製的情況下盡可能快地獲取。

  • 設置選項maxFetchDuration到一個很小的值(比如,200毫秒),以盡快開始處理擷取的資料。

  • 設置選項minFetchPeriod210毫秒盡可能頻繁地獲取。

  • 設置選項shardsPerTask或者將集群配置為集群> =2(#運動碎片)/shardsPerTask.這樣可以保證後台預取任務和流查詢任務可以並發執行。

如果您看到您的查詢每5秒接收一次數據,那麼很可能就是這樣達到運動速率限製.檢查您的配置。

警告

如果刪除並重新創建Kinesis流,則不能重用任何現有檢查點目錄來重新啟動流查詢。您必須刪除檢查點目錄並從頭開始這些查詢。

指標

請注意

在Databricks Runtime 8.1及以上版本中可用。

Kinesis報告了消費者在每個工作空間的流開始後落後的毫秒數。您可以獲得流查詢過程中所有工作空間的毫秒數的平均值、最小值和最大值(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively)作為avgMsBehindLatestmaxMsBehindLatest,minMsBehindLatest指標。如果在筆記本中運行流,則可以在原始數據頁中的流查詢進度儀表板

“源”“描述”“KinesisV2(流)”“指標”“avgMsBehindLatest”“32000.0”“maxMsBehindLatest”“32000”“minMsBehindLatest”“32000”},

寫入到Kinesis

下麵的代碼片段可以用作ForeachSink向Kinesis寫入數據。它需要數據集[(字符串,數組(字節)))

請注意

下麵的代碼片段提供至少一次語義上的,不止一次。

Kinesis Foreach水槽筆記本

在新標簽頁打開筆記本