推特)嗨@溜冰場,
解決方案1:
既然你要求awaitTermination在第一次查詢時它會阻塞,直到它開始前完成第二個查詢。
所以如果你想啟動所有查詢,然後使用:-
StreamingQueryManager.awaitAnyTermination
val query1 = df.writeStream.start () val query2 = df.writeStream.start () val query3 ....................................spark.streams.awaitAnyTermination ()
除了上麵的,默認情況下火花使用FIFO調度程序。這意味著第一個查詢被集群中的所有資源而執行。因為你試圖運行多個並發查詢你應該切換到公平調度器。
如果你有一些查詢,應該比其他人更多的資源,那麼你也可以調整個人調度器池。
解決方案2:-
val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 = ds.writeSteam。{…} .start () query3.awaitTermination ()
AwaitTermination ()會擋住你的過程,直到完成,這永遠不會發生在一個流媒體應用,稱其在你最後的查詢應該可以解決你的問題。
解決方案3:
如果退出任何查詢,使用:-
spark.streams.awaitAnyTermination ()
如果退出所有查詢:-
選項1:
val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 ...........query1.awaitTermination ();query2.awaitTermination ();query3 ................
選項2:
val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start spark.streams.active ()。foreach (x = > x.awaitTermination ())
選項3:
而(! spark.streams.active.isEmpty) {println (+ spark.streams.active“查詢目前仍然活躍:”。地圖(x = > x.name) .mkString (", ")) spark.streams.awaitAnyTermination () spark.streams.resetTerminated ()}
推特)嗨@溜冰場,
解決方案1:
既然你要求awaitTermination在第一次查詢時它會阻塞,直到它開始前完成第二個查詢。
所以如果你想啟動所有查詢,然後使用:-
StreamingQueryManager.awaitAnyTermination
val query1 = df.writeStream.start () val query2 = df.writeStream.start () val query3 ....................................spark.streams.awaitAnyTermination ()
除了上麵的,默認情況下火花使用FIFO調度程序。這意味著第一個查詢被集群中的所有資源而執行。因為你試圖運行多個並發查詢你應該切換到公平調度器。
如果你有一些查詢,應該比其他人更多的資源,那麼你也可以調整個人調度器池。
解決方案2:-
val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 = ds.writeSteam。{…} .start () query3.awaitTermination ()
AwaitTermination ()會擋住你的過程,直到完成,這永遠不會發生在一個流媒體應用,稱其在你最後的查詢應該可以解決你的問題。
解決方案3:
如果退出任何查詢,使用:-
spark.streams.awaitAnyTermination ()
如果退出所有查詢:-
選項1:
val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start () val query3 ...........query1.awaitTermination ();query2.awaitTermination ();query3 ................
選項2:
val query1 = ds.writeSteam。{…} .start () val query2 = ds.writeSteam。{…} .start spark.streams.active ()。foreach (x = > x.awaitTermination ())
選項3:
而(! spark.streams.active.isEmpty) {println (+ spark.streams.active“查詢目前仍然活躍:”。地圖(x = > x.name) .mkString (", ")) spark.streams.awaitAnyTermination () spark.streams.resetTerminated ()}