亞馬遜運動
用於結構化流的Kinesis連接器包含在Databricks Runtime中。
使用Amazon Kinesis進行認證
對於Kinesis的身份驗證,我們使用Amazon的默認憑證提供者鏈默認情況下。我們建議使用一個可以訪問Kinesis的實例配置文件啟動Databricks集群。如果希望使用密鑰進行訪問,可以使用選項提供密鑰awsAccessKey
而且awsSecretKey
.
你也可以承擔IAM角色使用roleArn
選擇。您可以選擇使用指定外部IDroleExternalId
和會話名roleSessionName
.為了承擔角色,您可以使用承擔角色的權限啟動集群,或者通過提供訪問密鑰awsAccessKey
而且awsSecretKey
.對於跨帳戶身份驗證,我們建議使用roleArn
以持有所承擔的角色,然後可以通過Databricks AWS帳戶承擔該角色。有關跨帳戶身份驗證的詳細信息,請參見使用IAM角色跨AWS帳戶授權訪問.
請注意
運動源要求ListShards
,GetRecords
,GetShardIterator
權限。如果你遇到亞馬遜:訪問否認
異常時,檢查您的用戶或配置文件是否具有這些權限。看到通過IAM控製對Amazon Kinesis數據流資源的訪問欲知詳情。
模式
記錄的模式是:
列 |
類型 |
---|---|
partitionKey |
字符串 |
數據 |
二進製 |
流 |
字符串 |
shardId |
字符串 |
sequenceNumber |
字符串 |
approximateArrivalTimestamp |
時間戳 |
使用數據幀操作(鑄造(“字符串”)
, udfs)來顯式反序列化數據
列。
配置
警告
由於Kinesis執行的速率限製和Kinesis API的限製,執行一次觸發器(Trigger.Once ()
)不被Kinesis支持。
下麵是用於指定要讀取的數據的最重要配置。
選項 |
價值 |
默認的 |
描述 |
---|---|---|---|
streamName |
以逗號分隔的流名列表。 |
無(必需參數) |
要訂閱的流名稱。 |
地區 |
要指定的流的區域。 |
局部分辨區域 |
定義流的區域。 |
端點 |
Kinesis數據流的區域。 |
局部分辨區域 |
Kinesis數據流的區域端點。 |
initialPosition |
Latest、trim_horizon、最早(trim_horizon的別名)、 |
最新的 |
從流中的哪裏開始讀取。 |
對於生產方麵的考慮,請回顧關鍵技術考慮因素和最佳實踐.
在某個時間點開始閱讀
請注意
此特性在Databricks Runtime 7.3 LTS及以上版本上可用。
要在某個時間點開始閱讀,可以使用at_timestamp
的值。initialPosition
選擇。您可以將值指定為JSON字符串,例如{“at_timestamp”:“06/25/202010:23:45PDT "}
.流查詢將讀取給定時間戳(包括)時或之後的所有更改。它使用Java默認格式解析時間戳。你可以通過在JSON字符串中提供一個額外的字段顯式地指定格式,例如:
(火花.readStream.格式(“運動”).選項(“streamName”,kinesisStreamName).選項(“地區”,kinesisRegion).選項(“initialPosition”,'{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH: MM:ss ZZZ"}').選項(“awsAccessKey”,awsAccessKeyId).選項(“awsSecretKey”,awsSecretKey).負載())
此外,還有用於控製從Kinesis讀取的吞吐量和延遲的配置。Kinesis源在後台線程中運行Spark作業,定期預取Kinesis數據並將其緩存到Spark執行器的內存中。隻有在每個預取步驟完成後,流查詢才會處理緩存的數據,並使數據可用於處理。因此,這個預取步驟決定了很多觀察到的端到端延遲和吞吐量。您可以通過以下選項控製性能。
選項 |
價值 |
默認的 |
描述 |
---|---|---|---|
maxRecordsPerFetch |
一個正整數。 |
10000年 |
每個API請求要讀取多少記錄到Kinesis。返回的記錄數量實際上可能更高,這取決於是否使用Kinesis Producer Library將子記錄聚合為單個記錄。 |
maxFetchRate |
正十進製,表示數據速率,單位為MB/s。 |
1.0(最大= 2.0) |
每個分片預取數據的速度。這是為了限製取回的速率,避免運動節流。2.0 MB/s是Kinesis允許的最大速率。 |
minFetchPeriod |
例如,一個持續時間字符串, |
400ms (min = 200ms) |
連續預取間隔時間。這是為了限製取回的頻率並避免Kinesis節流。200ms是最小值,因為kineesis最多允許5次/秒的抓取。 |
maxFetchDuration |
例如,一個持續時間字符串, |
十年代 |
在使預取的新數據可用於處理之前緩衝多長時間。 |
fetchBufferSize |
例如,一個字節字符串, |
20 gb |
為下一次觸發器緩衝多少數據。該值用作停止條件,而不是嚴格的上限,因此可能會緩存比指定的值更多的數據。 |
shardsPerTask |
一個正整數。 |
5 |
每個Spark任務並行預取多少個Kinesis分片。在理想的情況下 |
shardFetchInterval |
例如,一個持續時間字符串, |
1 |
多久輪詢一次Kinesis進行重分片。 |
awsAccessKey |
字符串 |
沒有違約。 |
AWS訪問密鑰。 |
awsSecretKey |
字符串 |
沒有違約。 |
與接入密鑰對應的AWS秘密訪問密鑰。 |
roleArn |
字符串 |
沒有違約。 |
訪問Kinesis時要承擔的角色的Amazon資源名(ARN)。 |
roleExternalId |
字符串 |
沒有違約。 |
可選值,可在將訪問權限委托給AWS帳戶時使用。看到如何使用外部ID. |
roleSessionName |
字符串 |
沒有違約。 |
假設角色會話的標識符,當不同主體或出於不同原因假設相同角色時,該標識符唯一標識會話。 |
coalesceThresholdBlockSize |
一個正整數。 |
10000000年 |
自動合並發生的閾值。如果平均塊大小小於此值,則預取的塊將合並到 |
coalesceBinSize |
一個正整數。 |
128000000年 |
合並後的大致塊大小。 |
請注意
選項的默認值已經被選擇,這樣兩個讀取器(Spark或其他)可以同時使用Kinesis流而不會達到Kinesis速率限製。如果你有更多的消費者,你必須相應地調整選項。例如,你可能不得不減少maxFetchRate
,並增加minFetchPeriod
.
以下是針對特定用例的一些建議配置。
從Kinesis到S3的ETL
當您對長期存儲執行ETL時,您希望使用少量的大文件。在這種情況下,您可能需要設置一個較大的流觸發間隔,例如5-10分鍾。此外,你可能想要增加你的薪水maxFetchDuration
這樣你就可以緩衝在處理過程中會被寫出來的大塊,並增加fetchBufferSize
這樣你就不會在觸發器之間過早停止抓取,並開始在你的流中落後。
低延遲監控和警報
當您有一個警報用例時,您希望更低的延遲。要做到這一點:
確保Kinesis流隻有一個消費者(也就是說,隻有你的流查詢,沒有其他人),這樣我們就可以優化你唯一的流查詢,以便在不遇到Kinesis速率限製的情況下盡可能快地獲取。
設置選項
maxFetchDuration
到一個很小的值(比如,200毫秒
),以盡快開始處理擷取的資料。設置選項
minFetchPeriod
來210毫秒
盡可能頻繁地獲取。設置選項
shardsPerTask
或者將集群配置為#核在集群> =2*(#運動碎片)/shardsPerTask
.這樣可以保證後台預取任務和流查詢任務可以並發執行。
如果您看到您的查詢每5秒接收一次數據,那麼很可能就是這樣達到運動速率限製.檢查您的配置。
警告
如果刪除並重新創建Kinesis流,則不能重用任何現有檢查點目錄來重新啟動流查詢。您必須刪除檢查點目錄並從頭開始這些查詢。
指標
請注意
在Databricks Runtime 8.1及以上版本中可用。
Kinesis報告了消費者在每個工作空間的流開始後落後的毫秒數。您可以獲得流查詢過程中所有工作空間的毫秒數的平均值、最小值和最大值(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively)作為avgMsBehindLatest
,maxMsBehindLatest
,minMsBehindLatest
指標。如果在筆記本中運行流,則可以在原始數據頁中的流查詢進度儀表板:
{“源”:[{“描述”:“KinesisV2(流)”,“指標”:{“avgMsBehindLatest”:“32000.0”,“maxMsBehindLatest”:“32000”,“minMsBehindLatest”:“32000”},}]}