取消
顯示的結果
而不是尋找
你的意思是:

錯誤日誌宇宙db

Sandesh87
新的貢獻者三世

目的:檢索對象從一個S3 bucket使用得到的api調用,檢索對象寫入azure datalake和在發生錯誤的情況下,就像404年代(對象未找到)宇宙將錯誤信息寫入數據庫

“my_dataframe”由一列(s3ObjectName)對象名稱:-

| s3ObjectName |

| a1.json |

| b2.json |

| c3.json |

| d4.json |

| e5.json |

/ /重試失敗的事件函數,寫宇宙錯誤def重試[T] (n: Int) (fn: = > T): T ={嚐試{fn返回}匹配{成功(x) = > x案件失敗(T: Throwable) = > {thread . sleep(1000)如果(n > 1){重試(n - 1) (fn)}其他{val loggerDf = Seq ((t.toString)) .toDF .withColumn(“描述”)(“類型”,點燃(“失敗”)).withColumn (“id”, uuid ()) loggerDf.write.format (cosmos.oltp) .options (ExceptionCfg) .mode(“追加”).save()把T}}}} / /執行s3 api調用my_dataframe.rdd。foreachPartition(分區= > {val信譽= new BasicAWSCredentials (AccessKey, SecretKey) val clientRegion:區域=區域。US_EAST_1 val s3client = AmazonS3ClientBuilder.standard () .withRegion (clientRegion) .withCredentials(新AWSStaticCredentialsProvider(信譽)).build()分區。foreach (x = >{重試(2){val objectKey = x.getString(0)我= s3client瓦爾。getObject (s3bucket_name objectKey)。getObjectContent val inputS3String = IOUtils。toString(我,“utf - 8”) val filePath = s = " $ {data_lake_file_path} " val文件新文件(filePath) val fileWriter = new fileWriter(文件)val bw = new BufferedWriter (fileWriter) bw.write (inputS3String) bw.close () fileWriter.close ()}}})

當執行上述結果在以下錯誤:-

引起的:java.lang.NullPointerException

這個錯誤發生在重試功能時創建dataframe問道loggerDf和寫宇宙

有另一種方式寫宇宙DB的錯誤消息嗎?

1接受解決方案

接受的解決方案

User16763506477
貢獻者三世

嗨@Sandesh Puligundla問題是您正在使用火花foreachpartition內部上下文。您可以創建一個dataframe隻有火花司機。引用一些堆棧溢出

https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-..。

https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpa..。

IIUC然後一個解決方案是使用“mapPartitions”和“地圖”內部函數調用重試函數並返回結果。這將創建一個新的dataframe狀態碼。然後你可以篩選失敗的和寫他們宇宙db。

示例代碼

val火花= SparkSession.builder()部分(“地方[*]”).getOrCreate () val my_dataframe = spark.range進口spark.implicits (10)。_ val df = my_dataframe。mapPartitions s3(迭代器= >{/ / /客戶端初始化迭代器。地圖(行= > {val arr =重試(火花){/ /如果執行文件操作(行% 2 = = 0)其他{1/0 / /這將導致異常}{行}}}的arr)}) .toDF (“uuid”,“狀態”,“消息”)/ /寫這df宇宙db df.filter($“地位”= = =“失敗”),告訴()}def重試[T] (n: Int,火花:SparkSession) (fn: = > T):(字符串,字符串,字符串)={嚐試{fn}匹配{成功案例(x) = > {(java.util.UUID.randomUUID () .toString,“成功”," ")}失敗案例(T: Throwable) = > {thread . sleep(1000)如果(n > 1){重試(n - 1),火花)(fn)}其他{(java.util.UUID.randomUUID () .toString,“失敗”,t.getMessage)}}}}

在原帖子查看解決方案

3回複3

匿名
不適用

你好!我的名字是風笛手和我的一個社區版主磚。謝謝你的問題。看起來很多人能從答案。謝謝你的耐心,我們等待響應。

User16763506477
貢獻者三世

嗨@Sandesh Puligundla問題是您正在使用火花foreachpartition內部上下文。您可以創建一個dataframe隻有火花司機。引用一些堆棧溢出

https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-..。

https://stackoverflow.com/questions/40691086/how-to-use-sqlcontext-and-sparkcontext-inside-foreachpa..。

IIUC然後一個解決方案是使用“mapPartitions”和“地圖”內部函數調用重試函數並返回結果。這將創建一個新的dataframe狀態碼。然後你可以篩選失敗的和寫他們宇宙db。

示例代碼

val火花= SparkSession.builder()部分(“地方[*]”).getOrCreate () val my_dataframe = spark.range進口spark.implicits (10)。_ val df = my_dataframe。mapPartitions s3(迭代器= >{/ / /客戶端初始化迭代器。地圖(行= > {val arr =重試(火花){/ /如果執行文件操作(行% 2 = = 0)其他{1/0 / /這將導致異常}{行}}}的arr)}) .toDF (“uuid”,“狀態”,“消息”)/ /寫這df宇宙db df.filter($“地位”= = =“失敗”),告訴()}def重試[T] (n: Int,火花:SparkSession) (fn: = > T):(字符串,字符串,字符串)={嚐試{fn}匹配{成功案例(x) = > {(java.util.UUID.randomUUID () .toString,“成功”," ")}失敗案例(T: Throwable) = > {thread . sleep(1000)如果(n > 1){重試(n - 1),火花)(fn)}其他{(java.util.UUID.randomUUID () .toString,“失敗”,t.getMessage)}}}}

嗨@Sandesh Puligundla,

如果@Gaurav Rupnar完全回答你的問題,你會很高興他的答案標記為最好,這樣其他人可以迅速找到解決方案了嗎?

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map