開始
用戶指南
管理指南
參考指南
資源
更新於2022年6月10日
給我們反饋
開發和部署數據處理管道通常需要管理任務之間的複雜依賴關係。例如,管道可以從源讀取數據、清理數據、轉換清理的數據,並將轉換後的數據寫入目標。在對數據管道進行操作時,您需要對它們進行測試、調度和故障排除。
工作流係統通過允許您定義任務之間的依賴關係、管道運行時的調度以及監控工作流來解決這些挑戰。磚建議多任務的工作不依賴外部係統來管理工作流程。Databricks作業通過標準的身份驗證和訪問控製方法提供任務編排。您可以使用熟悉的、用戶友好的界麵來管理作業,以創建和管理複雜的工作流。您可以定義一個包含多個任務的作業,其中每個任務運行諸如筆記本或JAR之類的代碼,並通過指定任務之間的依賴關係來控製任務在作業中的執行順序。您可以將作業的任務配置為順序運行或並行運行。
Databricks還支持工作流管理Apache氣流.
Apache氣流是一個用於管理和調度數據管道的開源解決方案。氣流用操作的有向無環圖(dag)表示數據管道。你在一個Python文件中定義一個工作流,風流管理調度和執行。
氣流提供Databricks和氣流之間的緊密集成。氣流數據ricks集成可以讓您利用優化的星火引擎提供的數據ricks與氣流的調度功能。
氣流和Databricks的集成在氣流1.9.0及更高版本。本文中的示例使用氣流2.1.0版本進行了測試。
氣流需要Python 3.6、3.7或3.8。本文中的示例是用Python 3.8測試的。
安裝氣流Databricks集成設備,打開終端,執行如下命令:
mkdir氣流cd氣流pipenv——python3.8 pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)pipenv安裝apache-airflow= =2.1.0 pipenv install apache-airflow-provider -databricks mkdir dags氣流db init氣流用戶創建——username admin——firstname ——lastname ——role admin——email your@email.com
這些命令:
創建一個名為氣流然後切換到那個目錄。
氣流
使用pipenv創建和生成一個Python虛擬環境.Databricks建議使用Python虛擬環境來隔離包版本和對該環境的代碼依賴。這種隔離有助於減少意外的包版本不匹配和代碼依賴衝突。
pipenv
初始化一個名為AIRFLOW_HOME設置為氣流目錄中。
AIRFLOW_HOME
安裝氣流和氣流數據供應商軟件包。
創建一個氣流/無進取心的人目錄中。氣流使用熟練的技藝目錄來存儲DAG定義。
氣流/無進取心的人
熟練的技藝
初始化一個SQLite數據庫,用於風流跟蹤元數據。在生產氣流部署中,你可以用標準數據庫配置氣流。中初始化了SQLite數據庫和氣流部署的默認配置氣流目錄中。
創建氣流管理用戶。
安裝臨時演員例如,芹菜,s3,密碼運行:
芹菜
s3
密碼
pip安裝“apache-氣流[數據庫,芹菜,s3,密碼]”
氣流web服務器需要查看氣流界麵。打開終端,執行如下命令啟動web服務器:
氣流網絡服務器
調度程序是氣流組件,用於調度dag。打開一個新的終端,運行如下命令:
pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)氣流調度器
為了驗證氣流的安裝,你可以運行一個帶有氣流的dag示例:
在瀏覽器窗口中打開http://localhost:8080/home.氣流熟練的技藝屏幕上出現了。
單擊暫停/ Unpause DAG切換以取消其中一個示例dag的暫停,例如example_python_operator.
example_python_operator
通過單擊。觸發示例DAG開始按鈕。
單擊DAG名稱可以查看DAG的詳細信息,包括該DAG的運行狀態。
氣流Databricks集成提供了兩種不同的作業觸發操作:
的DatabricksRunNowOperator需要現有的Databricks作業,並使用觸發新的作業運行(帖子/工作/運行) API請求來觸發運行。磚推薦使用DatabricksRunNowOperator由於它減少了作業定義的重複,因此很容易在工作界麵.
帖子/工作/運行
DatabricksRunNowOperator
的DatabricksSubmitRunOperator不需要工作存在於數據ricks中,並使用創建並觸發一次性運行(帖子/ /運行/提交工作) API請求提交作業規範並觸發運行。
帖子/ /運行/提交工作
Databricks氣流操作符每次將作業運行頁麵URL寫入氣流日誌polling_period_seconds(默認為30秒)。有關更多信息,請參見apache-airflow-providers-databricks氣流網站上的軟件包頁麵。
polling_period_seconds
下麵的例子演示了如何創建一個運行在本地機器上的簡單的氣流部署,並部署一個示例DAG來觸發Databricks中的運行。對於這個例子,你:
創建一個新的筆記本,並添加基於已配置參數打印問候語的代碼。
創建一個Databricks作業,使用一個單獨的任務運行筆記本。
配置一個氣流連接到你的Databricks工作空間。
創建一個氣流DAG來觸發筆記本作業。在Python腳本中定義DAGDatabricksRunNowOperator.
使用氣流界麵觸發DAG並查看運行狀態。
下麵的例子使用了一個包含兩個單元格的筆記本:
第一個單元格包含一個Databricks實用程序文本小部件定義一個名為問候設置為默認值世界.
問候
世界
的值,第二個單元格打印問候變量前綴你好.
你好
創建筆記本:
進入Databricks登錄頁麵並選擇創建空白筆記本或者點擊創建在側邊欄中選擇筆記本從菜單中。的創建筆記本對話框出現了。
在創建筆記本對話,給你的筆記本起一個名字,例如你好氣流.集默認的語言來Python.離開集群設置為默認值。您將在創建使用此筆記本的任務時配置集群。
點擊創建.
複製下麵的Python代碼並將其粘貼到筆記本的第一個單元格中。
dbutils.小部件.文本(“問候”,“世界”,“問候”)問候=dbutils.小部件.得到(“問候”)
在第一個單元格下麵添加一個新單元格,複製並粘貼以下Python代碼到新單元格:
打印(“你好{}".格式(問候))
點擊工作流在側邊欄。
點擊.
的任務選項卡顯示創建任務對話框。
取代為你的工作添加一個名字……寫上你的工作名稱。
在任務名稱字段中,輸入任務的名稱,例如greeting-task.
在類型下拉,選擇筆記本.
使用文件瀏覽器找到您創建的筆記本,單擊筆記本名稱,然後單擊確認.
點擊添加下參數.在關鍵字段中,輸入問候.在價值字段中,輸入氣流用戶.
氣流用戶
點擊創建任務.
若要立即運行作業,請單擊在右上角。還可以通過單擊運行選項卡並單擊現在運行在活躍的運行表格
單擊運行選項卡並單擊查看詳細信息在活躍的運行表或已完成的測試(過去60天)表格
複製工作ID價值。要從氣流觸發作業,需要這個值。
氣流使用Databricks個人訪問令牌(PAT)連接到Databricks。看到個人訪問令牌查看創建PAT的說明。
你的氣流安裝包含一個默認的Databricks連接。使用上麵創建的個人訪問令牌更新連接到您的工作空間:
在瀏覽器窗口中打開http://localhost:8080/connection/list/.
下康涅狄格州ID,定位databricks_default並單擊編輯記錄按鈕。
替換宿主場的工作區實例名您的Databricks部署。
在額外的字段,輸入以下值:
{“令牌”:“PERSONAL_ACCESS_TOKEN”}
取代PERSONAL_ACCESS_TOKEN與您的Databricks個人訪問令牌。
PERSONAL_ACCESS_TOKEN
你可以在Python文件中定義一個氣流DAG。創建一個DAG來觸發示例notebook任務:
在文本編輯器或IDE中,創建一個名為databricks_dag.py具體內容如下:
databricks_dag.py
從氣流進口DAG從airflow.providers.databricks.operators.databricks進口DatabricksRunNowOperator從airflow.utils.dates進口days_agodefault_args={“主人”:“氣流”}與DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=沒有一個,default_args=default_args)作為dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)
取代JOB_ID使用前麵保存的作業ID的值。
JOB_ID
保存文件到氣流/無進取心的人目錄中。中存儲的DAG文件,氣流自動讀取並安裝氣流/無進取心的人.
在氣流界麵中觸發和驗證DAG:
定位databricks_dag並單擊暫停/ Unpause DAG切換以取消暫停DAG。
databricks_dag
控件觸發DAG開始按鈕。
點擊運行運行列來查看運行的狀態和詳細信息。