跳轉到主要內容
公司博客上

集成Apache氣流和磚:建築ETL管道與Apache火花

2016年12月8日 公司博客上

分享這篇文章

這是一個一係列的博客將磚與常用的軟件包。看到末尾的“下一步”一節閱讀其他係列,其中包括教程AWSλ,運動等等。


Apache氣流概述

氣流作者是一個Beplay体育安卓版本平台以編程方式、調度和監控工作流。它可以用於作者工作流有向無環圖(無進取心的人)的任務。氣流調度程序執行你的任務在一個數組的工人在以下指定的依賴關係。

氣流中有兩個關鍵概念:裝飾邊描述如何運行一個工作流,而運營商確定什麼通過定義一個工作流中的任務都能真正地完成。運營商通常(但不總是)原子,這意味著它們可以站在自己的,不需要與其他運營商共享資源。

氣流是一個異構工作流管理係統支持多個係統在雲和預置的粘合。在這種情況下,磚是更大的係統的一個組件,例如,ETL或機器學習管道,氣流可以用於調度和管理。
氣流已經與一些常用的係統,如S3, MySQL,或HTTP端點;一個也可以擴展了基本模塊容易對其他係統。

這個博客是有氣流的實例已經啟動並運行。請查看“參考”部分數據如何設置氣流。

如何使用氣流磚嗎

磚REST API使編程訪問磚(而不是通過Web UI)。它可以自動創建和運行工作,productionalize數據流,等等。它還將允許我們將氣流通過氣流與磚運營商。

氣流提供運營商對於許多常見任務,您可以使用BashOperator傳感器運營商解決許多典型的ETL的用例,如引發每天更新的ETL作業發布在AWS S3或行記錄在數據庫中。

的BashOperator

BashOperator執行一個bash命令。它可以用來與磚通過集成磚API開始一個預配置的火花工作,例如:

t0=BashOperator (task_id=“dbjob”,depends_on_past=,bash_command=' curl - x POST - u用戶名:密碼https://.cloud.www.eheci.com/api/2.0/jobs/run-now - d \ '{\“job_id \”:} \,dag=dag)

您可以測試這個操作符的輸入:

%氣流測試教程dbjob 2016-10-01

在上麵的例子中操作員在磚開始一個工作,JSON負載是一個鍵/值(job_id和實際作業編號)。

注意:而不是使用curl BashOperator,您還可以使用SimpleHTTPOperator實現相同的結果。

傳感器操作符

傳感器運營商一直運行直到符合標準。例子包括:某些文件降落在S3 bucket (S3KeySensor),或一個HTTP GET請求到一個端點(HttpSensor);重要的是要樹立正確的每個重試間隔時間,“poke_interval”。有必要使用一個傳感器操作員“鉤”的持久層進行推送通知ETL工作流。

JSON文件拚花處理的例子

下麵是一個例子,設置一個管道處理JSON文件,並將它們轉換為拚花每天使用磚。氣流用於編排這管道通過檢測當準備處理日常文件和設置“S3傳感器檢測的輸出日常工作和發送最後一個電子郵件通知。

管道的安裝:

截圖顯示在氣流管道的設置

如上所示這個管道有五個步驟:

  1. 輸入S3傳感器(check_s3_for_file_s3)檢查輸入數據確實存在:
s3: / / /輸入氣流/輸入- *
  1. 磚REST API (dbjob), BashOperator REST API調用磚和動態傳遞的文件輸入和輸出參數。為了說明這一點在這個博客,我們使用下麵的命令;你的工作量,有很多方法可以保持安全如果進入你的S3密鑰氣流Python配置文件是一個安全問題:
curl - x POST - u: \https:/ /.cloud.www.eheci.com/api/2.0/jobs/run-now \- d”{job_id”:,“notebook_params”: {“inputPath”:“s3a: / / @ /輸入/測試。json”、“outputPath”:“s3a: / /: @ /輸出/ sample_parquet_data "}}

nightly-etl-job的磚

上麵的截圖是工作在磚從氣流。讀了磚的工作文檔要學習更多的知識。

  1. 磚行動涉及讀取一個輸入JSON文件並將其轉換為拚花:
val inputPath = getArgument (“inputPath”,“測試”)val testJsonData = sqlContext.read.json (inputPath)val outPath = getArgument (“outputPath”,“測試”)testJsonData.write。格式(“鋪”).mode (“覆蓋”).save (outPath)
  1. S3輸出的傳感器(check_s3_output_for_file_s3)檢查輸出數據確實存在:
s3:/ / / output-airflow / sample_parquet_data / _SUCCESS
  1. 電子郵件通知(email_notification),發送電子郵件提醒,當工作是成功的。

下麵是Python的氣流管道配置文件:

氣流進口DAG氣流。經營者進口BashOperator S3KeySensor EmailOperatordatetime進口datetime, timedelta
              today_date=datetime.today ()
              default_args={“主人”:“氣流”,“depends_on_past”:,“start_date”:today_date,“電子郵件”:【“@www.eheci.com”),“email_on_failure”:,“email_on_retry”:,“重試”:1,“retry_delay”:timedelta(分鍾=5),}
              dag=DAG (“教程”,default_args=default_args, schedule_interval=“@once”)
              inputsensor=S3KeySensor (task_id=“check_s3_for_file_in_s3”,bucket_key=“input-airflow /輸入- *”,wildcard_match=真正的,bucket_name=“peyman-datapipeline”,s3_conn_id=“S3Connection”,超時=18*60*60,poke_interval=120年,dag=dag)
              dbtask=BashOperator (task_id=“dbjob”,depends_on_past=,bash_command=' curl - x POST - u: https://demo.cloud.www.eheci.com/api/2.0/jobs/run-now - d \ '{\“job_id \”, \“notebook_params \”: {\“inputPath \”: \”:
免費試著磚
看到所有公司博客上的帖子
Baidu
map