開始
用戶指南
管理指南
參考指南
資源
2023年1月9日更新
給我們反饋
開發和部署數據處理管道通常需要管理任務之間複雜的依賴關係。例如,管道可能從源讀取數據、清理數據、轉換已清理的數據,並將轉換後的數據寫入目標。在操作數據管道時,需要對其進行測試、調度和故障排除。
工作流係統通過允許您定義任務之間的依賴關係、安排管道運行的時間以及監控工作流來解決這些挑戰。Apache氣流是一種用於管理和調度數據管道的開源解決方案。氣流將數據管道表示為操作的有向無環圖(dag)。在Python文件中定義一個工作流,風流管理調度和執行。該氣流Databricks連接可以讓您利用優化的Spark引擎提供的Databricks與氣流的調度功能。
本文描述了如何安裝風流,並提供了一個使用風流運行Databricks作業的示例。
氣流和Databricks之間的集成在氣流1.9.0版本及更高版本中可用。本文中的示例使用了風流2.1.0版進行測試。
氣流需要Python 3.6、3.7或3.8。本文中的示例使用Python 3.8進行測試。
要安裝氣流數據集集成,打開終端並運行以下命令:
mkdir氣流cd氣流pipenv——蟒蛇3..8 pipenv炮彈出口AIRFLOW_HOME=$ (鬆材線蟲病)Pipenv安裝apache-氣流= =2.1.0 pipenv install apache-airflow-providers-databricks mkdir dags氣流db init氣流用戶創建——用戶名admin——firstname ——lastname ——角色admin——email your@email.com
這些命令:
創建一個名為氣流然後切換到那個目錄。
氣流
使用pipenv來創建和生成Python虛擬環境.Databricks建議使用Python虛擬環境來隔離包版本和代碼對該環境的依賴關係。這種隔離有助於減少意外的包版本不匹配和代碼依賴衝突。
pipenv
初始化名為AIRFLOW_HOME的路徑氣流目錄中。
AIRFLOW_HOME
安裝氣流和氣流數據提供程序包。
創建一個氣流/無進取心的人目錄中。氣流使用熟練的技藝目錄來存儲DAG定義。
氣流/無進取心的人
熟練的技藝
初始化一個SQLite數據庫,供風流跟蹤元數據使用。在生產氣流部署中,您將使用標準數據庫配置氣流。的SQLite數據庫和氣流部署的默認配置在氣流目錄中。
為氣流創建一個管理用戶。
安裝臨時演員,例如,芹菜,s3,密碼運行:
芹菜
s3
密碼
pip安裝"apache-氣流[數據,芹菜,s3,密碼]"
需要通過氣流web服務器查看氣流界麵。打開終端,執行如下命令啟動web服務器。
氣流網絡服務器
調度程序是用於調度dagg的風流組件。打開一個新終端,運行如下命令:
pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)氣流調度器
要驗證風流的安裝,你可以運行風流包含的一個樣例dag:
在瀏覽器窗口中打開http://localhost:8080/home.氣流熟練的技藝屏幕上出現了。
單擊暫停/ Unpause DAG切換以取消暫停一個示例dag,例如example_python_operator.
example_python_operator
控件觸發示例DAG開始按鈕。
單擊DAG名稱,可查看該DAG的詳細信息,包括DAG的運行狀態。
氣流數據集成提供了兩個不同的操作符來觸發作業:
的DatabricksRunNowOperator需要現有的Databricks作業,並使用觸發一個新的作業運行(帖子/工作/運行) API請求來觸發運行。Databricks推薦使用DatabricksRunNowOperator因為它減少了作業定義的重複,並且使用此操作符觸發的作業運行很容易在工作界麵.
帖子/工作/運行
DatabricksRunNowOperator
的DatabricksSubmitRunOperator在“數據庫”中不要求作業存在,並使用創建並觸發一次運行(帖子/ /運行/提交工作) API請求提交作業規範並觸發運行。
帖子/ /運行/提交工作
Databricks氣流操作員每天將作業運行頁URL寫入氣流日誌polling_period_seconds(默認為30秒)。有關更多信息,請參見apache-airflow-providers-databricks套餐頁麵上的氣流網站。
polling_period_seconds
下麵的示例演示如何創建一個簡單的運行在本地機器上的風流部署,並部署一個示例DAG來觸發Databricks中的運行。在這個例子中,你:
創建一個新的筆記本,並添加基於配置參數打印問候語的代碼。
創建一個Databricks作業,其中隻有一個運行筆記本的任務。
配置到Databricks工作區的風流連接。
創建一個氣流DAG來觸發筆記本作業。在Python腳本中使用DatabricksRunNowOperator.
使用氣流界麵觸發DAG並查看運行狀態。
這個例子使用了一個包含兩個單元格的筆記本:
第一個單元格包含Databricks實用程序文本小部件定義一個名為問候設置為默認值世界.
問候
世界
的值問候變量前綴為你好.
你好
創建筆記本:
轉到Databricks登錄頁並選擇創建空白筆記本,或按新在側欄中選擇筆記本.的創建筆記本對話框出現了。
在創建筆記本對話,給你的筆記本起個名字,比如你好氣流.集默認的語言來Python.離開集群設置為默認值。您將在創建使用此筆記本的任務時配置集群。
點擊創建.
複製以下Python代碼並將其粘貼到筆記本的第一個單元格中。
dbutils.小部件.文本(“問候”,“世界”,“問候”)問候=dbutils.小部件.得到(“問候”)
在第一個單元格下麵添加一個新單元格,並複製並粘貼以下Python代碼到新單元格中:
打印(“你好{}".格式(問候))
點擊工作流在側欄中。
點擊.
的任務選項卡將顯示創建任務對話框。
取代為你的工作添加一個名字…你的工作名稱。
在任務名稱字段,輸入任務的名稱,例如,greeting-task.
在類型下拉,選擇筆記本.
使用文件瀏覽器找到您創建的筆記本,單擊筆記本名稱,然後單擊確認.
點擊添加下參數.在關鍵字段中,輸入問候.在價值字段中,輸入氣流用戶.
氣流用戶
點擊創建任務.
如需立即運行作業,請單擊在右上角。控件,也可以運行作業運行標簽,點擊現在運行在活躍的運行表格
單擊運行選擇並單擊查看詳細信息在活躍的運行表或已完成運行(過去60天)表格
複製工作ID價值。此值用於從“氣流”觸發作業。
請注意
作為安全最佳實踐,當使用自動化工具、係統、腳本和應用程序進行身份驗證時,Databricks建議您使用屬於的訪問令牌服務主體而不是工作區用戶。要為服務主體創建訪問令牌,請參見管理服務主體的訪問令牌.
氣流使用Databricks個人訪問令牌(PAT)連接到Databricks。看到個人訪問令牌查閱有關創建PAT的說明。
氣流安裝包含Databricks的默認連接。要使用上麵創建的個人訪問令牌更新連接到您的工作區:
在瀏覽器窗口中打開http://localhost:8080/connection/list/.
下康涅狄格州ID,定位databricks_default然後點擊編輯記錄按鈕。
中的值宿主字段的工作空間實例名稱您的Databricks部署。
在額外的字段,輸入如下值:
{“令牌”:“PERSONAL_ACCESS_TOKEN”}
取代PERSONAL_ACCESS_TOKEN使用Databricks個人訪問令牌。
PERSONAL_ACCESS_TOKEN
在Python文件中定義一個風流DAG。創建一個DAG來觸發示例筆記本作業:
在文本編輯器或IDE中,創建一個名為databricks_dag.py其內容如下:
databricks_dag.py
從氣流進口DAG從airflow.providers.databricks.operators.databricks進口DatabricksRunNowOperator從airflow.utils.dates進口days_agodefault_args={“主人”:“氣流”}與DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=沒有一個,default_args=default_args)作為dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)
取代JOB_ID使用之前保存的作業ID的值。
JOB_ID
將文件保存在氣流/無進取心的人目錄中。氣流自動讀取和安裝存儲在DAG文件氣流/無進取心的人.
要在氣流界麵中觸發和驗證DAG:
定位databricks_dag然後點擊暫停/ Unpause DAG切換以取消暫停DAG。
databricks_dag
觸發DAG通過點擊開始按鈕。
中的運行運行列查看運行的狀態和詳細信息。