目的:檢索對象從一個S3 bucket使用得到的api調用,檢索對象寫入azure datalake和在發生錯誤的情況下,就像404年代(對象未找到)宇宙將錯誤信息寫入數據庫
“my_dataframe”由一列(s3ObjectName)對象名稱:-
| s3ObjectName |
| a1.json |
| b2.json |
| c3.json |
| d4.json |
| e5.json |
/ /重試失敗的事件函數,寫宇宙錯誤def重試[T] (n: Int) (fn: = > T): T ={嚐試{fn返回}匹配{成功(x) = > x案件失敗(T: Throwable) = > {thread . sleep(1000)如果(n > 1){重試(n - 1) (fn)}其他{val loggerDf = Seq ((t.toString)) .toDF .withColumn(“描述”)(“類型”,點燃(“失敗”)).withColumn (“id”, uuid ()) loggerDf.write.format (cosmos.oltp) .options (ExceptionCfg) .mode(“追加”).save()把T}}}} / /執行s3 api調用my_dataframe.rdd。foreachPartition(分區= > {val信譽= new BasicAWSCredentials (AccessKey, SecretKey) val clientRegion:區域=區域。US_EAST_1 val s3client = AmazonS3ClientBuilder.standard () .withRegion (clientRegion) .withCredentials(新AWSStaticCredentialsProvider(信譽)).build()分區。foreach (x = >{重試(2){val objectKey = x.getString(0)我= s3client瓦爾。getObject (s3bucket_name objectKey)。getObjectContent val inputS3String = IOUtils。toString(我,“utf - 8”) val filePath = s = " $ {data_lake_file_path} " val文件新文件(filePath) val fileWriter = new fileWriter(文件)val bw = new BufferedWriter (fileWriter) bw.write (inputS3String) bw.close () fileWriter.close ()}}})
當執行上述結果在以下錯誤:-
引起的:java.lang.NullPointerException
這個錯誤發生在重試功能時創建dataframe問道loggerDf和寫宇宙
有另一種方式寫宇宙DB的錯誤消息嗎?
嗨@Sandesh Puligundla問題是您正在使用火花foreachpartition內部上下文。您可以創建一個dataframe隻有火花司機。引用一些堆棧溢出
IIUC然後一個解決方案是使用“mapPartitions”和“地圖”內部函數調用重試函數並返回結果。這將創建一個新的dataframe狀態碼。然後你可以篩選失敗的和寫他們宇宙db。
示例代碼
val火花= SparkSession.builder()部分(“地方[*]”).getOrCreate () val my_dataframe = spark.range進口spark.implicits (10)。_ val df = my_dataframe。mapPartitions s3(迭代器= >{/ / /客戶端初始化迭代器。地圖(行= > {val arr =重試(火花){/ /如果執行文件操作(行% 2 = = 0)其他{1/0 / /這將導致異常}{行}}}的arr)}) .toDF (“uuid”,“狀態”,“消息”)/ /寫這df宇宙db df.filter($“地位”= = =“失敗”),告訴()}def重試[T] (n: Int,火花:SparkSession) (fn: = > T):(字符串,字符串,字符串)={嚐試{fn}匹配{成功案例(x) = > {(java.util.UUID.randomUUID () .toString,“成功”," ")}失敗案例(T: Throwable) = > {thread . sleep(1000)如果(n > 1){重試(n - 1),火花)(fn)}其他{(java.util.UUID.randomUUID () .toString,“失敗”,t.getMessage)}}}}
嗨@Sandesh Puligundla問題是您正在使用火花foreachpartition內部上下文。您可以創建一個dataframe隻有火花司機。引用一些堆棧溢出
IIUC然後一個解決方案是使用“mapPartitions”和“地圖”內部函數調用重試函數並返回結果。這將創建一個新的dataframe狀態碼。然後你可以篩選失敗的和寫他們宇宙db。
示例代碼
val火花= SparkSession.builder()部分(“地方[*]”).getOrCreate () val my_dataframe = spark.range進口spark.implicits (10)。_ val df = my_dataframe。mapPartitions s3(迭代器= >{/ / /客戶端初始化迭代器。地圖(行= > {val arr =重試(火花){/ /如果執行文件操作(行% 2 = = 0)其他{1/0 / /這將導致異常}{行}}}的arr)}) .toDF (“uuid”,“狀態”,“消息”)/ /寫這df宇宙db df.filter($“地位”= = =“失敗”),告訴()}def重試[T] (n: Int,火花:SparkSession) (fn: = > T):(字符串,字符串,字符串)={嚐試{fn}匹配{成功案例(x) = > {(java.util.UUID.randomUUID () .toString,“成功”," ")}失敗案例(T: Throwable) = > {thread . sleep(1000)如果(n > 1){重試(n - 1),火花)(fn)}其他{(java.util.UUID.randomUUID () .toString,“失敗”,t.getMessage)}}}}
嗨@Sandesh Puligundla,
如果@Gaurav Rupnar完全回答你的問題,你會很高興他的答案標記為最好,這樣其他人可以迅速找到解決方案了嗎?