使用數據鏈球和Apache氣流更輕鬆地構建數據和ML管道

我們很高興宣布Apache氣流對數據映的支持一係列增強功能。這些新功能使在流行的開源編排者中構建強大的數據和機器學習(ML)管道變得容易。借助最新的增強功能,例如新的Databrickssqloperator,客戶現在可以使用氣流在數據映中使用標準SQL查詢和攝beplay体育app下载地址入數據,在筆記本上運行分析和ML任務,觸發Delta Live Tables來轉換Lakehouse中的數據等等。

Apache氣流是使用Python進行編程作者,計劃和監視數據和機器學習管道(稱Beplay体育安卓版本為DAGS)的流行,可擴展的平台。氣流包含大量的內置操作員,可輕鬆與從數據庫到雲存儲的所有內容進行交互。Databricks具有支持的自2017年以來的氣流,使氣流用戶能夠在Databricks的Lakehouse Platform上觸發工作流程,將筆記本電腦,罐子和Python腳本結合在一起,該平台可擴展到地球上最具挑戰性的數據和ML工作流。Beplay体育安卓版本

讓我們通過現實世界中的任務參觀新功能:構建一個簡單的數據管道,該管道將新近放置的天氣數據從API加載到三角洲表中,而無需使用Databricks筆記本電腦來執行該作業。出於本博客文章的目的,我們將在Azure上做所有事情,但是在AWS和GCP上,該過程幾乎相同。另外,我們將在SQL端點但是,如果您更喜歡使用通用數據鏈球群集群,則該過程非常相似。最終的示例DAG在氣流UI中看起來像這樣:

氣流DAG用於將數據攝入Databricks SQL表中

為了簡潔起見,我們將從此博客文章中介紹一些代碼。您可以看到所有代碼這裏

安裝和配置氣流

該博客文章假設您安裝了氣流2.1.0或更高的安裝Databricks連接。安裝用於Apache Airffore的Databricks提供商的最新版本:

PIP安裝apache-airflow-Providers-databricks

創建一個存儲天氣數據的表

我們將氣流DAG定義為每天運行。第一個任務,create_table,運行一個SQL語句,該語句創建一個名為的表AirFlow_Weather在裏麵默認模式如果表已經不存在。此任務證明了Databrickssqloperator可以在Databricks計算上運行任意SQL語句,包括SQL端點。

使用dag(“ load_weather_into_dbsql”,start_date = days_ago(0),schedue_interval =“@daily”,default_args = default_args,catchup = false,catchup = false)as dag = table =“ default.airflow_weather.airflow_weather” schema雙重,降水double,“ \”區域字符串,溫度長,風長,“ \” next_days數組“ create_table = databrickssqloperator(task_id =“ create_table”,sql = [f“ create table){schema})使用delta“],),)

從API檢索天氣數據並上傳到雲存儲

接下來,我們使用PythOnoperator向Weather API提出請求,將存儲在臨時位置中的JSON文件中。

一旦我們在本地擁有天氣數據,我們將使用LocalFilesystemtowasBoperator上傳到雲存儲,因為我們使用了Azure存儲。當然,氣流還支持將文件上傳到Amazon S3或Google Cloud Storage:

get_weather_data = pythonoperator(task_id =“ get_weather_data”,python_callable = get_weather_data,op_kwargs = {“ output_path”:“ outper__path”:“/tmp/-prod,file_path =“/tmp/{{ds}}。json”,container_name ='test',blob_name =“ airflow/landing/landing/{{ds}}}

請注意,上麵使用{{ds}}變量來指示氣流將變量替換為計劃的任務運行日期,從而使我們提供一致的,不衝突的文件名。

將數據攝取到表中

最後,我們準備將數據導入表。為此,我們使用方便DataBrickScopyIntooperator,生成一個複製成SQL語句。該副本進入命令是一種簡單但功能強大的方法,可以將文件從雲存儲中攝入表格中:

import_weather_data = DatabricksCopyIntoOperator( task_id="import_weather_data", expression_list="date::date, * except(date)", table_name=table, file_format="JSON", file_location="abfss://mycontainer@mystoreaccount.dfs.core.windows.net/airflow/landing/“,files = [“ {{ds}}}。json”])

就是這樣!現在,我們有了可靠的數據管道,將數據從API中攝取到隻有幾行代碼的表中。

但這還不是全部……

我們也很樂意宣布改進,使與數據鏈球協會集成氣流。

  • Databrickssubmitrunoperator已升級以使用最新的作業API v2.1。使用新的API,可以為使用Databrickssubmitrunoperator提交的作業配置訪問控件要容易得多,因此開發人員或支持團隊可以輕鬆訪問Job UI和日誌。
  • 氣流現在可以觸發三角洲實時桌管。
  • 氣流DAG現在可以傳遞JAR任務類型的參數。
  • 可以將Databricks存儲庫更新為特定的分支或標簽,以確保作業始終使用代碼的最新版本。
  • 在Azure上,可以使用Azure Active Directory代幣而不是個人訪問令牌(PAT)。例如,如果氣流在具有托管身份的Azure VM上運行,Databricks運營商可以使用托管身份來對Azure Databricks進行身份驗證,而無需使用PAT令牌。了解有關此和其他身份驗證增強功能的更多信息這裏

Databricks上的氣流用戶的未來是光明的

我們對這些改進感到興奮,並期待看到氣流社區通過數據映射的建造。我們很想聽聽您的回饋我們應該在哪些功能上添加下一步。

免費嚐試Databricks 開始

注冊

Baidu
map