取消
顯示的結果
而不是尋找
你的意思是:

創建一個dataframe foreachPartition內的所有api請求的響應

Sandesh87
新的貢獻者三世

我試圖執行一個api調用從amazon s3對象(json),我使用foreachPartition並行執行多個調用

df.rdd。foreachPartition(分區= >{/ /初始化列表緩衝區var buffer_accounts1 = new ListBuffer [String]() / /初始化連接到amazon s3 val s3 = s3clientConnection()分區。foreach(有趣= > {/ / api來獲得對象從s3 bucket / /每一行的第一列包含s3對象名稱val obj = getS3Object (s3,“my_bucket fun.getString (0))。getContent val objString = IOUtils。toString (obj,“utf - 8”) buffer_accounts1 + = objString}) buffer_accounts1.toList.toDF .write.parquet(“對象”)(“dbfs: / mnt /測試”)})

從foreachPartition我想存儲字符串響應的所有api調用到單個dataframe。如果在我forEachPartition如果我讓100 api調用我想創建一個dataframe所有100份回複。

為此我創建一個可變列表內,想把它轉換成dataframe foreachPartition但我們不能創建一個dataframe以外的司機。

我想創建一個dataframe foreachPartition內總api調用的響應,這樣我可以申請進一步的轉換。這是如何實現的呢?

注意:——我可以寫每個磁盤為json和讀回,但導致性能下降,因為大量的磁盤I / O操作。

1接受解決方案

接受的解決方案

Sandesh87
新的貢獻者三世

它可以通過使用mapPartitions

val df_response = df。mapPartitions(迭代器= > {val api_connect = new s3clientBuild () val s3client = api_connect。s3connection (AccessKey SecretKey) val res =迭代器。地圖(行= > {val name = getS3 (row.getString (0) s3client)(名稱)})res}) .toDF(“價值”)

在原帖子查看解決方案

2回答2

Sandesh87
新的貢獻者三世

它可以通過使用mapPartitions

val df_response = df。mapPartitions(迭代器= > {val api_connect = new s3clientBuild () val s3client = api_connect。s3connection (AccessKey SecretKey) val res =迭代器。地圖(行= > {val name = getS3 (row.getString (0) s3client)(名稱)})res}) .toDF(“價值”)

jose_gonzalez
主持人
主持人

嗨@Sandesh Puligundla,

謝謝你分享的解決方案。我們將其標記為“最好”的反應,在未來是另一個用戶有相同的問題,他們將能夠找到解決方案。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map