結構化流媒體教程
傳感器、物聯網設備、社交網絡和在線交易都會產生數據,需要持續監控並迅速采取行動。因此,對大規模實時流處理的需求比以往任何時候都更加明顯。本教程模塊介紹結構化流,這是Apache Spark中處理流數據集的主要模型。在結構化流中,數據流被視為不斷追加的表。這導致流處理模型非常類似於批處理模型。您將流計算表示為靜態表上的標準批處理查詢,但Spark將其作為無界輸入表上的增量查詢運行。
將輸入數據流視為輸入表。流中到達的每個數據項就像添加到輸入表中的新行。
對輸入的查詢生成一個結果表。在每個觸發間隔(例如,每1秒),新行被追加到輸入表,最終更新結果表。無論何時更新結果表,更改後的結果行都被寫入外部接收器。輸出被定義為寫入外部存儲的內容。輸出可以配置在不同的模式:
完整的模式:將整個更新後的結果表寫入外部存儲。由存儲連接器決定如何處理整個表的寫入。
Append模式:隻有在最後一個觸發器之後追加到結果表中的新行才被寫入外部存儲。這僅適用於結果表中現有行不希望更改的查詢。
更新模式:隻有自上次觸發器以來在結果表中更新的行被寫入外部存儲。這與完全模式不同,因為更新模式隻輸出自上次觸發器以來更改的行。如果查詢不包含聚合,則它相當於追加模式。
在本教程模塊中,您將學習如何:
我們還提供樣的筆記本您可以導入它來訪問和運行模塊中包含的所有代碼示例。
加載樣例數據
開始使用結構化流的最簡單方法是使用數據庫中可用的示例Databricks數據集/ databricks-datasets
在Databricks工作區中可訪問的文件夾。Databricks中有作為文件的樣例事件數據/ / databricks-datasets / structured-streaming /事件
用於構建結構化流應用程序。讓我們看一下這個目錄的內容。
文件中的每一行都包含一個JSON記錄,包含兩個字段:時間
而且行動
.
{“時間”:1469501675,“行動”:“開放”}{“時間”:1469501678,“行動”:“關閉”}{“時間”:1469501680,“行動”:“開放”}{“時間”:1469501685,“行動”:“開放”}{“時間”:1469501686,“行動”:“開放”}{“時間”:1469501689,“行動”:“開放”}{“時間”:1469501691,“行動”:“開放”}{“時間”:1469501694,“行動”:“開放”}{“時間”:1469501696,“行動”:“關閉”}{“時間”:1469501702,“行動”:“開放”}{“時間”:1469501703,“行動”:“開放”}{“時間”:1469501704,“行動”:“開放”}
初始化流
由於示例數據隻是一組靜態文件,您可以通過按創建文件的時間順序每次讀取一個文件來模擬來自這些文件的流。
從pyspark.sql.types進口*從pyspark.sql.functions進口*inputPath=“/ databricks-datasets / structured-streaming /事件/”#定義模式以加速處理jsonSchema=StructType([StructField(“時間”,TimestampType(),真正的),StructField(“行動”,StringType(),真正的)])streamingInputDF=(火花.readStream.模式(jsonSchema)#設置JSON數據的模式.選項(“maxFilesPerTrigger”,1)#每次選擇一個文件,將一個文件序列視為一個流.格式(“json”).負載(inputPath))streamingCountsDF=(streamingInputDF.groupBy(streamingInputDF.行動,窗口(streamingInputDF.時間,“1小時”)).數())
啟動流作業
您可以通過定義接收器並啟動它來開始流計算。在本例中,要以交互方式查詢計數,請設置完整的在內存表中設置1小時計數。
查詢=(streamingCountsDF.writeStream.格式(“記憶”)# memory =存儲內存中的表(僅用於測試).queryName(“計數”)# counts =內存中表的名稱.outputMode(“完整的”)# complete =所有計數都應該在表中.開始())
查詢
流查詢的句柄是否命名計數
它在後台運行。該查詢不斷地拾取文件並更新窗口計數。
命令窗口報告流的狀態:
當你展開的時候計數
,你會得到一個儀表板,顯示處理的記錄數量、批統計數據和聚合的狀態:
交互式地查詢流
可以定期查詢計數
聚合:
%sql選擇行動,date_format(窗口.結束,“MMM-dd HH: mm”)作為時間,數從計數訂單通過時間,行動
從這一係列屏幕截圖中可以看到,查詢在每次執行時都會更改,以反映基於輸入數據流的操作計數。
筆記本
要訪問這些代碼示例和更多內容,請導入以下筆記本。有關更多結構化流的示例,請參見什麼是Apache Spark結構化流?.