
How to test Kafka connectivity from a Databricks notebook

User16869510359
Honored Contributor

The Structured Streaming job is failing because it is unable to connect to Kafka. I believe the issue is with Spark. How can I isolate whether it is a Spark library issue or an actual network issue?
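One way to separate the two, sketched below under stated assumptions, is to bypass Spark and the Kafka libraries entirely and open a raw TCP connection to the broker from a notebook cell on the same cluster. The host "kafka-server" and port 9092 are placeholders (borrowed from the accepted answer), not a real endpoint:

import java.net.{InetSocketAddress, Socket}

// Placeholder broker endpoint; substitute the host/port from kafka.bootstrap.servers.
val brokerHost = "kafka-server"
val brokerPort = 9092

val socket = new Socket()
try {
  // A 5-second connect timeout keeps the cell from hanging if the route is blocked.
  socket.connect(new InetSocketAddress(brokerHost, brokerPort), 5000)
  println(s"TCP connect to $brokerHost:$brokerPort succeeded - the network path is open")
} catch {
  case e: Exception =>
    println(s"TCP connect failed - likely a network/firewall problem rather than Spark: $e")
} finally {
  socket.close()
}

If the socket opens but the streaming job still fails, the problem is more likely in the Spark-side Kafka configuration than in the network.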

1 ACCEPTED SOLUTION


2 REPLIES 2

User16869510359
Honored Contributor

The below code snippet can be used to test the connectivity:

import java.util.Arrays
import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.ListTopicsOptions

// SSL trust store and key store are read from the DBFS FUSE mount (/dbfs/...).
val prop = new Properties()
prop.put("security.protocol", "SSL");
prop.put("ssl.truststore.location", "/dbfs/keys/custom.keystore");
prop.put("ssl.truststore.password", "changeit");
prop.put("ssl.keystore.location", "/dbfs/keys/custom.keystore");
prop.put("ssl.keystore.password", "changeit");
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092")

// Listing the topics (including internal ones) exercises the full broker connection,
// so a successful call confirms reachability, the TLS handshake, and authentication.
val adminClient = AdminClient.create(prop)
val listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
println(adminClient.listTopics(listTopicsOptions).names().get());
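As a rough rule of thumb: if this snippet prints the topic list from a notebook on the same cluster, then the network path, the TLS handshake, and the keystore/truststore files under /dbfs are all working, and a failing Structured Streaming job points to a Spark-side configuration issue; if it fails with a similar error, the problem lies outside Spark.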

時候
New Contributor III

Hi @Harikrishnan Kunhumveettil,

I am also trying to connect to Heroku Kafka from a Databricks notebook, but I am facing challenges related to the JKS files. I am using the Scala code below:

---code---

import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka+ssl://host1:port1,kafka+ssl://host2:port2")
    .option("kafka.security.protocol", "ssl")
    .option("kafka.ssl.truststore.location", "<<truststore.jks location in dbfs>>")
    .option("kafka.ssl.truststore.password", "<<password>>")
    .option("kafka.ssl.keystore.location", "<<keystore.jks location in dbfs>>")
    .option("kafka.ssl.keystore.password", "<<password>>")
    .option("subscribe", "<<topic_name>>")
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

display(streamingInputDF)

---code---

The error I am getting is below:

---error---

kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
	at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets(KafkaOffsetReaderConsumer.scala:339)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
	at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:74)
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
	at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
	... 46 more
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore /FileStore/tables/certificates/keystore.jks of type JKS
	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:306)
	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.<init>(DefaultSslEngineFactory.java:285)
	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:255)
	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:139)
	at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:136)
	at kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:93)
	at kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:72)
	... 50 more
Caused by: java.nio.file.NoSuchFileException: /FileStore/tables/certificates/keystore.jks
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.Files.newByteChannel(Files.java:361)
	at java.nio.file.Files.newByteChannel(Files.java:407)
	at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
	at java.nio.file.Files.newInputStream(Files.java:152)
	at kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$SecurityStore.load(DefaultSslEngineFactory.java:299)
	... 56 more

---error---
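The last Caused by above hints at the likely cause: the keystore option points at the DBFS path /FileStore/tables/certificates/keystore.jks, but the shaded Kafka client opens the file with plain local file I/O, which on Databricks normally means using the /dbfs FUSE prefix, as the accepted snippet above does. A minimal sanity check, assuming that upload location, would be:

// Verify the keystore is visible through the local /dbfs FUSE mount before passing it
// to kafka.ssl.keystore.location; the path below is only an assumed upload location.
val keystorePath = "/dbfs/FileStore/tables/certificates/keystore.jks"
println(new java.io.File(keystorePath).exists())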

Let me know if you can help me resolve this issue.

Thanks,

時候
