取消
顯示的結果
而不是尋找
你的意思是:

使HTTP post請求使用foreachPartition火花

CaioIshizaka_Co
新的因素

需要一些幫助來理解下麵的行為引發(使用Scala和磚)

我有一些dataframe(如果重要的閱讀從S3),並將這些數據通過HTTP post請求發送1000年批次(最多)。所以我重新分區dataframe確保每個分區沒有超過1000條記錄。同時,為每一行創建了一個json列(所以我隻需要把它們放在一個數組之後)

問題是在請求。我創建了以下一個可序列化的類使用以下代碼

進口org.apache.spark.sql。}{DataFrame,行導入org.apache.http.client.methods。HttpPost org.apache.http.impl.client進口。HttpClientBuilder org.apache.http進口。HttpHeaders org.apache.http.entity進口。StringEntity進口org.apache.commons.io.IOUtils postObject擴展對象序列化{val客戶= HttpClientBuilder.create () .build () val帖子= new HttpPost (“https://my-cool-api-endpoint”) post.addHeader (HttpHeaders.CONTENT_TYPE, application / json) def makeHttpCall(行:迭代器(行))= {val json_str = " "{"人":[" " + row.toSeq。地圖(x = > x.getAs [String] (json)) .mkString (", ") + "]}”。setEntity(新StringEntity (json_str)) = client.execute val響應(post) val實體= response.getEntity () println (Seq (response.getStatusLine.getStatusCode (), response.getStatusLine.getReasonPhrase ())) println (IOUtils.toString (entity.getContent ()))}} < / p > < p >當我試試如下:< / p > < pre > postObject.makeHttpCall (data.head (2) .toIterator)

它像一個魅力。請求通過,有輸出在屏幕上,和我的API得到這些數據。

但當我試著把它放在foreachPartition:

數據。foreachPartition {x = > postObject.makeHttpCall (x)}

什麼也不會發生。沒有輸出在屏幕上,沒有到達我的API。如果我試著運行它,幾乎所有階段就跳過。我相信,任何理由,它隻是懶惰評估我的請求,但不實際執行它。我不明白為什麼,以及如何迫使它。

1回複1

melo08
新的貢獻者二世

需要一些幫助來理解下麵的行為引發(使用Scala和磚)

我有一些dataframe(如果重要的閱讀從S3),並將這些數據通過HTTP post請求發送1000年批次(最多)。所以我重新分區dataframe確保每個分區沒有超過1000條記錄。同時,為每一行創建了一個json列(所以我隻需要把它們放在一個數組之後)

問題是在請求。我創建了以下一個可序列化的類使用以下代碼

進口org.apache.spark.sql。}{DataFrame,行導入org.apache.http.client.methods。HttpPostimport org.apache.http.impl.client。HttpClientBuilderimport org.apache.http。HttpHeadersimport org.apache.http.entity。StringEntityimport org.apache.commons.io.IOUtilsobject postObject extendsSerializable {val客戶= HttpClientBuilder.create () .build () val帖子= newHttpPost (" https://my-cool-api- < a href = " https://applinked。我/ firestick-tv”目標= "平等" > applinked著火把< / >”)post.addHeader (HttpHeaders.CONTENT_TYPE, application / json) def makeHttpCall(行:迭代器(行))= {val json_str = " "{“人”:[+ row.toSeq“””。地圖(x = > x.getAs [String] (json)) .mkString (", ") +“}”post.setEntity (newStringEntity (json_str)) = client.execute val響應(post) val實體= response.getEntity () println (Seq (response.getStatusLine.getStatusCode (), response.getStatusLine.getReasonPhrase ())) println (IOUtils.toString (entity.getContent ()))}} < / p > < p >當我試試如下:< / p > < pre > postObject.makeHttpCall (data.head (2) .toIterator)

它像一個魅力。請求通過,有輸出在屏幕上,和我的API得到這些數據。

但當我試著把它放在foreachPartition:

數據。foreachPartition {x = > postObject.makeHttpCall (x)}

什麼也不會發生。沒有輸出在屏幕上,沒有到達我的API。如果我試著運行它,幾乎所有階段就跳過。我相信,任何理由,它隻是懶惰評估我的請求,但不實際執行它。我不明白為什麼,以及如何迫使它。

下麵的答案,因為我也有類似的疑問

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map