我有getS3Object函數(json)位於aws s3對象
client_connect擴展對象序列化{val s3_get_path = " / dbfs / mnt / s3response“def getS3Objects (s3ObjectName:字符串,s3Client: AmazonS3):字符串s = {val objectKey = " $ {s3ObjectName} " val inputS3Stream = s3Client。getObject (myS3Bucket objectKey)。getObjectContent val inputS3String = IOUtils。toString (inputS3Stream,“utf - 8”) val filePath = s“$ {s3_get_path} / $ {objectKey} " val =新文件(filePath) val fileWriter = new fileWriter(文件)val bw = new BufferedWriter (fileWriter) bw.write (inputS3String) bw.close () fileWriter.close () inputS3String}}
攝取使用消息流框架
源流dataframe source_df的讀取從azure事件中心看起來像下麵
身體| |
| 8 c44f2715ab81c16ecb31d527e18465d.json ~ 2021-05-26 ~ 13-14-56 ~哦|
| a4f9e914c1a40e5828b0eb129b1234b2.json ~ 2022 = 05-09 ~ 15-12-22 ~ |
“身體”列包含字符串值由“~”分隔開的,第一個元素的對象id作為參數傳遞到getS3Object函數
該函數的第二個參數是S3client用於連接aws S3是一個可序列化的類內定義的。
最終類s3clientBuild()擴展了可序列化的{def s3connection (AccessKey:字符串,SecretKey: String) = {val clientRegion:區域=區域。US_EAST_1 val信譽= new BasicAWSCredentials (AccessKey SecretKey) AmazonS3ClientBuilder.standard () .withRegion (clientRegion) .withCredentials(新AWSStaticCredentialsProvider(信譽)).build ()}}
val AccessKey = dbutils.secrets。得到(=“範圍”,範圍鍵=“AccessKey-ID”)
val SecretKey = dbutils.secrets。得到(=“範圍”,範圍鍵=“AccessKey-Secret”)
寫流:
val streamWriter = source_df .writeStream .queryName .option (“Write_stream”) (“checkpointLocation”, chk_pt) .trigger(觸發器。ProcessingTime(3秒)).outputMode(“追加”).foreachBatch ((batchDF: DataFrame batchId:長)= > {batchDF.persist嚐試()val object_df = batchDF.select(分裂(坳(“身體”)、“~”).getItem (0)。as (“ObjectID”)) val df_response = object_df.repartition (2)。mapPartitions(迭代器= > {val api_connect = new s3clientBuild () val s3client = api_connect。s3connection (AccessKey SecretKey) val resp =迭代器。地圖(行= > {val name = cli_connector.getS3Objects (row.getString (0) s3client)(名稱)})resp}) .toDF(“價值”).select (from_json(“價值”.cast美元(“字符串”),MySchema)作為“字段”).select(“。*”美元)df_response.count () batchDF.unpersist()}與{成功(_)= >案件失敗(e) = >{扔e}})
然而我得到以下錯誤信息:-
任務不是序列化:. io .NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter序列化stack: - object not serializable (class: org.apache.spark.sql.streaming.DataStreamWriter, value: org.apache.spark.sql.streaming.DataStreamWriter@1f1c5f4f) - field (class: $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: sensorwriter_0526, type: class org.apache.spark.sql.streaming.DataStreamWriter) - object (class $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, $lineeabf6de089d548a29e8a43ad48edbc49125.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@41c5f54f) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda)
該如何解決呢?
@Sandesh Puligundla嗯很難說,但我確信那是因為你寫的對象。
請記住,引發分布。
你可能想查看這些鏈接:
https://stackoverflow.com/questions/40596871/how-spark-handles-object
不是一個實際的回答你的問題,對不起。但是這個錯誤很難查明(對我而言,希望一些好的程序員可以解決這個問題)。