Apache卡夫卡
Apache卡夫卡連接器為磚結構流打包運行時。您使用卡夫卡
連接器連接到卡夫卡0.10 +的kafka08
連接器連接到卡夫卡0.8 +(棄用)。
模式
記錄的模式是:
列 |
類型 |
---|---|
關鍵 |
二進製 |
價值 |
二進製 |
主題 |
字符串 |
分區 |
int |
抵消 |
長 |
時間戳 |
長 |
timestampType |
int |
的關鍵
和價值
總是反序列化為字節數組ByteArrayDeserializer
。使用DataFrame操作(鑄造(“字符串”)
udf)顯式地對鍵和值進行反序列化。
快速入門
讓我們先從一個規範化WordCount例子。以下筆記本演示了如何使用結構化流運行WordCount卡夫卡。
請注意
這個筆記本的例子使用0.10卡夫卡。0.8使用卡夫卡,改變格式kafka08
(即,.format (“kafka08”)
)。
配置
comphensive列表的配置選項,請參閱火花結構化流+卡夫卡集成指南。在你開始之前,這是最常見的配置選項的子集。
請注意
結構化流仍在發展,這個列表可能不是最新的。
有多種方法的指定主題訂閱。你應該隻提供其中一個參數:
選項 |
價值 |
卡夫卡版本支持 |
描述 |
---|---|---|---|
訂閱 |
一個以逗號分隔的話題。 |
0.8,0.10 |
主題訂閱列表。 |
subscribePattern |
Java正則表達式字符串。 |
0.10 |
該模式用於訂閱主題(s)。 |
分配 |
JSON字符串 |
0.8,0.10 |
具體topicPartitions消費。 |
其他值得注意的配置:
選項 |
價值 |
默認值 |
卡夫卡版本支持 |
描述 |
---|---|---|---|---|
kafka.bootstrap.servers |
以逗號分隔的主持人:端口。 |
空 |
0.8,0.10 |
[要求]卡夫卡 |
failOnDataLoss |
|
|
0.10 |
(可選的)是否失敗的查詢時數據丟失的可能。查詢可以從卡夫卡永久無法讀取數據,由於許多場景如刪除話題,話題截斷前處理,等等。我們試圖估計保守是否數據可能丟失。有時這可能導致假警報。設置這個選項 |
minPartitions |
整數> = 0,0 =禁用。 |
0(禁用) |
0.10 |
(可選的)最小數量的分區從卡夫卡讀取。火花2.1.0-db2和上麵,您可以配置火花使用任意最小的分區從卡夫卡使用讀取 |
kafka.group.id |
卡夫卡消費者組ID。 |
沒有設置 |
0.10 |
(可選的)組ID使用從卡夫卡在閱讀。支持引發2.2 +。小心地使用這個。默認情況下,每個查詢生成一個獨特的組ID讀取數據。這可以確保每個查詢都有自己的消費群體,沒有麵臨幹擾其他消費者一樣,因此可以閱讀所有分區的訂閱的主題。在某些情況下(例如,卡夫卡組的授權),您可能需要使用特定的授權組id讀取數據。您可以選擇設置組ID。然而,這個要特別小心,因為它可能導致不可預測的行為。
|
startingOffsets |
最早的,最新的 |
最新的 |
0.10 |
(可選的)查詢時的起點開始,要麼是“最早”,從最早的偏移量,或一個json字符串指定為每個TopicPartition的起始偏移量。在json, 2作為一個抵消最早可以用來參考,最新的1。注意:對於批處理查詢,最新(隱式或通過使用1以json)是不允許的。對於流媒體查詢,這隻適用於當開始一個新的查詢和恢複總是撿起從哪裏查詢。新發現的分區在查詢將從最早開始。 |
看到結構化流卡夫卡集成指南其他可選的配置。
重要的
你不應該設置以下卡夫卡參數卡夫卡0.10連接器,它將拋出一個異常:
group.id
:設置這個參數是不允許火花版本低於2.2。auto.offset.reset
:相反,設置源選擇startingOffsets
指定從哪裏開始。為了保持一致性,結構化流(而不是卡夫卡消費者)管理內部消費的補償。這將確保你不要錯過任何數據動態訂閱後的新主題/分區。startingOffsets
隻適用於當你開始一個新的流媒體查詢,,總是從一個檢查點恢複好轉的查詢。key.deserializer
:鑰匙總是反序列化為字節數組ByteArrayDeserializer
。使用DataFrame操作來顯式地對鑰匙進行反序列化。value.deserializer
:值總是反序列化為字節數組ByteArrayDeserializer
。使用DataFrame操作顯式的值進行反序列化。enable.auto.commit
:設置這個參數是不允許的。火花跟蹤卡夫卡抵消內部和不承諾任何抵消。interceptor.classes
:卡夫卡源總是讀鍵和值字節數組。它使用起來不安全ConsumerInterceptor
因為它可能打破查詢。
指標
請注意
在磚運行時8.1及以上。
你可以得到平均最小值和最大值的補償的數量背後的流媒體查詢最新抵消在所有的訂閱的主題avgOffsetsBehindLatest
,maxOffsetsBehindLatest
,minOffsetsBehindLatest
指標。看到閱讀指標交互。
請注意
在磚運行時9.1及以上。
獲得的字節總數估計查詢過程沒有消耗從訂閱的主題通過檢查的價值estimatedTotalBytesBehindLatest
。這估計是基於批量加工的最後300秒。估計的時間框架是基於可以改變通過設置選項bytesEstimateWindowLength
到一個不同的值。例如,將其設置為10分鍾:
df=火花。readStream\。格式(“卡夫卡”)\。選項(“bytesEstimateWindowLength”,“10 m”)/ /米為分鍾,你可以也使用“600年代”為600年秒
如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據選項卡中流儀表板查詢進展:
{“源”:({“描述”:“KafkaV2訂閱(主題)”,“指標”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}
使用SSL
啟用SSL連接卡夫卡,聽從指示的融合性的文檔通過SSL加密和身份驗證。您可以提供描述的配置,前綴卡夫卡。
,如選項。例如,您指定的信任存儲位置屬性kafka.ssl.truststore.location
。
我們建議您:
一旦安裝和秘密存儲路徑,您可以執行以下操作:
df=火花。readStream\。格式(“卡夫卡”)\。選項(“kafka.bootstrap.servers”,…)\。選項(“kafka.security.protocol”,“SASL_SSL”)\。選項(“kafka.ssl.truststore.location”,<dbfs- - - - - -信任存儲庫- - - - - -位置>)\。選項(“kafka.ssl.keystore.location”,<dbfs- - - - - -密鑰存儲庫- - - - - -位置>)\。選項(“kafka.ssl.keystore.password”,dbutils。秘密。得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <密鑰存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))\。選項(“kafka.ssl.truststore.password”,dbutils。秘密。得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <信任存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))