跳轉到主要內容
工程的博客

火花結構化流

分享這篇文章

Apache火花的第一個版本2.0增加了一個新的更高級的API,結構化流,為構建連續應用程序。的主要目標是使其更容易構建端到端流媒體應用程序集成存儲,服務係統和批處理作業一致和容錯。在這篇文章中,我們解釋了為什麼這是很難做到與當前分布式流媒體引擎,並介紹結構化流。

為什麼流是困難的

乍一看,構建一個分布式流媒體引擎可能看起來簡單推出一組服務器和推動它們之間的數據。不幸的是,分布式流處理遇到的多種並發症不影響簡單計算就像批處理作業。

首先,考慮一個簡單的應用程序:我們收到(phone_id、時間、動作)事件從一個移動應用程序,並且想要數數有多少行動每小時發生的每種類型,然後在MySQL存儲結果。如果我們運行這個應用程序作為批處理作業,有一個表的所有輸入事件,我們可以將其表示為以下SQL查詢:

選擇行動,窗口(時間“1小時”),(*)事件集團通過行動,窗口(時間,“1小時”)

在分布式流媒體引擎,我們可以設置節點處理數據在“使用映射-規約模式”模式,如下所示。第一層中的每個節點讀取輸入數據的分區(說,手機的流從一組),然後散列的事件(行動,小時)將它們發送到減速機節點,追蹤集團的統計,定期更新MySQL。

image00

不幸的是,這種類型的設計可以帶來不少挑戰:

  1. 一致性:這個分布式設計會導致記錄處理係統的一部分之前,他們在另一個處理,導致荒謬的結果。例如,假設應用程序發送一個“開放”的事件,當用戶打開它,和一個“關閉”事件關閉。如果減速機節點負責“開放”是低於一個“關閉”,我們可能會看到一個“關閉”的總數量高於在MySQL中“打開”,這將沒有意義。上圖實際上顯示了一個這樣的例子。
  2. 容錯:如果一個映射器或還原劑失敗?減速機不應該算一個動作在MySQL中兩次,但是應該知道如何請求時舊的數據映射器。流引擎經過大量的麻煩提供強大的語義,至少引擎。在許多引擎,然而,保持一致的結果在外部存儲留給用戶。
  3. 無序的數據:在現實世界中,來自不同數據源的數據可以出來的秩序:例如,一個電話可能上傳數據小時如果是覆蓋。隻是寫減速器運營商承擔數據到達時間字段的順序將不工作,他們需要準備接收無序的數據,並相應地更新導致MySQL。

在最新的流媒體係統中,部分或所有這些擔憂都是留給用戶。這是不幸的,因為這些問題————應用程序與外部世界的交互————一些最難的思考和正確的。特別是,沒有簡單的方法來獲得語義簡單上麵的SQL查詢。

結構化流模型

在結構化流,我們直麵問題的語義係統通過一個強有力的保證:在任何時候,應用程序的輸出相當於執行一個批處理作業數據的前綴。例如,在我們的監視應用程序,結果表總是會在MySQL中相當於一個前綴的每個手機的更新流(無論數據進入係統到目前為止)和運行我們上麵顯示的SQL查詢。永遠不會有“開放”事件計算速度比“關閉”事件,重複更新失敗,等。結構化流自動處理一致性和可靠性在引擎和與外部係統的交互(如以事務的方式更新MySQL)。

前綴的完整性保證很容易推斷我們確定的三個挑戰。特別是:

  1. 輸出表總是一致的與所有的記錄數據的前綴。例如,隻要每個電話作為連續流上傳數據(例如,相同的分區在Apache卡夫卡),我們將永遠過程和計數的事件順序。
  2. 容錯是由結構化流整體處理,包括在交互輸出下沉。這是一個支持的主要目標連續應用程序
  3. 的影響無序的數據是明確的。我們知道工作輸出計數按行動和時間分組流的一個前綴。如果我們以後得到更多的數據,我們可能會看到一個時間字段一個小時過去,我們會更新在MySQL中各自的行。結構化流api還支持過濾過於舊數據,如果用戶想要的。但從根本上說,無序的數據並不是一個“特殊情況”:查詢說group by時間字段,並看到一個古老的時間並不比看到一個不同的重複動作。

結構化流的最後一個好處是,API是非常易於使用:它隻是火花DataFrame和數據集API。用戶描述查詢他們想要運行,輸入和輸出位置和選擇更多的細節。然後係統運行查詢增量,保持足夠的狀態從失敗中恢複過來,保持一致的結果在外部存儲,等等。例如,如何寫我們的流媒體監控應用程序:

//連續讀取數據一個S3的位置val inputDF=spark.readStream.json (s3: / /日誌)//做操作使用標準DataFrame APIMySQLinputDF.groupBy($“行動”,窗口($“時間”,“1小時”))。().writeStream.format (jdbc)開始(" jdbc: mysql / /…”)

下麵這段代碼是幾乎相同的批版——隻有“讀”和“寫”改變:

/ /讀取數據從一個S3的位置val inputDF = spark.read.json (“s3: / /日誌”)/ /操作使用標準DataFrame API和MySQL編寫inputDF.groupBy ($“行動”,窗口(美元)“時間”,“1小時”).count ().writeStream.format (“jdbc”).save (“jdbc: mysql / /…”)

下一小節將詳細說明該模型,以及API。

模型的細節

從概念上講,結構化流將到達的所有數據視為一個無界的輸入表。流中的每個新項目就像一行添加到輸入表。我們不會保留所有輸入,但我們的研究結果將相當於所有它並運行一個批處理作業。
image01
開發人員定義了一個查詢在此輸入表,就好像它是一個靜態表,計算最後一個結果表將寫入到輸出下沉。火花自動將這批查詢轉換為流執行計劃。這就是所謂的incrementalization:引發出狀態需要維護數據更新結果每次記錄的到來。最後,開發人員指定觸發器控製何時更新結果。每次引發火災,引發檢查新數據輸入表(新行),並逐步更新結果。

結構化模型

模型的最後一部分輸出模式。每次更新結果表,開發人員想寫更改外部係統,如S3, HDFS,或數據庫。我們通常想寫輸出增量。為此,結構化流提供了三種輸出模式:

  • 附加:隻有新行添加到結果表自上次觸發將寫入到外部存儲。這是隻適用於查詢結果表中現有的行不能改變一個輸入流(如地圖)。
  • 完整的:整個更新結果表將被寫入外部存儲器。
  • 更新:隻有結果表中的行,更新自上次引發外部存儲器將會改變。這種模式適用於輸出下沉,可以更新到位,比如MySQL表。

讓我們看看我們移動監視應用程序可以運行在這個模型。我們的批量查詢是計算一個計數的行動(行動,小時)分組。逐步運行該查詢,火花將保持一些州的數量每一對迄今為止,和更新新記錄到達時。為每一個記錄改變,它將輸出數據根據其輸出模式。下圖顯示了該執行使用更新輸出模式:

在每一個觸發點,我們把之前的分組數,用新數據更新到自上次引發新結果表。我們僅僅發出需要的變化輸出模式,水槽,在這兒,我們更新記錄(行動,小時)對改變在觸發MySQL(紅色所示)。

注意,係統也會自動處理數據。在上圖中,phone3的“開放”的事件,發生在1:58打電話,隻能係統在秒。盡管如此,即使是過去的兩點,我們在MySQL一點更新記錄。然而,前綴完整性保證在結構流確保我們從每個源過程記錄他們的順序到達。例如,因為phone1“關閉”事件到“開放”事件後,我們將永遠更新“開放”的數量在我們更新的“關閉”。

故障恢複和存儲係統的要求

結構化流使其結果有效,即使機器失敗。要做到這一點,它把兩個要求輸入源和輸出彙:

  1. 輸入源必須可複製最近,所以數據可以重讀如果工作崩潰。例如,像亞馬遜這樣的消息總線運動和Apache卡夫卡是可複製,文件係統的輸入源。隻有幾分鍾的數據需要保留;結構化流將保持自己的內部狀態。
  2. 輸出水槽必須支持事務更新,這樣係統可以使一組記錄自動出現。當前版本的結構化下沉流媒體實現了這個文件,我們還計劃將其添加為常見的數據庫和鍵值存儲。

我們發現大多數火花應用程序已經在使用水槽與這些性質和來源,因為用戶希望自己的工作是可靠的。

除了這些需求,結構化流將管理其內部狀態在一個可靠的存儲係統,如S3或者HDFS,存儲數據,如運行在我們的例子中。鑒於這些屬性,結構完整的端到端流將執行前綴。

結構化流API

結構化流集成到引發的數據集和DataFrame api;在大多數情況下,你隻需要添加一些方法調用運行流計算。它還增加了新的運營商為窗口的聚合和設置參數執行模型(如輸出模式)。在Apache 2.0火花,我們已經建立了一個alpha版本係統的核心api。更多的運營商,如sessionization,會在將來的版本中。

API基礎知識

流在結構化流表示為DataFrames或數據集isStreaming屬性設置為true。您可以創建使用特殊的閱讀方法從各種來源。例如,假設我們想要在我們的監視應用程序讀取數據從JSON文件上載到Amazon S3。下麵的代碼顯示了如何做到這一點在Scala中:

val inputDF = spark.readStream.json (“s3: / /日誌”)

我們的結果DataFrame inputDF,是我們輸入表,將不斷擴展與新行新文件添加到目錄中。表有兩列,時間和行動。現在你可以使用普通的DataFrame /數據集操作轉換數據。在我們的示例中,我們希望每小時計數操作類型。要做到這一點,我們必須在行動組數據窗口和1小時的時間。

val countsDF=inputDF.groupBy($“行動”,窗口($“時間”,“1小時”))()

新DataFrame countsDF是我們的結果表,列行動,窗口,和計數,並將不斷更新查詢時開始。注意,這一轉變將給每小時計數即使inputDF靜態表。這允許開發人員測試他們的業務邏輯對靜態數據集和無縫應用流數據而不改變邏輯。

最後,我們告訴引擎寫這個表水槽,開始流計算。

= countsDF.writeStream val查詢。格式(“jdbc”).start (“jdbc: / /……”)

返回的查詢StreamingQuery,處理到活動執行流和可用於管理和監控執行。

除了這些基礎知識外,還有更多的操作可以通過結構化流。

映射、過濾和聚合

結構化流程序可以使用DataFrame轉換數據和數據集的現有方法,包括地圖、過濾、選擇、和其他人。此外,運行(或無限)聚合,如從一開始的時候,都可以通過現有的api。這就是我們在監視應用程序中使用。

事件時間窗口的聚合

流媒體應用程序經常需要計算各種類型的數據窗戶,包括滑動窗口相互重疊(如一個小時窗口,進步每5分鍾),和翻滾的窗戶,不(如每小時)。在結構化流,窗口隻是表示成一個group by。每個輸入事件可以被映射到一個或多個窗口,並簡單地更新一個或多個結果表行。

Windows可以指定在DataFrames使用窗口函數。例如,我們可以改變我們的監測工作數行為通過滑動窗口如下:

inputDF.groupBy($“行動”,窗口($“時間”,“1小時”、“5分鍾”))()

而我們之前的表單的應用程序輸出結果(小時、行動、計數),這種新形式的一個將輸出結果(窗口、動作數),如(“1:10-2:10”、“開放”,17)。如果遲到記錄到達,我們將更新所有相應的MySQL的窗戶。不像在其他係統中,窗口不僅是一個特殊的操作流計算;我們可以運行相同的代碼在一個批處理作業進行分組數據以同樣的方式。

窗口的集合是一個領域,我們將繼續擴大結構化流。特別是,在火花2.1中,我們計劃增加水印下降的功能過於舊數據當足夠的時間過去了。沒有這種類型的功能,所有舊的係統可能需要跟蹤狀態窗口,這將不是應用程序運行的規模。此外,我們計劃添加支持基於會話的窗口,即事件從一個來源為變長分組會議根據業務邏輯。

加入與靜態數據流

因為結構化流僅僅使用DataFrame API,它是直接加入流對靜態DataFrame,比如Apache蜂巢表:

//數據每一個客戶一個靜態“beplay体育app下载地址顧客”,//然後加入流DataFrameval beplay体育app下载地址customersDF=spark.table(“beplay体育app下载地址客戶”)inputDF。加入(客戶sDF, "customer_id").groupBy ($“customer_name”,小時(“時間”)()

此外,靜態DataFrame本身可以計算使用火花查詢,讓我們混批處理和流計算。

交互式查詢

結構化流可以公開結果直接通過火花的JDBC服務器交互查詢。在火花2.0中,有一個基本的“記憶”輸出水槽為此不為大量數據而設計的。然而,在將來的版本中,這將讓你寫一個內存中的火花SQL表查詢結果,並直接運行查詢。

//拯救我們之前統計查詢一個- - - - - -內存countsDF.writeStream.format(“記憶”).queryName(“計數”).outputMode(“完整的”)開始()//然後任何線程可以查詢使用SQLsql(“選擇總和(數量)從計數action =登錄”)

比較與其他引擎

展示獨特的結構化流,下一個表比較它與其他幾個係統。正如我們之前所討論的,結構化流強勁的前綴完整性的保證使它相當於批作業,容易融入更大的應用程序。此外,基於火花使集成批處理和交互式查詢。

streaming-engine-comparison

結論

結構化流將是一個更簡單的模型構建端到端實時應用程序,建立在功能效果最好火花流。雖然結構化流是在αApache 2.0火花,我們希望這篇文章鼓勵你嚐試一下。

長期的,很像DataFrame API,我們預計結構化流補充火花流媒體通過提供更多的限製但高層次的接口。如果您正在運行火花流今天,不要擔心————將繼續支持。但我們相信,結構化流可以打開更多的用戶的實時計算。

在磚結構流也完全支持,包括自由磚社區版

閱讀更多

此外,以下資源覆蓋結構化流:

免費試著磚
看到所有工程的博客的帖子
Baidu
map