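Using Airflow, I have created a DAG with a sequence of notebook tasks. The first notebook returns a batch ID; the subsequent notebook tasks need this batch_id.
I use DatabricksSubmitRunOperator to run the notebook tasks. This operator pushes two values (run_id and run_page_url) to Airflow XCom. I used the run_id with the Databricks REST API to retrieve the output of my notebook.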
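For reference, here is a minimal sketch of that REST call using only the Python standard library. It assumes the run has a single task, that `host` is the workspace URL and `token` a personal access token, and that the first notebook ends with `dbutils.notebook.exit(batch_id)`, whose value the Jobs API `runs/get-output` endpoint returns as `notebook_output.result`. The function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Optional


def build_get_output_request(host: str, token: str, run_id: int) -> urllib.request.Request:
    """Build a GET request for the Jobs API runs/get-output endpoint."""
    query = urllib.parse.urlencode({"run_id": run_id})
    url = f"{host.rstrip('/')}/api/2.0/jobs/runs/get-output?{query}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


def extract_notebook_result(payload: dict) -> Optional[str]:
    """Return the string passed to dbutils.notebook.exit(), or None if it is absent."""
    return payload.get("notebook_output", {}).get("result")


def get_notebook_result(host: str, token: str, run_id: int) -> Optional[str]:
    """Call runs/get-output (network access required) and return the notebook's exit value."""
    req = build_get_output_request(host, token, run_id)
    with urllib.request.urlopen(req) as resp:
        return extract_notebook_result(json.load(resp))
```

Splitting request building and response parsing keeps the parsing testable without a live workspace.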
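Retrieving the XCom values works in the callable function of a PythonOperator.
How can I do the same with DatabricksSubmitRunOperator?
How can I get a handle on the task instance inside the notebook?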
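One common approach, sketched here under assumptions rather than as the only option: DatabricksSubmitRunOperator merges `notebook_task` into its `json` argument, which is a templated field, so a Jinja expression in `base_parameters` is rendered from XCom when the task runs. The notebook then reads the value as a widget instead of needing the task instance. The helper below is hypothetical and only builds the dictionary; the task id `get_batch_id`, the XCom key `batch_id` and the notebook path are placeholders.

```python
def build_notebook_task(notebook_path: str, upstream_task_id: str, xcom_key: str, param_name: str) -> dict:
    """Build a notebook_task dict whose parameter is filled from XCom at render time."""
    # %-formatting keeps the Jinja braces literal (no f-string brace escaping needed)
    template = "{{ ti.xcom_pull(task_ids='%s', key='%s') }}" % (upstream_task_id, xcom_key)
    return {
        "notebook_path": notebook_path,
        # base_parameters become notebook widgets; Airflow renders the template before submitting
        "base_parameters": {param_name: template},
    }
```

Pass the result as `notebook_task=...` to the second DatabricksSubmitRunOperator, and inside that notebook read it with `dbutils.widgets.get("batch_id")`.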
PythonOperator code that works:
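import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
logger = logging.getLogger(__name__)
# default_args, notebook_task1_dict and default_cluster_conf are defined elsewhere in the DAG file

def pull_databricks_task_run_xcom_values(**kwargs):
    ti = kwargs["ti"]  # the TaskInstance from the Airflow context
    run_id = ti.xcom_pull(key="run_id", task_ids="notebook_task1")
    logger.info("====== Pulled value based on run_id: ====== " + str(run_id))
    run_page_url = ti.xcom_pull(key="run_page_url", task_ids="initiate_batch")
    logger.info("====== Pulled value based on run_page_url: ====== " + str(run_page_url))

with DAG("trigger_db_notebooks_tasks", start_date=days_ago(2), schedule_interval=None, default_args=default_args) as dag:
    # Task 1: run the first notebook; run_id and run_page_url are pushed to XCom
    notebook_task1 = DatabricksSubmitRunOperator(task_id="notebook_task1", notebook_task=notebook_task1_dict, new_cluster=default_cluster_conf["clusters_spec"], do_xcom_push=True)
    # Task 2: read the XCom values (Airflow 2 passes the context automatically, so provide_context is not needed)
    xcom_pull = PythonOperator(task_id="download_files", python_callable=pull_databricks_task_run_xcom_values)
    notebook_task1 >> xcom_pull  # Task 2 must run after Task 1, otherwise the XCom values are not there yet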
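Building on the working callable, here is a hedged sketch of the missing step: pull `run_id`, fetch the notebook's exit value, and push it to XCom as `batch_id` so a downstream notebook task can template it. `fetch_result` is an assumption: any function that takes a run_id and returns the notebook output (for example one that calls the `runs/get-output` REST endpoint). `make_batch_id_callable` is hypothetical.

```python
from typing import Callable, Optional


def make_batch_id_callable(
    fetch_result: Callable[[int], Optional[str]],
    upstream_task_id: str = "notebook_task1",
) -> Callable[..., str]:
    """Return a PythonOperator callable that forwards the notebook's exit value via XCom."""

    def push_batch_id(**kwargs) -> str:
        ti = kwargs["ti"]  # TaskInstance injected by Airflow into the context
        run_id = ti.xcom_pull(key="run_id", task_ids=upstream_task_id)
        batch_id = fetch_result(run_id)
        if batch_id is None:
            # Fail loudly instead of passing None to the next notebook
            raise ValueError(f"run {run_id} returned no notebook output")
        ti.xcom_push(key="batch_id", value=batch_id)
        # PythonOperator also stores this return value under the "return_value" key
        return batch_id

    return push_batch_id
```

Usage would look like `PythonOperator(task_id="get_batch_id", python_callable=make_batch_id_callable(my_fetcher))` placed between the two notebook tasks, where `my_fetcher` is your own (hypothetical) REST helper. Injecting the fetcher keeps the callable testable with a fake task instance.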
As far as I understand, you want to pass the run_id parameter to the second notebook task?
You can: