結構化流媒體教程

傳感器、物聯網設備、社交網絡和在線交易都會產生數據,需要持續監控並迅速采取行動。因此,對大規模實時流處理的需求比以往任何時候都更加明顯。本教程模塊介紹結構化流,這是Apache Spark中處理流數據集的主要模型。在結構化流中,數據流被視為不斷追加的表。這導致流處理模型非常類似於批處理模型。您將流計算表示為靜態表上的標準批處理查詢,但Spark將其作為無界輸入表上的增量查詢運行。

結構化流流程

將輸入數據流視為輸入表。流中到達的每個數據項就像添加到輸入表中的新行。

結構化流模型

對輸入的查詢生成一個結果表。在每個觸發間隔(例如,每1秒),新行被追加到輸入表,最終更新結果表。無論何時更新結果表,更改後的結果行都被寫入外部接收器。輸出被定義為寫入外部存儲的內容。輸出可以配置在不同的模式:

  • 完整的模式:將整個更新後的結果表寫入外部存儲。由存儲連接器決定如何處理整個表的寫入。

  • Append模式:隻有在最後一個觸發器之後追加到結果表中的新行才被寫入外部存儲。這僅適用於結果表中現有行不希望更改的查詢。

  • 更新模式:隻有自上次觸發器以來在結果表中更新的行被寫入外部存儲。這與完全模式不同,因為更新模式隻輸出自上次觸發器以來更改的行。如果查詢不包含聚合,則它相當於追加模式。

在本教程模塊中,您將學習如何:

我們還提供樣的筆記本您可以導入它來訪問和運行模塊中包含的所有代碼示例。

加載樣例數據

開始使用結構化流的最簡單方法是使用數據庫中可用的示例Databricks數據集/ databricks-datasets在Databricks工作區中可訪問的文件夾。Databricks中有作為文件的樣例事件數據/ / databricks-datasets / structured-streaming /事件用於構建結構化流應用程序。讓我們看一下這個目錄的內容。

事件數據集

文件中的每一行都包含一個JSON記錄,包含兩個字段:時間而且行動

“時間”1469501675“行動”“開放”“時間”1469501678“行動”“關閉”“時間”1469501680“行動”“開放”“時間”1469501685“行動”“開放”“時間”1469501686“行動”“開放”“時間”1469501689“行動”“開放”“時間”1469501691“行動”“開放”“時間”1469501694“行動”“開放”“時間”1469501696“行動”“關閉”“時間”1469501702“行動”“開放”“時間”1469501703“行動”“開放”“時間”1469501704“行動”“開放”

初始化流

由於示例數據隻是一組靜態文件,您可以通過按創建文件的時間順序每次讀取一個文件來模擬來自這些文件的流。

pyspark.sql.types進口pyspark.sql.functions進口inputPath“/ databricks-datasets / structured-streaming /事件/”#定義模式以加速處理jsonSchemaStructType([StructField“時間”TimestampType(),真正的),StructField“行動”StringType(),真正的])streamingInputDF火花readStream模式jsonSchema#設置JSON數據的模式選項“maxFilesPerTrigger”1#每次選擇一個文件,將一個文件序列視為一個流格式“json”負載inputPathstreamingCountsDFstreamingInputDFgroupBystreamingInputDF行動窗口streamingInputDF時間“1小時”))()

啟動流作業

您可以通過定義接收器並啟動它來開始流計算。在本例中,要以交互方式查詢計數,請設置完整的在內存表中設置1小時計數。

查詢streamingCountsDFwriteStream格式“記憶”# memory =存儲內存中的表(僅用於測試)queryName“計數”# counts =內存中表的名稱outputMode“完整的”# complete =所有計數都應該在表中開始()

查詢流查詢的句柄是否命名計數它在後台運行。該查詢不斷地拾取文件並更新窗口計數。

命令窗口報告流的狀態:

流狀態

當你展開的時候計數,你會得到一個儀表板,顯示處理的記錄數量、批統計數據和聚合的狀態:

流指示板

交互式地查詢流

可以定期查詢計數聚合:

sql選擇行動date_format窗口結束“MMM-dd HH: mm”作為時間計數訂單通過時間行動

從這一係列屏幕截圖中可以看到,查詢在每次執行時都會更改,以反映基於輸入數據流的操作計數。

流更新1
流更新2
流更新3

筆記本

要訪問這些代碼示例和更多內容,請導入以下筆記本。有關更多結構化流的示例,請參見什麼是Apache Spark結構化流?

Apache Spark結構化流Python筆記本

在新標簽頁打開筆記本