在筆記本中模塊化或鏈接代碼
本文描述了如何使用Databricks筆記本來編寫使用模塊化代碼、鏈接或嵌入式筆記本以及if-then-else邏輯的複雜工作流。
模塊化或鏈接筆記本的方法
的運行%命令允許在一個筆記本中包含另一個筆記本。你可以用運行%
將代碼模塊化,例如將支持函數放在單獨的筆記本中。您還可以使用它來連接實現分析步驟的筆記本。當你使用運行%
,被調用的筆記本立即執行,其中定義的函數和變量在調用筆記本中可用。
筆記本工作流程是對運行%
因為它們允許您向筆記本電腦傳遞參數並從筆記本電腦返回值。這允許您構建具有依賴關係的複雜工作流和管道。例如,您可以在一個目錄中獲得文件列表,並將名稱傳遞給另一個筆記本,這是不可能的運行%
.您還可以基於返回值創建if-then-else工作流,或者使用相對路徑調用其他筆記本。
要實現筆記本工作流程,請使用dbutils.notebook。*
方法。不像運行%
,dbutils.notebook.run ()
方法啟動一個新作業來運行筆記本。
這些方法,像所有的dbutils
api,隻在Python和Scala中可用。但是,你可以使用dbutils.notebook.run ()
調用一個R筆記本。
警告
基於筆記本工作流程的工作必須在30天或更短時間內完成。不支持基於模塊化或鏈接筆記本任務的長時間運行的作業。
API
中可用的方法dbutils.notebook
構建筆記本工作流程的API是:運行
而且退出
.參數和返回值都必須是字符串。
運行(路徑:字符串,timeout_seconds:int,參數:地圖):字符串
運行一個筆記本並返回它的退出值。該方法啟動一個立即運行的臨時作業。
的timeout_seconds
參數控製運行的超時時間(0表示沒有超時):調用運行
如果未在指定時間內完成,則拋出異常。如果Databricks宕機超過10分鍾,無論如何,運行筆記本都會失敗timeout_seconds
.
的參數
參數設置目標筆記本的小部件值。具體來說,如果您正在運行的筆記本有一個名為一個
,然後傳遞一個鍵值對(“A”:“B”)
參數的參數的一部分run ()
調用,然後檢索小部件的值一個
將返回“B”
.中可以找到關於創建和使用小部件的說明磚小部件篇文章。
警告
的參數
參數隻接受拉丁字符(ASCII字符集)。使用非ascii字符將返回錯誤。無效的非ascii字符的例子有中文、日文漢字和表情符號。
運行
使用
dbutils.筆記本.運行(“notebook-name”,60,{“參數”:“數據”,“argument2”:“data2”,...})
dbutils.筆記本.運行(“notebook-name”,60,地圖(“參數”->“數據”,“argument2”->“data2”,…))
運行
例子
假設你有一本叫做工作流
使用一個名為噴火
打印小部件的值:
dbutils.小部件.文本(“foo”,“fooDefault”,“fooEmptyLabel”)打印dbutils.小部件.得到(“foo”)
運行dbutils.notebook.run(“工作流”,60歲,{“foo”:“酒吧”})
產生以下結果:
小部件具有您通過工作流傳入的值,“酒吧”
,而不是默認值。
退出(價值:字符串):無效
退出帶有值的筆記本。如果你把筆記本叫做運行
方法,這是返回的值。
dbutils.筆記本.退出(“returnValue”)
調用dbutils.notebook.exit
在一份工作中使筆記本成功完成。如果希望導致作業失敗,則拋出異常。
例子
在下麵的示例中,將參數傳遞給DataImportNotebook
並運行不同的筆記本(DataCleaningNotebook
或ErrorHandlingNotebook
的結果DataImportNotebook
.
當筆記本工作流運行時,你會看到一個到正在運行的筆記本的鏈接:
點擊筆記本鏈接筆記本作業#xxxx查詢運行的詳細信息。
傳遞結構化數據
本節演示如何在筆記本之間傳遞結構化數據。
#示例1 -通過臨時視圖返回數據。使用dbutls .notebook.exit()隻能返回一個字符串,但由於調用的notebook駐留在同一個JVM中,所以可以這樣做#返回一個引用存儲在臨時視圖中的數據的名稱。##在callee筆記本上火花.範圍(5).toDF(“價值”).createOrReplaceGlobalTempView(“my_data”)dbutils.筆記本.退出(“my_data”)##在來電者筆記本上returned_table=dbutils.筆記本.運行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)global_temp_db=火花.相依.得到(“spark.sql.globalTempDatabase”)顯示(表格(global_temp_db+“。”+returned_table))#示例2 -通過DBFS返回數據。#對於較大的數據集,您可以將結果寫入DBFS,然後返回存儲數據的DBFS路徑。##在callee筆記本上dbutils.fs.rm(“/ tmp /結果/ my_data”,遞歸=真正的)火花.範圍(5).toDF(“價值”).寫.格式(“鋪”).負載(“dbfs: / tmp /結果/ my_data”)dbutils.筆記本.退出(“dbfs: / tmp /結果/ my_data”)##在來電者筆記本上returned_table=dbutils.筆記本.運行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)顯示(火花.讀.格式(“鋪”).負載(returned_table))#示例3 -返回JSON數據。要返回多個值,可以使用標準JSON庫來序列化和反序列化結果。##在callee筆記本上進口jsondbutils.筆記本.退出(json.轉儲({“狀態”:“OK”,“表”:“my_data”}))##在來電者筆記本上結果=dbutils.筆記本.運行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)打印(json.加載(結果))
//示例1 -通過臨時視圖返回數據。//使用dbutls .notebook.exit()隻能返回一個字符串,但由於調用的notebook駐留在同一個JVM中,所以可以這樣做//返回一個名稱,引用存儲在臨時視圖中的數據。/**在被調用筆記本中*/sc.並行化(1來5).toDF().createOrReplaceGlobalTempView(“my_data”)dbutils.筆記本.退出(“my_data”)/**在來電者筆記本中*/瓦爾returned_table=dbutils.筆記本.運行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)瓦爾global_temp_db=火花.相依.得到(“spark.sql.globalTempDatabase”)顯示(表格(global_temp_db+“。”+returned_table))//示例2 -通過DBFS返回數據。//對於較大的數據集,您可以將結果寫入DBFS,然後返回存儲數據的DBFS路徑。/**在被調用筆記本中*/dbutils.fs.rm(“/ tmp /結果/ my_data”,遞歸=真正的)sc.並行化(1來5).toDF().寫.格式(“鋪”).保存(“dbfs: / tmp /結果/ my_data”)dbutils.筆記本.退出(“dbfs: / tmp /結果/ my_data”)/**在來電者筆記本中*/瓦爾returned_table=dbutils.筆記本.運行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)顯示(sqlContext.讀.格式(“鋪”).負載(returned_table))//示例3 -返回JSON數據。//要返回多個值,可以使用標準JSON庫對結果進行序列化和反序列化。/**在被調用筆記本中*///導入jackson json庫進口com.fasterxml.傑克遜.模塊.scala.DefaultScalaModule進口com.fasterxml.傑克遜.模塊.scala.實驗.ScalaObjectMapper進口com.fasterxml.傑克遜.databind.objectmap//創建json序列化器瓦爾jsonMapper=新objectmap與ScalaObjectMapperjsonMapper.registerModule(DefaultScalaModule)//使用json退出dbutils.筆記本.退出(jsonMapper.writeValueAsString(地圖(“狀態”->“OK”,“表”->“my_data”)))/**在來電者筆記本中*/瓦爾結果=dbutils.筆記本.運行(“LOCATION_OF_CALLEE_NOTEBOOK”,60)println(jsonMapper.readValue[地圖[字符串,字符串]] (結果))
處理錯誤
本節說明如何處理筆記本工作流程中的錯誤。
#工作流錯誤拋出WorkflowException。defrun_with_retry(筆記本,超時,arg遊戲={},max_retries=3.):num_retries=0而真正的:試一試:返回dbutils.筆記本.運行(筆記本,超時,arg遊戲)除了異常作為e:如果num_retries>max_retries:提高e其他的:打印(“失敗的錯誤”,e)num_retries+ =1run_with_retry(“LOCATION_OF_CALLEE_NOTEBOOK”,60,max_retries=5)
//工作流中的錯誤會拋出一個WorkflowException。進口com.磚.WorkflowException//因為dbutls .notebook.run()隻是一個函數調用,你可以使用標準的Scala try-catch重試失敗//控製流。在這裏,我們展示了一個重複使用筆記本多次的例子。defrunRetry(筆記本:字符串,超時:Int,arg遊戲:地圖[字符串,字符串]=地圖.空,maxTries:Int=3.):字符串={varnumTries=0而(真正的){試一試{返回dbutils.筆記本.運行(筆記本,超時,arg遊戲)}抓{情況下e:WorkflowException如果numTries<maxTries= >println("錯誤,正在重試:"+e)}numTries+ =1}""//沒有到達}runRetry(“LOCATION_OF_CALLEE_NOTEBOOK”,超時=60,maxTries=5)