下麵的代碼片段可以用來測試連接
進口java.util。進口java.util數組。進口org.apache.kafka.clients.admin屬性。AdminClient org.apache.kafka.clients.admin進口。AdminClientConfig org.apache.kafka.clients.admin進口。ListTopicsOptions val道具=新屬性()prop.put (“security.protocol”、“SSL”);prop.put (“ssl.truststore。位置”、“/ dbfs /鍵/自定義。密鑰存儲庫”);prop.put (“ssl.truststore.password”、“changeit”);prop.put (“ssl.keystore。location", "/dbfs/keys/custom.keystore" ); prop.put("ssl.keystore.password","changeit"); prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092") val adminClient = AdminClient.create(prop) val listTopicsOptions = new ListTopicsOptions(); listTopicsOptions.listInternal(true); println(adminClient.listTopics(listTopicsOptions).names().get());
下麵的代碼片段可以用來測試連接
進口java.util。進口java.util數組。進口org.apache.kafka.clients.admin屬性。AdminClient org.apache.kafka.clients.admin進口。AdminClientConfig org.apache.kafka.clients.admin進口。ListTopicsOptions val道具=新屬性()prop.put (“security.protocol”、“SSL”);prop.put (“ssl.truststore。位置”、“/ dbfs /鍵/自定義。密鑰存儲庫”);prop.put (“ssl.truststore.password”、“changeit”);prop.put (“ssl.keystore。location", "/dbfs/keys/custom.keystore" ); prop.put("ssl.keystore.password","changeit"); prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092") val adminClient = AdminClient.create(prop) val listTopicsOptions = new ListTopicsOptions(); listTopicsOptions.listInternal(true); println(adminClient.listTopics(listTopicsOptions).names().get());
嗨@Harikrishnan Kunhumveettil,
我也試圖連接到Heroku卡夫卡從磚的筆記本,但麵臨的挑戰有關jks文件。我使用下麵的scala代碼:
——代碼
進口org.apache.spark.sql.functions。{get_json_object, json_tuple}
var streamingInputDF =
spark.readStream
.format(“卡夫卡”)
.option (“kafka.bootstrap。服務器”、“卡夫卡+ ssl: / / host1:端口1,卡夫卡+ ssl: / / host2:端口2”)
.option (“kafka.security.protocol”、“ssl”)
.option (“kafka.ssl.truststore。位置”、“< <信任庫。jks位置dbfs > >”)
.option (“kafka.ssl.truststore。密碼”、“< <密碼> >”)
.option (“kafka.ssl.keystore。位置”、“< <密鑰存儲庫。jks位置dbfs > >”)
.option (“kafka.ssl.keystore。密碼”、“< <密碼> >”)
.option(“訂閱”、“< < topic_name > >”)
.option (“startingOffsets”、“最新”)
.option (“minPartitions”,“10”)
.option (“failOnDataLoss”,“真正的”)
.load ()
顯示器(streamingInputDF)
——代碼
錯誤,我讓下麵:
——錯誤- - - - - -
kafkashaded.org.apache.kafka.common.KafkaException:未能構建卡夫卡消費者
在kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer。< init > (KafkaConsumer.java: 823)
在kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer。< init > (KafkaConsumer.java: 632)
在kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer。< init > (KafkaConsumer.java: 613)
org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer (ConsumerStrategy.scala: 107)
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer (KafkaOffsetReaderConsumer.scala: 83)
在org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer。anonfun partitionsAssignedToConsumer美元2美元(KafkaOffsetReaderConsumer.scala: 561)
在org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer。anonfun withRetriesWithoutInterrupt美元1美元(KafkaOffsetReaderConsumer.scala: 606)
在scala.runtime.java8.JFunction0專門sp.apply美元(美元JFunction0 mcV $ sp.java: 23)
org.apache.spark.util.UninterruptibleThread.runUninterruptibly (UninterruptibleThread.scala: 77)
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt (KafkaOffsetReaderConsumer.scala: 605)
在org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer。anonfun partitionsAssignedToConsumer美元1美元(KafkaOffsetReaderConsumer.scala: 559)
org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly (UninterruptibleThreadRunner.scala: 50)
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer (KafkaOffsetReaderConsumer.scala: 559)
org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchLatestOffsets (KafkaOffsetReaderConsumer.scala: 339)
在org.apache.spark.sql.kafka010.KafkaMicroBatchStream。anonfun getOrCreateInitialPartitionOffsets美元1美元(KafkaMicroBatchStream.scala: 211)
scala.Option.getOrElse (Option.scala: 189)
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets (KafkaMicroBatchStream.scala: 206)
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset (KafkaMicroBatchStream.scala: 111)
在org.apache.spark.sql.execution.streaming.MicroBatchExecution。anonfun constructNextBatch美元5美元(MicroBatchExecution.scala: 414)
scala.Option.getOrElse (Option.scala: 189)
在org.apache.spark.sql.execution.streaming.MicroBatchExecution。anonfun constructNextBatch美元3美元(MicroBatchExecution.scala: 414)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken (ProgressReporter.scala: 293)
在org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken (ProgressReporter.scala: 291美元)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken (StreamExecution.scala: 73)
在org.apache.spark.sql.execution.streaming.MicroBatchExecution。anonfun constructNextBatch美元2美元(MicroBatchExecution.scala: 407)
scala.collection.TraversableLike。anonfun地圖1美元美元(TraversableLike.scala: 238)
scala.collection.immutable.Map Map1.foreach美元(Map.scala: 128)
scala.collection.TraversableLike.map (TraversableLike.scala: 238)
在scala.collection.TraversableLike.map (TraversableLike.scala: 231美元)
scala.collection.AbstractTraversable.map (Traversable.scala: 108)
在org.apache.spark.sql.execution.streaming.MicroBatchExecution。anonfun constructNextBatch美元1美元(MicroBatchExecution.scala: 404)
在scala.runtime.java8.JFunction0 mcZ sp.apply美元(JFunction0 mcZ sp.java美元:23)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked (MicroBatchExecution.scala: 677)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch (MicroBatchExecution.scala: 400)
在org.apache.spark.sql.execution.streaming.MicroBatchExecution。anonfun runActivatedStream美元2美元(MicroBatchExecution.scala: 225)
在scala.runtime.java8.JFunction0專門sp.apply美元(美元JFunction0 mcV $ sp.java: 23)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken (ProgressReporter.scala: 293)
在org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken (ProgressReporter.scala: 291美元)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken (StreamExecution.scala: 73)
在org.apache.spark.sql.execution.streaming.MicroBatchExecution。anonfun runActivatedStream美元1美元(MicroBatchExecution.scala: 208)
在org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute (TriggerExecutor.scala: 57)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream (MicroBatchExecution.scala: 202)
在org.apache.spark.sql.execution.streaming.StreamExecution。anonfun runStream美元1美元(StreamExecution.scala: 370)
在scala.runtime.java8.JFunction0專門sp.apply美元(美元JFunction0 mcV $ sp.java: 23)
org.apache.spark.sql.SparkSession.withActive (SparkSession.scala: 854)
在org.apache.spark.sql.execution.streaming.StreamExecution.org apache引發美元sql執行流StreamExecution $ $美元美元runStream (StreamExecution.scala: 341)
在org.apache.spark.sql.execution.streaming.StreamExecution不久美元1.美元運行(StreamExecution.scala: 268)
引起的:kafkashaded.org.apache.kafka.common.KafkaException: kafkashaded.org.apache.kafka.common.KafkaException:未能加載SSL密鑰庫/ FileStore /表/證書/密鑰存儲庫。類型的jks jks
kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure (SslChannelBuilder.java: 74)
kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create (ChannelBuilders.java: 157)
kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder (ChannelBuilders.java: 73)
kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder (ClientUtils.java: 105)
在kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer。< init > (KafkaConsumer.java: 740)
…46個更多
引起的:kafkashaded.org.apache.kafka.common.KafkaException:未能加載SSL密鑰庫/ FileStore /表/證書/密鑰存儲庫。類型的jks jks
kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory SecurityStore.load美元(DefaultSslEngineFactory.java: 306)
在kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory SecurityStore美元。< init > (DefaultSslEngineFactory.java: 285)
kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore (DefaultSslEngineFactory.java: 255)
kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure (DefaultSslEngineFactory.java: 139)
kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory (SslFactory.java: 136)
kafkashaded.org.apache.kafka.common.security.ssl.SslFactory.configure (SslFactory.java: 93)
kafkashaded.org.apache.kafka.common.network.SslChannelBuilder.configure (SslChannelBuilder.java: 72)
…50多
引起的:java.nio.file。NoSuchFileException: / FileStore /表/證書/ keystore.jks
sun.nio.fs.UnixException.translateToIOException (UnixException.java: 86)
sun.nio.fs.UnixException.rethrowAsIOException (UnixException.java: 102)
sun.nio.fs.UnixException.rethrowAsIOException (UnixException.java: 107)
sun.nio.fs.UnixFileSystemProvider.newByteChannel (UnixFileSystemProvider.java: 214)
java.nio.file.Files.newByteChannel (Files.java: 361)
java.nio.file.Files.newByteChannel (Files.java: 407)
java.nio.file.spi.FileSystemProvider.newInputStream (FileSystemProvider.java: 384)
java.nio.file.Files.newInputStream (Files.java: 152)
kafkashaded.org.apache.kafka.common.security.ssl.DefaultSslEngineFactory SecurityStore.load美元(DefaultSslEngineFactory.java: 299)
…56個更多
——錯誤- - - - - -
讓我知道如果你能幫我解決這個問題。
謝謝,
時候