取消
顯示的結果
而不是尋找
你的意思是:

氣流在筆記本的任務——如何把XComs值嗎?

Choolanadu
新的因素

利用氣流,我已經創建了一個序列的DAG筆記本的任務。第一個筆記本返回一批id;隨後的筆記本需要這個batch_id任務。

我用DatabricksSubmitRunOperator筆記本運行任務。這個操作符將兩個值(run_id run_page_url)氣流Xcom。我使用了run_id磚rest API和使用數據,檢索我筆記本的輸出。

Xcom值檢索工作在python算子的可調用的函數。

我怎麼在DatabricksSubmitRunOperator做同樣的操作?

我怎麼得到筆記本內部的任務實例的處理嗎?

python操作員代碼工作

def pull_databricks_task_run_xcom_values (* * kwargs):

ti = kwargs(“透明國際”)

run_id =。xcom_pull(關鍵= run_id, task_ids =“notebook_task1”)

logger.info(“= = = = = =把價值基於run_id: = = = = = " + str (run_id))

run_page_url =。xcom_pull(關鍵= run_page_url, task_ids =“initiate_batch”)

logger.info(“= = = = = =把價值基於run_page_url: = = = = = " + str (run_page_url))

DAG (trigger_db_notebooks_tasks, start_date = days_ago (2), schedule_interval = None, default_args = default_args) DAG:

#任務1

notebook_task1 = DatabricksSubmitRunOperator (task_id =“notebook_task1 notebook_task = notebook_task1_dict new_cluster = default_cluster_conf [' clusters_spec '], do_xcom_push = True)

#任務1

xcom_pull = PythonOperator (task_id = download_files, provide_context = True, python_callable = pull_databricks_task_run_xcom_values)

1回複1

daniel_sahal
尊敬的貢獻者三世

據我所知,你想run_id參數傳遞給第二個筆記本任務?

您可以:

  1. 創建一個小部件參數在你磚的筆記本(https://docs.www.eheci.com/notebooks/widgets.html),將消耗你的run_id
  2. 通過參數DatabricksSubmitRunOperator(例子:https://stackoverflow.com/questions/61542653/airflow-databrickssubmitrunoperator-does-not-take-in-no..。
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map