我麵臨著同樣的問題,這篇文章:https://community.www.eheci.com/s/question/0D58Y00009DE82zSAD/databricks-kafka-read-not-connecting
就我而言我連接彙合的雲。我能ping引導服務器,我能netstat成功在端口9092上。但當我試著使用一批消費者消費數據(或流,這並不重要),log4j服務器日誌得到消息:淹了
22/10/04 07:39:18警告NetworkClient:[消費者clientId =磚,groupId = new_group2]引導代理pkc - 75 - m1o.europe west3.gcp.confluent。雲:9092 (id: 1架:null)斷開連接
用戶id = " < API Key > "
密碼= " < API的秘密> "
主機= " pkc - 75 - m1o.europe west3.gcp.confluent.cloud: 9092”
主題= " topic_0 "
sasl_mech = "普通"
inputDF =火花\
.read \
.format \(“卡夫卡”)
.option (“kafka.bootstrap。服務器主機)\”
.option (“ssl.endpoint.identification。算法”、“https”) \
.option (“sasl。機製”,sasl_mech) \
.option(“安全。協議”、“SASL_SSL”) \
.option (“sasl.jaas。配置”、“org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名密碼=“{}”=“{}”;”。格式(用戶id、密碼))\
.option \(“訂閱”、主題)
.option (“kafka.client。id”、“磚”)\
.option (“kafka.group。id”、“new_group2”) \
.option (“spark.streaming.kafka。maxRatePerPartition”、“5”) \
.option (“startingOffsets”、“最早”)\
.option (“kafka.session.timeout。\女士”,“10000”)
.option (“retry.backoff。\女士”,“1000”)
.option (“value.deserializer”、“ByteArrayDeserializer”) \
.load ()
顯示器(inputDF)
嗨@Debayan穆克吉,不,我沒有。
但在支流的幫助我改變了以下的聲明,這解決了。
inputDF =(火花.readStream .format .option (“kafka.bootstrap(“卡夫卡”)。服務器”,主機).option (“kafka.ssl.endpoint.identification。算法”、“https”) .option (“kafka.sasl。機製”、“普通”).option (“kafka.security。協議”、“SASL_SSL”) .option (“kafka.sasl.jaas。配置”、“kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名密碼=“{}”=“{}”;”。格式(用戶id、密碼)).option(“訂閱”,主題).option (“kafka.client。id”、“磚”).option (“kafka.group。id”、“new_group2”) .option (“spark.streaming.kafka。maxRatePerPartition”、“5”) .option (“startingOffsets”,“最早”).option (“kafka.session.timeout。女士”,“10000”).load ())
嗨@Debayan穆克吉,不,我沒有。
但在支流的幫助我改變了以下的聲明,這解決了。
inputDF =(火花.readStream .format .option (“kafka.bootstrap(“卡夫卡”)。服務器”,主機).option (“kafka.ssl.endpoint.identification。算法”、“https”) .option (“kafka.sasl。機製”、“普通”).option (“kafka.security。協議”、“SASL_SSL”) .option (“kafka.sasl.jaas。配置”、“kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule需要用戶名密碼=“{}”=“{}”;”。格式(用戶id、密碼)).option(“訂閱”,主題).option (“kafka.client。id”、“磚”).option (“kafka.group。id”、“new_group2”) .option (“spark.streaming.kafka。maxRatePerPartition”、“5”) .option (“startingOffsets”,“最早”).option (“kafka.session.timeout。女士”,“10000”).load ())
嗨@Sascha Zevenhuizen,你有機會讀這個醫生@Jose岡薩雷斯共享類似的線程共享了嗎?