我試圖執行一個api調用從amazon s3對象(json),我使用foreachPartition並行執行多個調用
df.rdd。foreachPartition(分區= >{/ /初始化列表緩衝區var buffer_accounts1 = new ListBuffer [String]() / /初始化連接到amazon s3 val s3 = s3clientConnection()分區。foreach(有趣= > {/ / api來獲得對象從s3 bucket / /每一行的第一列包含s3對象名稱val obj = getS3Object (s3,“my_bucket fun.getString (0))。getContent val objString = IOUtils。toString (obj,“utf - 8”) buffer_accounts1 + = objString}) buffer_accounts1.toList.toDF .write.parquet(“對象”)(“dbfs: / mnt /測試”)})
從foreachPartition我想存儲字符串響應的所有api調用到單個dataframe。如果在我forEachPartition如果我讓100 api調用我想創建一個dataframe所有100份回複。
為此我創建一個可變列表內,想把它轉換成dataframe foreachPartition但我們不能創建一個dataframe以外的司機。
我想創建一個dataframe foreachPartition內總api調用的響應,這樣我可以申請進一步的轉換。這是如何實現的呢?
注意:——我可以寫每個磁盤為json和讀回,但導致性能下降,因為大量的磁盤I / O操作。
它可以通過使用mapPartitions
val df_response = df。mapPartitions(迭代器= > {val api_connect = new s3clientBuild () val s3client = api_connect。s3connection (AccessKey SecretKey) val res =迭代器。地圖(行= > {val name = getS3 (row.getString (0) s3client)(名稱)})res}) .toDF(“價值”)
它可以通過使用mapPartitions
val df_response = df。mapPartitions(迭代器= > {val api_connect = new s3clientBuild () val s3client = api_connect。s3connection (AccessKey SecretKey) val res =迭代器。地圖(行= > {val name = getS3 (row.getString (0) s3client)(名稱)})res}) .toDF(“價值”)