開始
加載和管理數據
處理數據
政府
引用和資源
2023年6月23日更新
給我們反饋
這篇文章展示了一個示例的編排與Apache數據管道氣流磚工作。您還將了解如何設置氣流與磚的集成。編製管理工作複雜任務之間的依賴關係。
開發和部署一個數據處理管道通常需要管理複雜任務之間的依賴關係。例如,管道可能讀取數據從源,清理數據,將清洗數據,轉換後的數據寫入一個目標。您需要測試、進度和解決數據管道時實施。
工作流係統應對這些挑戰,允許您定義任務之間的依賴關係,安排當管道運行和監控工作流。Apache氣流是一個開源的解決方案來管理和調度的數據管道。氣流代表數據管道有向無環圖(無進取心的人)的操作。你定義一個工作流在Python文件和氣流管理調度和執行。氣流磚連接允許您利用優化的火花引擎提供的磚與氣流的調度功能。
氣流和磚之間的集成是1.9.0後來氣流中可用版本。本文中的示例與氣流2.1.0版測試。
氣流需要Python 3.6、3.7或3.8。這篇文章中的示例是用Python 3.8做了試驗。
安裝氣流磚集成,打開終端並運行下列命令。一定要用您的用戶名和電子郵件在最後一行:
mkdir氣流cd氣流pipenv——python38 pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)pipenv安裝apache-airflow= =2.1.0 pipenv安裝apache-airflow-providers-databricks mkdir熟練的技藝氣流db init氣流用戶創建用戶名admin——firstname < firstname >——lastname < lastname >——角色管理——電子郵件your.com
當你複製和運行腳本,您執行這些步驟:
創建一個目錄命名氣流和更改到該目錄。
氣流
使用pipenv創建和產卵Python虛擬環境。磚建議使用Python的虛擬環境隔離包版本和代碼依賴環境。這種隔離有助於減少意外包版本不匹配和代碼依賴碰撞。
pipenv
初始化一個環境變量命名AIRFLOW_HOME設置的路徑氣流目錄中。
AIRFLOW_HOME
安裝氣流,氣流磚提供者包。
創建一個氣流/無進取心的人目錄中。氣流使用熟練的技藝目錄存儲DAG定義。
氣流/無進取心的人
熟練的技藝
初始化一個氣流SQLite數據庫用來跟蹤的元數據。在生產氣流部署,您將配置氣流與一個標準的數據庫。氣流的SQLite數據庫和默認配置部署中初始化氣流目錄中。
創建管理員用戶對氣流。
安裝臨時演員例如,芹菜,s3,密碼運行:
芹菜
s3
密碼
pip安裝“apache-airflow(磚、芹菜、s3、密碼)”
氣流web服務器需要查看氣流UI。啟動web服務器,打開終端並運行下列命令:
氣流網絡服務器
調度器是氣流組件,日程安排熟練的技藝。要運行它,打開一個新的終端和運行以下命令:
pipenv殼出口AIRFLOW_HOME=$ (鬆材線蟲病)氣流調度器
驗證氣流安裝,您可以運行一個例子與氣流無進取心的人包括:
在一個瀏覽器窗口,打開http://localhost: 8080 / home。氣流熟練的技藝屏幕上出現了。
單擊暫停/ Unpause DAG切換到unpause裝飾邊的一個例子,例如,example_python_operator。
example_python_operator
點擊觸發DAG的例子開始按鈕。
單擊DAG名稱查看詳細信息,包括DAG的運行狀態。
氣流磚集成提供了兩個不同的操作觸發的工作:
的DatabricksRunNowOperator需要一個現有的磚的工作和使用引發新工作運行(帖子/工作/運行)API請求觸發運行。磚推薦使用DatabricksRunNowOperator因為它可以減少重複工作的定義和工作運行觸發這個操作符很容易找到的工作界麵。
帖子/工作/運行
DatabricksRunNowOperator
的DatabricksSubmitRunOperator不需要工作存在在磚和使用創建並觸發一次運行(帖子/ /運行/提交工作)API請求提交作業規範和觸發運行。
帖子/ /運行/提交工作
磚氣流操作符寫作業運行氣流日誌每一個頁麵的URLpolling_period_seconds(默認是30秒)。有關更多信息,請參見apache-airflow-providers-databricks包在氣流的網站頁麵。
polling_period_seconds
下麵的例子演示了如何創建一個簡單的氣流部署在本地機器上運行和部署一個例子DAG觸發運行在磚。對於本例,您:
創建一個新的筆記本和添加代碼來打印一個問候根據配置參數。
創建一個磚工作運行筆記本的一個任務。
配置一個氣流聯係磚工作區。
創建一個氣流DAG觸發筆記本工作。你在一個Python腳本使用定義DAGDatabricksRunNowOperator。
利用氣流UI觸發DAG並查看運行狀態。
這個例子使用一個筆記本,其中包含兩個單元:
第一個單元格包含一個磚實用程序的文本小部件定義一個變量命名問候設置為默認值世界。
問候
世界
第二個單元格打印的值問候變量的前綴你好。
你好
創建筆記本:
去你的磚著陸頁麵並選擇創建空白筆記本,或點擊新在側邊欄並選擇筆記本。的創建筆記本對話框出現了。
在創建筆記本對話框中,給你的筆記本一個名字,例如你好氣流。集默認的語言來Python。離開集群設置為默認值。您將配置集群創建任務時使用這個筆記本。
點擊創建。
複製下麵的Python代碼粘貼到第一個單元格的筆記本。
dbutils。小部件。文本(“問候”,“世界”,“問候”)問候=dbutils。小部件。得到(“問候”)
添加一個新的細胞低於第一個單元格和下麵的Python代碼複製並粘貼到新的細胞:
打印(“你好{}”。格式(問候))
點擊工作流在側邊欄。
點擊。
的任務選項卡顯示了創建任務對話框。
取代添加一個名稱為你的工作…對你的工作名稱。
在任務名稱字段中,輸入任務的名稱,例如,greeting-task。
在類型下拉,選擇筆記本。
使用文件瀏覽器來找到您創建的筆記本,點擊筆記本名稱,點擊確認。
點擊添加下參數。在關鍵字段中,輸入問候。在價值字段中,輸入氣流用戶。
氣流用戶
點擊創建任務。
立即運行任務,點擊在右上角。您還可以通過單擊運行工作運行選項卡並單擊現在運行在活躍的運行表。
單擊運行選項卡並單擊查看詳細信息在活躍的運行表或完成運行60天(過去)表。
複製工作ID價值。這個值需要觸發工作從氣流。
請注意
作為一個安全最佳實踐進行身份驗證時使用自動化工具,係統、腳本和應用程序,磚建議您使用OAuth令牌或個人訪問令牌屬於服務主體而不是用戶工作區。為服務主體,創建令牌管理個人訪問令牌服務主體。
氣流連接使用磚磚個人訪問令牌(PAT)。看到個人訪問令牌為創建一個帕特的說明。
磚的氣流安裝包含一個默認的連接。更新連接連接到您的工作空間中使用上麵創建的個人訪問令牌:
在一個瀏覽器窗口,打開http://localhost: 8080 /聯係/清單/。
下康涅狄格州ID,定位databricks_default並單擊編輯記錄按鈕。
取代的價值主機場的工作區實例名你的磚部署。
在額外的字段中,輸入下列值:
{“令牌”:“PERSONAL_ACCESS_TOKEN”}
取代PERSONAL_ACCESS_TOKEN與你的磚個人訪問令牌。
PERSONAL_ACCESS_TOKEN
你在一個Python文件定義一個氣流DAG。創建一個DAG觸發的例子筆記本工作:
在文本編輯器中或者IDE,創建一個新文件命名databricks_dag.py用下麵的內容:
databricks_dag.py
從氣流進口DAG從airflow.providers.databricks.operators.databricks進口DatabricksRunNowOperator從airflow.utils.dates進口days_agodefault_args={“主人”:“氣流”}與DAG(“databricks_dag”,start_date=days_ago(2),schedule_interval=沒有一個,default_args=default_args)作為dag:opr_run_now=DatabricksRunNowOperator(task_id=“run_now”,databricks_conn_id=“databricks_default”,job_id=JOB_ID)
取代JOB_ID早些時候與工作ID的值保存。
JOB_ID
保存文件氣流/無進取心的人目錄中。氣流自動讀取和安裝DAG文件存儲在氣流/無進取心的人。
觸發並驗證的DAG氣流界麵:
定位databricks_dag並單擊暫停/ Unpause DAG切換unpause DAG。
databricks_dag
觸發DAG點擊開始按鈕。
點擊運行運行列視圖狀態和運行的細節。