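Hello,
I want to execute custom code on onApplicationEnd. Outside Databricks, I have used a Spark listener with onApplicationEnd without any problem.
But it does not work on Databricks (I tried the listener's onJobEnd, and that one works).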
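For comparison, a listener of the kind that does fire on Databricks could be sketched as follows. This is only a minimal illustration: the class name OnJobEnd is hypothetical and is not part of the code in this post.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener (name chosen for illustration only).
// onJobEnd is called on the driver each time a Spark job finishes.
class OnJobEnd extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} ended with result ${jobEnd.jobResult}")
  }
}
```

It would be registered the same way as the other listener, either with spark.sparkContext.addSparkListener(new OnJobEnd) or through spark.extraListeners.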
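I have also tried a Spark plugin. The output from the init() method appears in the driver logs, but the output from shutdown() does not: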
package com.baikal

import java.util.{Map => JMap}

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

object Hello {
  def main(args: Array[String]): Unit = {
    val listenerEnd = new OnApplicationEnd

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("Spark")
      .getOrCreate()

    // Register the listener programmatically and run a small job.
    spark.sparkContext.addSparkListener(listenerEnd)
    spark.sparkContext.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).count()
  }
}

// Listener that should log and fail when the application ends.
class OnApplicationEnd extends SparkListener {
  lazy val logger = LoggerFactory.getLogger(getClass)

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logger.info("Hellooooooooooooooo")
    println("Hellooooooooooooooo")
    throw new Exception("FAILEEEEEED")
  }
}

// Plugin whose init() output shows up in the driver logs, but whose shutdown() output does not.
class DemoPlugin extends SparkPlugin {
  lazy val logger = LoggerFactory.getLogger(getClass)

  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {
      override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
        println("---------------------------")
        println("INIIIIIIIIIIIIIIIIIIIIIIIIIIITTTTTTTTTTTTTTTTT")
        Map.empty[String, String].asJava
      }

      override def shutdown(): Unit = {
        logger.info("Hellooooooooooooooo")
        println("Hellooooooooooooooo")
        throw new Exception("FAILEEEEEED")
        super.shutdown()
      }
    }
  }

  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin {}
  }
}
I added this to the cluster's Spark configuration:
spark.plugins com.baikal.DemoPlugin
spark.extraListeners com.baikal.OnApplicationEnd,com.databricks.backend.daemon.driver.DBCEventLoggingListener
As I said, the plugin's init() and the onJobEnd listener both work, but nothing I have tried runs on application end or shutdown.
Any ideas?
Thanks