import org.apache.spark.sql._
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._
import scala.collection.immutable._
import org.apache.spark.eventhubs._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

// Event Hubs connection configuration. Prefer an immutable `val`; cap each
// micro-batch at 5 events so the sink is not flooded during testing.
val testConnectionStr = "<your-event-hubs-connection-string>"
val parameters = EventHubsConf(testConnectionStr).setMaxEventsPerTrigger(5)

// Read the Delta table as a streaming source.
val df = spark.readStream.format("delta").table("gold.redemption")

// The Event Hubs sink validates the output schema and requires a single
// `body` column (string or binary); optional columns are `partitionId` and
// `partitionKey`. Writing raw columns fails in EventHubsWriter.validateQuery,
// so serialize the payload to JSON into `body`.
// NOTE: `startingOffsets` is a source-side option; it has no effect on a
// writeStream sink but is kept here for backward compatibility.
val ds = df
  .selectExpr("to_json(struct(RedemptionId, ProgramId, ClaimsPK_CL_FILEID)) AS body")
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("startingOffsets", "latest")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()
Error log:
java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery$2(EventHubsWriter.scala:53)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.eventhubs.EventHubsWriter$.validateQuery(EventHubsWriter.scala:53)
at org.apache.spark.sql.eventhubs.EventHubsWriter$.write(EventHubsWriter.scala:70)
at org.apache.spark.sql.eventhubs.EventHubsSink.addBatch(EventHubsSink.scala:39)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:805)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:803)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:320)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:318)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:803)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:339)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:904)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:336)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
.options(parameters.toMap)
.option("startingOffsets", "latest")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
The dataframe being written needs to have the following schema:

| column                  | type             |
|-------------------------|------------------|
| body (required)         | string or binary |
| partitionId (*optional) | string           |
| partitionKey (*optional)| string           |
This works for me (PySpark version):

df.withColumn("body", F.to_json(F.struct(*df.columns), options={"ignoreNullFields": False})) \
  .select("body") \
  .write \
  .format("eventhubs") \
  .options(**ehconf) \
  .save()