kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.consumer(KafkaOffsetReaderConsumer.scala:83)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReaderConsumer.scala:561)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReaderConsumer.scala:606)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.withRetriesWithoutInterrupt(KafkaOffsetReaderConsumer.scala:605)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReaderConsumer.scala:559)
	at org.apache.spark.util.UninterruptibleThreadRunner.runUninterruptibly(UninterruptibleThreadRunner.scala:50)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.partitionsAssignedToConsumer(KafkaOffsetReaderConsumer.scala:559)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.fetchEarliestOffsets(KafkaOffsetReaderConsumer.scala:321)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:209)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:206)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:111)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:414)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:414)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:407)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:404)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:677)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:400)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:225)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:208)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:202)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:370)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
	at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
	at kafkashaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
	at kafkashaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740)
	... 46 more
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
	at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
	at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
	at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
	at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
	at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
	at kafkashaded.org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
	at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
	at kafkashaded.org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
	at kafkashaded.org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
	... 50 more
@Kaniz Fatma我有同樣的問題。
%python
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

df = (spark.readStream.format("kafka")
  .option("subscribe", "mytopic")
  .option("kafka.bootstrap.servers", "host:port")
  .option("startingOffsets", "earliest")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.username", "myuser")
  .option("kafka.sasl.password", "mypwd")
  .option("kafka.group.id", "TestGroup2")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
  .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId', 'fixedValue'))

display(df)
異常堆棧的頂部
kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
	at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:107)