Apache卡夫卡

Apache卡夫卡連接器為磚結構流打包運行時。您使用卡夫卡連接器連接到卡夫卡0.10 +的kafka08連接器連接到卡夫卡0.8 +(棄用)。

模式

記錄的模式是:

類型

關鍵

二進製

價值

二進製

主題

字符串

分區

int

抵消

時間戳

timestampType

int

關鍵價值總是反序列化為字節數組ByteArrayDeserializer。使用DataFrame操作(鑄造(“字符串”)udf)顯式地對鍵和值進行反序列化。

快速入門

讓我們先從一個規範化WordCount例子。以下筆記本演示了如何使用結構化流運行WordCount卡夫卡。

請注意

這個筆記本的例子使用0.10卡夫卡。0.8使用卡夫卡,改變格式kafka08(即,.format (“kafka08”))。

卡夫卡與結構化流WordCount筆記本

在新標簽頁打開筆記本

配置

comphensive列表的配置選項,請參閱火花結構化流+卡夫卡集成指南。在你開始之前,這是最常見的配置選項的子集。

請注意

結構化流仍在發展,這個列表可能不是最新的。

有多種方法的指定主題訂閱。你應該隻提供其中一個參數:

選項

價值

卡夫卡版本支持

描述

訂閱

一個以逗號分隔的話題。

0.8,0.10

主題訂閱列表。

subscribePattern

Java正則表達式字符串。

0.10

該模式用於訂閱主題(s)。

分配

JSON字符串{“局部藥”:[0,1],“主題”:(2、4)}

0.8,0.10

具體topicPartitions消費。

其他值得注意的配置:

選項

價值

默認值

卡夫卡版本支持

描述

kafka.bootstrap.servers

以逗號分隔的主持人:端口。

0.8,0.10

[要求]卡夫卡bootstrap.servers配置。如果你發現沒有數據從卡夫卡,首先檢查代理地址列表。如果代理地址列表不正確,可能沒有任何錯誤。這是因為卡夫卡端假設經紀人最終會變得可用,在發生網絡錯誤重試,直到永遠。

failOnDataLoss

真正的

真正的

0.10

(可選的)是否失敗的查詢時數據丟失的可能。查詢可以從卡夫卡永久無法讀取數據,由於許多場景如刪除話題,話題截斷前處理,等等。我們試圖估計保守是否數據可能丟失。有時這可能導致假警報。設置這個選項如果它不正常工作,或者你想要查詢繼續處理盡管數據丟失。

minPartitions

整數> = 0,0 =禁用。

0(禁用)

0.10

(可選的)最小數量的分區從卡夫卡讀取。火花2.1.0-db2和上麵,您可以配置火花使用任意最小的分區從卡夫卡使用讀取minPartitions選擇。通常火花的1:1映射卡夫卡topicPartitions從卡夫卡火花分區使用。如果你設置minPartitions選擇一個值大於你的卡夫卡topicPartitions,火花將分配大型卡夫卡分區小的碎片。這個選項可以設置在高峰負荷,所以數據傾斜,當你流落後提高處理速度。時初始化卡夫卡在每個觸發消費者的成本,這可能會影響性能,如果你使用SSL連接到卡夫卡。

kafka.group.id

卡夫卡消費者組ID。

沒有設置

0.10

(可選的)組ID使用從卡夫卡在閱讀。支持引發2.2 +。小心地使用這個。默認情況下,每個查詢生成一個獨特的組ID讀取數據。這可以確保每個查詢都有自己的消費群體,沒有麵臨幹擾其他消費者一樣,因此可以閱讀所有分區的訂閱的主題。在某些情況下(例如,卡夫卡組的授權),您可能需要使用特定的授權組id讀取數據。您可以選擇設置組ID。然而,這個要特別小心,因為它可能導致不可預測的行為。

  • 並發運行查詢(包括批處理和流媒體)使用相同的組ID可能會互相幹擾,導致每個查詢隻讀數據的一部分。

  • 這也可能出現在查詢開始接二連三地/重新啟動。減少這樣的問題,設置卡夫卡消費者配置session.timeout.ms是非常小的。

startingOffsets

最早的,最新的

最新的

0.10

(可選的)查詢時的起點開始,要麼是“最早”,從最早的偏移量,或一個json字符串指定為每個TopicPartition的起始偏移量。在json, 2作為一個抵消最早可以用來參考,最新的1。注意:對於批處理查詢,最新(隱式或通過使用1以json)是不允許的。對於流媒體查詢,這隻適用於當開始一個新的查詢和恢複總是撿起從哪裏查詢。新發現的分區在查詢將從最早開始。

看到結構化流卡夫卡集成指南其他可選的配置。

重要的

不應該設置以下卡夫卡參數卡夫卡0.10連接器,它將拋出一個異常:

  • group.id:設置這個參數是不允許火花版本低於2.2。

  • auto.offset.reset:相反,設置源選擇startingOffsets指定從哪裏開始。為了保持一致性,結構化流(而不是卡夫卡消費者)管理內部消費的補償。這將確保你不要錯過任何數據動態訂閱後的新主題/分區。startingOffsets隻適用於當你開始一個新的流媒體查詢,,總是從一個檢查點恢複好轉的查詢。

  • key.deserializer:鑰匙總是反序列化為字節數組ByteArrayDeserializer。使用DataFrame操作來顯式地對鑰匙進行反序列化。

  • value.deserializer:值總是反序列化為字節數組ByteArrayDeserializer。使用DataFrame操作顯式的值進行反序列化。

  • enable.auto.commit:設置這個參數是不允許的。火花跟蹤卡夫卡抵消內部和不承諾任何抵消。

  • interceptor.classes:卡夫卡源總是讀鍵和值字節數組。它使用起來不安全ConsumerInterceptor因為它可能打破查詢。

生產結構流與卡夫卡的筆記本

在新標簽頁打開筆記本

指標

請注意

在磚運行時8.1及以上。

你可以得到平均最小值和最大值的補償的數量背後的流媒體查詢最新抵消在所有的訂閱的主題avgOffsetsBehindLatest,maxOffsetsBehindLatest,minOffsetsBehindLatest指標。看到閱讀指標交互

請注意

在磚運行時9.1及以上。

獲得的字節總數估計查詢過程沒有消耗從訂閱的主題通過檢查的價值estimatedTotalBytesBehindLatest。這估計是基於批量加工的最後300秒。估計的時間框架是基於可以改變通過設置選項bytesEstimateWindowLength到一個不同的值。例如,將其設置為10分鍾:

df=火花readStream\格式(“卡夫卡”)\選項(“bytesEstimateWindowLength”,“10 m”)/ /分鍾,可以使用“600年代”600年

如果您正在運行流在一個筆記本,你可以看到這些度量標準下原始數據選項卡中流儀表板查詢進展:

{“源”:({“描述”:“KafkaV2訂閱(主題)”,“指標”:{“avgOffsetsBehindLatest”:“4.0”,“maxOffsetsBehindLatest”:“4”,“minOffsetsBehindLatest”:“4”,“estimatedTotalBytesBehindLatest”:“80.0”},}]}

使用SSL

啟用SSL連接卡夫卡,聽從指示的融合性的文檔通過SSL加密和身份驗證。您可以提供描述的配置,前綴卡夫卡。,如選項。例如,您指定的信任存儲位置屬性kafka.ssl.truststore.location

我們建議您:

一旦安裝和秘密存儲路徑,您可以執行以下操作:

df=火花readStream\格式(“卡夫卡”)\選項(“kafka.bootstrap.servers”,)\選項(“kafka.security.protocol”,“SASL_SSL”)\選項(“kafka.ssl.truststore.location”,<dbfs- - - - - -信任存儲庫- - - - - -位置>)\選項(“kafka.ssl.keystore.location”,<dbfs- - - - - -密鑰存儲庫- - - - - -位置>)\選項(“kafka.ssl.keystore.password”,dbutils秘密得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <密鑰存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))\選項(“kafka.ssl.truststore.password”,dbutils秘密得到(範圍= <證書- - - - - -範圍- - - - - -的名字>,關鍵= <信任存儲庫- - - - - -密碼- - - - - -關鍵- - - - - -的名字>))