跳轉到主要內容
工程的博客

引入Stream-Stream連接在Apache 2.3火花

現在可以在磚4.0運行時
分享這篇文章

因為我們介紹結構化流Apache 2.0火花,它支持連接(內連接和某種類型的外部連接)流和靜態DataFrame /數據集。的釋放Apache火花tripwire,現在可以在磚4.0運行時磚的一部分統一的分析平台Beplay体育安卓版本現在,我們支持stream-stream連接。在這篇文章中,我們將探討如何使用stream-stream連接規範,挑戰我們解決,他們支持什麼類型的工作負載。讓我們先從stream-stream加入典型用例——廣告盈利。

Stream-Stream加入的理由:廣告盈利

想象你有兩個流——一個流(即廣告印象。,when an advertisement was displayed to a user) and another stream of ad clicks (i.e., when the displayed ad was clicked by the user). To monetize the ads, you have to match which ad impression led to a click. In other words, you need to join these streams based on a common key, the unique identifier of each ad that is present in events of both streams. At a high-level, the problem looks like as follows.

雖然這是概念上的一個簡單的想法,有一些核心技術挑戰需要克服。

  1. 後期的處理與緩衝/延遲數據:印象事件及其相應的點擊事件可能無序與它們之間任意延遲到達。因此,流處理引擎必須占這些適當的緩衝延遲直到他們匹配。即使所有連接(靜態或流)可以使用緩衝區,真正的挑戰是避免緩衝區從無限製增長。
  2. 限製緩衝區大小:唯一的方法來限製流加入緩衝區的大小是由下降延遲超過某個閾值的數據。這個最大延遲閾值應該配置由用戶根據業務需求和係統資源限製之間的平衡。

  3. 定義良好的語義:保持一致的SQL加入語義之間的靜態連接和流連接,有或沒有上述閾值。

我們已經解決了所有這些挑戰stream-stream連接。作為結果,你可以表達你的計算使用SQL連接的明確的語義,以及控製之間的延遲容忍事件有關。讓我們來看看。

首先我們假設這些流是兩個不同的卡夫卡的話題。您將定義流DataFrames如下:

印象= (#模式-去:字符串,impressionTime:時間戳,…火花.readStream格式(“卡夫卡”).option (“訂閱”,“印象”).load ())
              點擊= (#模式-去:字符串,clickTime:時間戳,…火花.readStream格式(“卡夫卡”).option (“訂閱”,“點擊”).load ())

然後你需要做內部相等連接如下。

impressions.join(點擊,“去”)DataFrames #去是很常見的

正如所有結構化流查詢,這個代碼是完全一樣你會如果DataFrames寫的印象點擊定義靜態數據。執行該查詢時,結構化流引擎將緩衝區的點擊和印象流狀態。為特定的廣告,盡快加入輸出將生成相關的事件都收到了(也就是說,一旦收到第二個事件)。作為數據到達時,加入輸出將生成增量和書麵查詢下沉(例如一個卡夫卡的話題)。

最後,累積的結果不會加入不同的連接查詢被應用於兩個靜態數據集(即語義相同的SQL連接)。事實上,這將是相同的即使提出了一個流,另一個作為一個靜態數據集。然而,在這個查詢,我們沒有給出任何提示多久引擎應該緩衝事件找到匹配。因此,發動機可能永遠緩衝事件,積累一個無限數量的流狀態。讓我們來看看我們可以提供額外的信息查詢中限製狀態。

管理負責Stream-Stream流連接

限製流狀態由stream-stream連接,你需要知道以下信息關於你的用例:

  1. 什麼是代這兩個事件之間的時間範圍在各自的來源嗎?在我們的用例中,我們假設一個點擊後0秒到1小時內可能發生相應的印象。
  2. 最多時間事件可以被推遲在交通源和處理引擎?例如,廣告點擊從瀏覽器可能會推遲由於斷斷續續的連接和更晚到達和比預期的無序。假設,印象和點擊最多可以推遲2和3小時,分別。

這些時間限製為每一個事件,事件處理引擎可以自動計算多久需要緩衝生成正確的結果。例如,它將評估以下。

  1. 印象需要緩衝最多4小時(事件時間)3-hour-late點擊可能與4小時前(即一個印象。3-hour-late +高達1小時延遲之間的印象並單擊)。
  2. 相反,點擊需要緩衝最多2小時(事件時間)2-hour-late印象可能與點擊收到2個小時前。

因此,引擎可以從流掉舊的印象和點擊確定時的緩衝事件預計不會得到任何未來的比賽。

在高級動畫演示了如何更新水印事件時間和清理。

玩這個視頻,請點擊這裏,接受餅幹

這些時間限製可以查詢中編碼的水印和時間範圍與條件。

  • 水印:水印在結構化流是一種極限狀態的所有狀態流操作通過指定多少後期數據需要考慮。具體地說,在事件時間水印是一個移動的閾值,落後的最大事件時間被處理過的數據查詢。落後的差距(又名水印延遲)定義了引擎應該多長時間等待數據到達和後期在查詢中指定使用withWatermark。更詳細地了解它在我們以前的博客文章流媒體聚合。stream-stream內部連接,您可以有選擇地指定水印延遲但你必須指定限製所有國家都流。
  • 時間範圍條件:這種聯接條件限製其他事件,每個事件的時間範圍可以加入反對。這個可以指定兩種方式之一:

    • 時間範圍聯接條件(例如…加入rightTime之間leftTime rightTime +間隔1小時),
    • 加入在事件時間窗口(例如…加入leftTimeWindow = rightTimeWindow)。

在一起,我們對廣告的內連接貨幣化是這樣的。

pyspark.sql。進口expr函數#定義水印impressionsWithWatermark=印象\.selectExpr(“去impressionAdId”,“impressionTime”) \.withWatermark (“impressionTime”,“10秒”)#馬克斯10秒末clicksWithWatermark=點擊\.selectExpr(“去clickAdId”,“clickTime”) \.withWatermark (“clickTime”、“20秒”)#馬克斯20.秒末
              #內心的加入時間範圍條件impressionsWithWatermark.join (clicksWithWatermark,expr (“””clickAdId = impressionAdId和clickTime > = impressionTime和clickTime

,前麵提到的引擎將自動計算狀態限製放舊相應事件。在結構化流都有狀態,檢查點保證您得到隻有一次容錯擔保。

這是一個查詢的截圖在磚筆記本的運行與這篇文章有關。注意第三個圖,記錄查詢狀態,一段時間後變得平緩指示國家清理由水印清理舊數據。

使用Stream-Stream外部連接

早期的內連接輸出隻有那些廣告收到了這兩件事。換句話說,廣告,收到沒有點擊不會報告。相反,您可能希望所有廣告報道,有或沒有相關的點擊數據,使額外的分析後(如點擊利率)。這給我們帶來了stream-stream外部連接。所有您需要做的就是指定連接類型。

pyspark.sql。進口expr函數
              #加入時間範圍條件impressionsWithWatermark.join (clicksWithWatermark,expr (“””clickAdId = impressionAdId和clickTime > = impressionTime和clickTime

如預期的外部連接,這個查詢將開始生成輸出每一個印象,有或沒有(即。點擊數據,使用null)。然而,外部連接有一些額外的注意事項。

  • 與內部連接、水印和事件時間約束不是可選的外部連接。這是因為生成零結果,發動機必須知道事件時不會與別的。因此,水印和事件時間約束必須指定為啟用狀態過期和生成正確的外連接的結果。
  • 因此,外部零結果將生成與延遲引擎必須等待一段時間,以確保沒有而且也不會被任何比賽。這種延遲是最大緩衝時間(關於事件時間)計算每個事件的引擎正如前麵討論的部分(即。4小時和2小時點擊)。

進一步的閱讀

全部細節在支持其他類型的連接和查詢限製看一看結構化流編程指南。更多信息在結構化流其他有狀態操作,看看以下幾點:

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map