嗨@Leszek,
經過你共享,探索進一步的聯係我發現它最適合於I / O操作。
但是我是CPU綁定操作,很多計算發生一件事,我需要我的筆記本運行至少有3500 3500倍輸入值和這是我的要求。
例如考慮我輸入3500個不同的值的列表,我有一個筆記本叫NotebookA我需要運行筆記本列表中的值。
運行順序一個接一個的筆記本將消耗大量的時間,這就是為什麼我在尋找並行處理。
我使用下麵的自定義python函數我發現在線作為權宜之計。你的筆記本需要參數,這樣你就可以通過每次迭代在不同的運行時的值。tasklist隻是每次迭代參數的列表
從互聯網下載# #代碼使筆記本電腦在並行的並行運行。期貨進口ThreadPoolExecutor類NotebookData: def __init__(自我、路徑、超時、參數= None,重試= 0):自我。路徑=路徑的自我。timeout =超時的自我。參數=參數自我。重試=重試def submitNotebook(筆記本):print(“筆記本運行參數{}:{}“.format(筆記本。路徑,notebook.parameters):如果(notebook.parameters):返回dbutils.notebook.run (notebook.path,筆記本。超時,notebook.parameters):返回dbutils.notebook.run (notebook.path notebook.timeout)除了例外:print(“失敗:筆記本與參數{}:{}“.format(筆記本。路徑,如果筆記本notebook.parameters))。重試< 1:提高打印(“使用參數來重新嚐試筆記本{}:{}”.format(筆記本。路徑,notebook.parameters)筆記本。重試=筆記本。重試- 1 submitNotebook(筆記本)def parallelNotebooks(筆記本,numInParallel): #如果您創建太多的筆記本並行驅動程序可能會崩潰當你提交的所有作業。#這段代碼限製並行筆記本的數量。 with ThreadPoolExecutor(max_workers=numInParallel) as ec: return [ec.submit(submitNotebook, notebook) for notebook in notebooks] #get list of data to process from the config "db" taskList = spark.sql("select * FROM Config.DeltaJobsToRun").toPandas().to_dict('records') # create a queue of notebooks based on the metadata above. Pass into the generic param notebook and execute # the dbutils.notebook.run() command that's used doesnt bubble up errors properly so finding the cause means clicking thru all the 'notebook job' links right now. TODO: investigate logging wrapper which will put the info into a good location notebooks=[] for n in taskList: notebooks.append(NotebookData("./deltaLoadParam", 0, n, 1)) res = parallelNotebooks(notebooks, 2) result = [f.result(timeout=3600) for f in res] # This is a blocking call. print(result)
祝找到特定的迭代運行的失敗與上麵的方法雖然——dbutils.notebook.run函數不返回任何有價值的信息,也沒有泡沫的例外
我計劃從磚完全刪除此功能並安排別的地方
為此創建一個工作流的工作筆記本
然後使用一個外部腳本編製工具/使用磚工作api開始工作與正確的參數