實時流ETL結構化流在Apache 2.1火花
探索為什麼lakehouses未來的數據架構與數據倉庫的父親,Bill Inmon。
我們在大數據時代,組織收集大量數據在持續的基礎上。然而,這個數據洪流的值取決於能夠及時提取可行的見解。因此,人們越來越需要連續應用程序可以從大量數據攝入管道獲得實時可行的見解。
然而,構建工業生產連續應用程序可能是一個挑戰,開發人員需要克服很多障礙,包括:
- 提供端到端可靠性和正確性的保證——長時間運行的數據處理係統必須適應失敗通過確保輸出在批處理是一致的結果。此外,不同尋常的活動(e。g在上遊組件失敗,峰值流量等)必須不斷監控和自動減輕確保高可用性見解實時交付。
- 執行複雜的轉換數據到達無數格式(CSV、JSON、Avro等),通常必須重組,轉變和增強之前被消耗。此類重組從批處理係統要求所有傳統工具可用,但沒有添加延遲,他們通常需要。
- 後期處理或無序的數據——在處理物理世界,遲到或無序的數據是一個不爭的事實。因此,聚合和其它複雜的計算必須持續(準確地)修訂後的新信息的到來。
- 與其他係統的集成——信息來源於各種渠道(卡夫卡,HDFS, S3等),必須集成看到完整的圖片。
結構化流在Apache火花構建火花SQL的強有力的基礎之上,利用其強大的api提供一個無縫的查詢接口,同時優化的執行引擎實現低延遲,不斷更新的答案。這篇博客開始一個係列中,我們將探討如何使用Apache火花2.1的新特性來克服上述挑戰,構建我們自己的生產管道。
在這第一篇文章中,我們將關注ETL轉換原始管道AWS CloudTrail審計日誌成一個JIT的數據倉庫實現更快的即席查詢。我們將展示是多麼容易接受一批現有ETL工作,隨後productize實時流管道在磚使用結構化流。使用這個管道,我們轉換380萬年JSON文件包含79億年記錄到一個鋪桌子,它允許我們做特別查詢updated-to-the-minute拚花表快十倍比原始JSON文件。
流ETL的必要性
提取、轉換和加載(ETL)管道準備原料,非結構化數據成一種可以輕鬆高效地查詢。具體來說,他們需要能夠做到以下幾點:
- 過濾、轉換和清理數據——原始數據是自然的淩亂,需要清理融入一個定義良好的結構化的格式。例如,解析時間戳字符串的日期/時間類型比較快,過濾損壞數據,嵌套/ unnest運算/壓扁複雜結構更好地組織重要的列,等等。
- 轉換為一個更高效的存儲格式——文本、JSON和CSV數據很容易生成,人類可讀的,但非常昂貴的查詢。將它轉換為更有效的格式像拚花,Avro,或者獸人可以減少文件大小和提高處理速度。
- 通過重要的列對數據進行分區——數據分區基於一個或多個列的值,常見的查詢可以隻回答更有效地通過閱讀相關的分數總數的數據集。
傳統上,ETL執行周期性的批處理作業。例如,實時轉儲的原始數據,然後將其轉換為結構化形式每隔幾個小時,使有效的查詢。我們最初設置係統這種方式,但這種技術產生較高的延遲;我們必須等待幾個小時之前任何見解。對於許多用例,這種延遲是不可接受的。當一些可疑的是發生在一個帳戶,我們需要能夠立即問問題。等待幾分鍾至幾小時可能導致不合理延遲響應事件。
幸運的是,結構化流很容易把這些周期性實時數據管道的批處理作業。流工作表示使用相同的api作為批處理數據。另外,引擎提供相同的容錯和數據一致性保證周期性的批處理作業,同時提供更低的端到端延遲。
在文章的其餘部分,我們深入的細節如何變換AWS CloudTrail審計日誌成為一個高效、分區、拚花數據倉庫。AWS CloudTrail允許我們跟蹤所有執行的操作在不同的AWS帳戶,由交付gzip JSON S3 bucket的日誌文件。這些文件支持多種業務和關鍵任務的情報,比如成本歸因和安全監控。然而,在原來的形式,他們非常昂貴的查詢,即使Apache火花的功能。支持快速的洞察力,我們連續運行應用程序,將原始JSON日誌文件轉換成一個優化的拚花表。就讓我們一探究竟,看看如何寫這個管道。如果你想看到完整的代碼,這是Scala和Python筆記本。進口成磚和運行他們自己。
將原始日誌結構化流
我們開始通過定義基於JSON的模式記錄CloudTrail文檔。
val cloudTrailSchema=新StructType ()。添加(“記錄”,ArrayType (新StructType ()。添加(“additionalEventData StringType)。添加(“apiVersion StringType)。添加(“awsRegion StringType)//…
看到完整的連接筆記本模式。通過這種方法,我們就可以定義一個流DataFrame表示數據流從CloudTrail文件被寫在一個S3 bucket。
val rawRecords = spark.readStream. schema (cloudTrailSchema). json (“s3n: / / mybucket / AWSLogs / * / CloudTrail / * / 2017 / * / *”)
理解的好方法rawRecords
DataFrame代表是首先理解結構化流編程模型。的關鍵思想是治療任何數據流作為一個無界的表:新記錄添加到流就像行被添加到表中。
這讓我們對兩批處理和流數據表。因為表和DataFrames /數據集語義同義詞,相同的批DataFrame /數據集查詢可以應用於批處理和流數據。在這種情況下,我們將改變原始JSON數據,使其更容易使用火花SQL查詢的內置支持操縱複雜嵌套模式。這是一個簡略版的轉換。
val cloudtrailEvents=rawRecords。選擇(爆炸(美元“記錄”)作為的記錄).select (unix_timestamp (“record.eventTime”美元,“yyyy-MM-ddT”hh: mm: ss) .cast(“時間戳”)為“時間戳“記錄。*”美元)
在這裏,我們爆炸
(分裂)記錄的數組從每個文件加載到單獨的記錄。我們還在每個記錄解析字符串事件時間字符串火花的時間戳類型,和平嵌套列以便於查詢。注意,如果cloudtrailEvents
一批DataFrame在一組固定的文件,然後我們就會寫相同的查詢,我們會書麵結果隻有一次parsed.write.parquet (" / cloudtrail”)
。相反,我們將啟動一個StreamingQuery運行不斷將新數據到來。
val streamingETLQuery=cloudtrailEvents.withColumn(“日期”、“時間戳”。投(“日期”)//推導出日期.writeStream。觸發(ProcessingTime(10秒))//檢查為文件每一個10年代.format(“鋪”)//寫作為鑲木地板分區通過日期.partitionBy(“日期”).option(“路徑”、“/ cloudtrail”).option (“checkpointLocation”、“cloudtrail.checkpoint /”)。開始()
這裏我們指定以下配置StreamingQuery開始前。
- 得到的日期時間戳列
- 檢查新文件(即每10秒。觸發間隔)
- 把轉換後的數據解析DataFrame寫成Parquet-formatted表的路徑
/ cloudtrail
。 - 分區表鑲花的日期,以便我們稍後可以有效地查詢數據的時間片;監視應用程序的關鍵需求。
- 在路徑保存檢查點的信息
檢查點/ cloudtrail
對容錯的博客(稍後解釋)
的結構化流模型,這是執行這個查詢的執行。
從概念上講,rawRecords
DataFrame是一個擴展輸入表,cloudtrailEvents
DataFrame是轉換後的結果表。換句話說,當新行添加到輸入(rawRecords
),結果表(cloudtrailEvents
)將有新的改變了行。在這種特殊情況下,每10秒,火花的SQL引擎觸發器新文件的檢查。當找到新的數據(即。,新rows in the Input Table), it transforms the data to generate new rows in the Result Table, which then get written out as Parquet files.
此外,這個流查詢運行時,您可以使用SQL火花同時拚花表查詢。數據流查詢寫拚花事務性,並發交互查詢處理總是會看到最新的數據都是一致的。被稱為這個有力的保障prefix-integrity和它讓結構化流管道和較大的連續應用程序集成。
你可以閱讀更多關於結構化流模型的詳細信息,以及它優於其他流媒體引擎在我們以前的博客。
解決生產問題
早些時候,我們著重介紹了一係列的挑戰,必須解決流ETL管道生產運行。讓我們看看結構化流運行在磚平台解決了他們。Beplay体育安卓版本
隻有一次容錯擔保從故障中恢複
長時間運行的管道必須能夠容忍機器故障。與結構化流,實現容錯一樣容易指定檢查點位置查詢。在前麵的代碼片段,我們做了以下行。
.option (“checkpointLocation”,“cloudtrail.checkpoint /”)
該檢查點目錄是/查詢,查詢是活躍的,火花不斷寫元數據處理的數據到檢查站目錄。即使整個集群失敗,查詢可以重新啟動一個新的集群,使用相同的檢查點目錄,並持續恢複。更具體地說,在新集群,火花使用元數據來啟動新的查詢,沒有一個離開的,從而確保端到端隻有一次擔保和數據一致性(見故障恢複我們之前的博客)。
此外,這個相同的機製允許您升級您的查詢之間重啟,隻要輸入源和輸出模式保持不變。由於火花2.1,我們的檢查點數據的JSON編碼不會過時的兼容性。所以你可以重新啟動查詢即使更新你的火花的版本。在所有情況下,你會得到同樣的容錯和一致性保證。
請注意,磚很容易設置的自動恢複在下一節,我們將展示。
監測、報警和升級
連續應用程序運行順利,那一定是健壯的單個機器或甚至整個集群的失敗。在磚,我們已經開發出與結構化流緊密集成,允許我們連續監視StreamingQueries失敗(並自動重啟。你所要做的就是創建一個新工作,重試策略和配置工作。您還可以配置的工作發送電子郵件來通知你的失敗。
應用程序升級可以很容易地由更新你的代碼和/或火花版本,然後重新啟動工作。看到我們的指導在生產運行結構化流為更多的細節。
機器故障並不是唯一的情況下,我們需要處理,以確保強勁的處理。我們將討論如何監控交通高峰和上遊失敗在後麵的文章中詳細。
結合實時數據與曆史/批處理數據
許多應用程序需要結合曆史/批處理數據實時數據。例如,除了傳入的審計日誌,我們可能已經大量待處理的日誌等待轉換。理想情況下,我們想要實現這兩個目標,盡快交互式查詢最新的數據,也可以訪問曆史數據對未來的分析。通常是複雜的設置等管道使用大多數現有係統你要設置倍數過程:將曆史數據的批處理作業,將實時數據流管道,也許另一個步驟合並結果。
結構化流消除了這一挑戰。您可以配置上麵的查詢優化處理新的數據文件的到來,在使用空間集群處理舊文件的能力。首先,我們設置的選項latestFirst
的文件源為true,所以新文件處理。然後,我們設置了maxFilesPerTrigger
每次限製多少文件來處理。這曲調更頻繁地更新下遊數據倉庫的查詢,所以,最新的數據用於查詢盡快。在一起,我們可以定義rawLogs
DataFrame如下:
val rawJson = spark.readStream. schema (cloudTrailSchema).option (“latestFirst”,“真正的”).option (“maxFilesPerTrigger”,“20”). json (“s3n: / / mybucket / AWSLogs / * / CloudTrail / * / 2017/01 / *”)
通過這種方式,我們可以編寫一個查詢,很容易將實時數據與曆史數據,在保證低延遲、效率和數據一致性。
結論
結構化流在Apache火花是最好的框架編寫流ETL管道和磚便於運行它們的生產規模,正如我們上麵了。我們分享的高層次概述steps-extracting,轉換,加載,最後查詢設置流ETL生產流水線。我們還討論和演示了如何解決結構化流克服了挑戰和設置生產高容量和低延遲流管道。
在未來的博客文章在本係列中,我們將介紹如何解決其他障礙,包括:
- 將複雜的轉換應用到嵌套的JSON數據
- 處理數據在Apache卡夫卡與結構化流在Apache 2.2火花
- 流媒體應用程序監控
- 將結構化流與Apache卡夫卡
- 計算事件時間聚合與結構化流
- 每天運行流作業一次10 x節省成本
如果你想了解更多關於結構化流,這裏有一些有用的鏈接。
- 以前的博客帖子解釋的動機和概念結構化流:
- 結構化流編程指南Apache 2.1火花
- 引發2016年峰會討論,深入了解結構化流
接下來是什麼
你可以自己試著兩個筆記本AWS CloudTrail日誌。導入筆記本成磚。