取消
顯示的結果
而不是尋找
你的意思是:

試著捕捉多個流寫在工作

AdamRink
新的貢獻者三世

我們有問題檢查站和模式版本的日期(不知道為什麼),但它會導致工作失敗。我們有工作運行15 - 30流查詢,所以如果一個失敗,產生了一個問題。我想使檢查點錯誤,重新設置檢查點和日誌失敗。不是最優,因為我再處理流或者至少我們看的窗口。

所以我的問題就是似乎錯誤陷阱流的唯一方法是使用awaitTermination ()……這個鎖筆記本和下一個流才開始第一個流是終止。awaitAnyTermination()不會趕上工作開始時,出現錯誤,因為之前的工作停止awaitAnyTermination嗎?

1接受解決方案

接受的解決方案

Kaniz
社區經理
社區經理

推特)嗨@溜冰場,

解決方案1:

既然你要求awaitTermination在第一次查詢時它會阻塞,直到它開始前完成第二個查詢。

所以如果你想啟動所有查詢,然後使用:-

StreamingQueryManager.awaitAnyTermination
val query1 = df.writeStream.start () val query2 = df.writeStream.start () val query3 ....................................spark.streams.awaitAnyTermination ()

除了上麵的,默認情況下火花使用FIFO調度程序。這意味著第一個查詢被集群中的所有資源而執行。因為你試圖運行多個並發查詢你應該切換到公平調度器

如果你有一些查詢,應該比其他人更多的資源,那麼你也可以調整個人調度器池。

解決方案2:-

val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 = ds.writeSteam。{…} .start () query3.awaitTermination ()

AwaitTermination ()會擋住你的過程,直到完成,這永遠不會發生在一個流媒體應用,稱其在你最後的查詢應該可以解決你的問題。

解決方案3:

如果退出任何查詢,使用:-

spark.streams.awaitAnyTermination ()

如果退出所有查詢:-

選項1:

val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 ...........query1.awaitTermination ();query2.awaitTermination ();query3 ................

選項2:

val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start spark.streams.active ()。foreach (x = > x.awaitTermination ())

選項3:

而(! spark.streams.active.isEmpty) {println (+ spark.streams.active“查詢目前仍然活躍:”。地圖(x = > x.name) .mkString (", ")) spark.streams.awaitAnyTermination () spark.streams.resetTerminated ()}

在原帖子查看解決方案

5回複5

Kaniz
社區經理
社區經理

推特)嗨@溜冰場!我的名字叫Kaniz,我這裏的技術主持人。很高興認識你,謝謝你的問題!看看你的同行在社區中有一個回答你的問題。否則我將盡快給你回電。謝謝。

AdamRink
新的貢獻者三世

Thx Kaniz

Kaniz
社區經理
社區經理

推特)嗨@溜冰場,

解決方案1:

既然你要求awaitTermination在第一次查詢時它會阻塞,直到它開始前完成第二個查詢。

所以如果你想啟動所有查詢,然後使用:-

StreamingQueryManager.awaitAnyTermination
val query1 = df.writeStream.start () val query2 = df.writeStream.start () val query3 ....................................spark.streams.awaitAnyTermination ()

除了上麵的,默認情況下火花使用FIFO調度程序。這意味著第一個查詢被集群中的所有資源而執行。因為你試圖運行多個並發查詢你應該切換到公平調度器

如果你有一些查詢,應該比其他人更多的資源,那麼你也可以調整個人調度器池。

解決方案2:-

val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 = ds.writeSteam。{…} .start () query3.awaitTermination ()

AwaitTermination ()會擋住你的過程,直到完成,這永遠不會發生在一個流媒體應用,稱其在你最後的查詢應該可以解決你的問題。

解決方案3:

如果退出任何查詢,使用:-

spark.streams.awaitAnyTermination ()

如果退出所有查詢:-

選項1:

val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 ...........query1.awaitTermination ();query2.awaitTermination ();query3 ................

選項2:

val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start spark.streams.active ()。foreach (x = > x.awaitTermination ())

選項3:

而(! spark.streams.active.isEmpty) {println (+ spark.streams.active“查詢目前仍然活躍:”。地圖(x = > x.name) .mkString (", ")) spark.streams.awaitAnyTermination () spark.streams.resetTerminated ()}

AdamRink
新的貢獻者三世

問題是,在啟動時如果流失敗,不會撞到awaitAnyTermination嗎?我幾乎想要while循環,把它放在一個後台線程開始開始,然後火之後所有的流…不知道這是可能的嗎?

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map