現代工業物聯網對Azure的分析——第2部分
2020年8月11日, 在公司博客上
介紹
在第1部分係列的現代工業物聯網(物聯網)對Azure的分析,我們走過了大數據用例和現代IIoT的目標分析,共享一個真實的可重複的建築在使用組織部署IIoT規模和探索的好處三角洲格式為每個現代IIoT分析所需的數據湖功能。
讀數據Lakehouse上升探索為什麼lakehouses未來的數據架構和數據倉庫的父親,Bill Inmon。
部署
我們使用Azure的覆盆子π物聯網機器的傳感器讀數模擬器來模擬實時,送到Azure物聯網中心。
數據攝取:Azure物聯網數據中心湖
我們的部署傳感器讀數的天氣(風速、風向、溫度、濕度)和風力渦輪機遠程信息處理(角和RPM)發送到一個物聯網雲計算中心。Azure磚可以直接從物聯網中心本地流數據到一個三角洲表ADLS和顯示數據的輸入和處理速率。
#讀直接從物聯網中心使用EventHubs庫Azure磚iot_stream = (spark.readStream。格式(“eventhubs”)#從物聯網中心直接讀取.options (* * ehConf)#使用Event-Hub-enabled連接字符串.load ()#加載數據.withColumn (“閱讀”,F.from_json (F.col (“身體”).cast (“字符串”),模式))#提取消息的有效負載.select (“閱讀。*”F.to_date (“reading.timestamp”).alias (“日期”))#為分區創建一個“日期”字段)#我們的物聯網中心流分割成獨立的溪流和把它們寫進自己的三角洲地區write_turbine_to_delta = (iot_stream。過濾器(溫度是空的)#過濾渦輪遙測與其他流.select (“日期”,“時間戳”,“的deviceId”,“轉”,“角”)#提取感興趣的領域.writeStream。格式(“δ”)#我們的河流三角洲的格式寫.partitionBy (“日期”)#分區數據按日期的性能.option (“checkpointLocation”ROOT_PATH +“/銅/ cp /渦輪”)#檢查點所以我們可以重啟流優雅.start (ROOT_PATH +“/銅/數據/ turbine_raw”)#流數據到一個ADLS路徑)
三角洲允許查詢我們的物聯網數據在幾秒鍾內被抓獲在物聯網中心。
%sql——我們可以立即查詢數據直接從存儲流到三角洲選擇*從delta.”/tmp/iiot/青銅/數據/turbine_raw”在哪裏的deviceid=“WindTurbine-1”
我們現在可以建立一個豐富和骨料的下遊管道IIoT應用程序數據進行數據分析。
數據存儲和處理:Azure磚和三角洲湖
三角洲支持一種管道數據工程,方法,數據質量和增加聚合,通過管道流。我們的時間序列數據將流經下麵的青銅,白銀和黃金數據的水平。
我們的管道從青銅銀會總渦輪傳感器數據間隔為1小時。我們將執行一個流合並命令來插入聚合記錄到我們的銀三角洲表。
#創建功能來合並渦輪和天氣數據成他們的目標三角洲表def merge_records(增量,target_path):incremental.createOrReplaceTempView(“增量”)#合並由的一個目標表,一個源表(增量),#一個加入關鍵來識別匹配(的deviceid time_interval),和操作來執行#(更新,插入,刪除)當一個匹配發生或不incremental._jdf.sparkSession ()。sql(f”“”並入turbine_hourly t使用增量i.date = t。日期和i.deviceId = t.deviceid和我。time_interval = t.time_interval當匹配更新設置*當不匹配插入*”“”)
#執行流合並成我們的數據流turbine_stream=(spark.readStream.format (“δ”)。表(“turbine_raw”)#讀取數據作為一個流從我們的消息來源δ表.groupBy (“的deviceId”,“日期”F.window (“時間戳”,“1小時”)#總讀數來每小時間隔.agg ({“rpm”:“avg”、“角”:“avg”}).writeStream.foreachBatch (merge_records) #每一個微- - - - - -批處理來一個函數.outputMode(“更新”)#合並作品與更新模。開始())
我們的管道從白銀和黃金將加入兩個流組合進一個表每小時天氣和渦輪測量。
#從三角洲銀表讀取流turbine_hourly = spark.readStream。格式(“δ”).option (“ignoreChanges”,真正的).table (“turbine_hourly”)weather_hourly = spark.readStream。格式(“δ”).option (“ignoreChanges”,真正的).table (“weather_hourly”)#執行流加入豐富的數據turbine_enriched = turbine_hourly。加入(weather_hourly [“日期”,“time_interval”])#執行流媒體數據流合並到我們的黃金merge_gold_stream = (turbine_enriched.writeStream.foreachBatch (merge_records).start ())
我們可以立即查詢我們的黃金三角洲表。
筆記本還包含一個細胞將生成曆史每小時閱讀和日常維護日誌,將用於訓練模型。細胞的運行:
- 回填曆史turbine_enriched表讀數為1年
- 生成曆史功率讀數power_output表中的每個渦輪機
- 生成曆史維護日誌turbine_maintenance表中的每個渦輪機
我們現在有豐富,人工智能(AI)準備好數據的高性能、可靠的格式在蔚藍的湖,可以送入我們的數據科學建模優化資產利用率。
%sql——查詢所有3表在一起創建或取代視圖gold_readings作為選擇r。*,p.power,m.maintenance作為維護從turbine_enriched r加入turbine_power p在(r.date=p.date和r.time_interval=p.time_interval和r.deviceid=p.deviceid)左加入turbine_maintenance米在(r.date=m.date和r.deviceid=m.deviceid);選擇*從gold_readings
我們的數據工程管道完成!數據已經從物聯網中心流向青銅(生)銀(聚合)金(豐富)。是時候執行一些對我們的數據的分析。
總結
總而言之,我們已經成功:
- 攝入從現場設備實時IIoT數據到Azure
- 直接執行複雜時間序列處理數據湖上
他們每件事都聯係起來的關鍵技術是三角洲湖。三角洲在ADLS提供可靠的流媒體數據管道和高性能數據科學分析查詢大量的時間序列數據。最後,它使組織能夠真正采用Lakehouse模式通過將最好的品種Azure工具寫一次,經常訪問數據存儲。
在下一篇文章我們將探討使用機器學習而獲得最大的風力渦輪機的收入最小化停機時間的機會成本。