跳轉到主要內容
工程的博客

如何監視PySpark流查詢

分享這篇文章

流媒體是其中一個最重要的數據處理技術對攝入和分析。它為用戶和開發人員提供低延遲和實時數據處理能力分析和觸發動作。然而,監控數據流數據的工作負載是富有挑戰性的,因為連續加工,因為它的到來。由於這種不間斷流處理的性質,很難排除故障實時度量標準的情況下,開發和生產期間,提醒和儀表盤。

結構化流在Apache火花™地址的問題監控通過提供:

直到現在,可見API已經在PySpark失蹤,這迫使用戶使用Scala API為他們流查詢結果報警的功能,與其他外部係統儀表盤。在Python中缺乏該功能與Python的重要性變得越來越重要的增長,考慮到幾乎70%的筆記本電腦上運行命令磚是在Python中

在磚運行時的11中,我們很高興宣布在PySpark可見API現在可用。在這篇文章中,我們介紹了Python的API結構化流,連同一個循序漸進的場景添加到流查詢報警邏輯。

可觀測的API

開發人員現在可以流指標發送給外部係統,例如,報警和儀表盤與自定義指標,使用流媒體的組合查詢PySpark偵聽器接口和可觀察到的API。流查詢偵聽器接口是一個抽象類必須繼承和應該實現所有方法如下所示:

pyspark.sql.streaming進口StreamingQueryListener
              MyListener(StreamingQueryListener):defonQueryStarted(自我、事件):”“”啟動時調用查詢。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryStartedEvent”可用的屬性是一樣的Scala API。筆記- - - - - -這就是所謂的同步甲:“pyspark.sql.streaming.DataStreamWriter.start”,即“onQueryStart“將呼籲所有聽眾“DataStreamWriter.start() ' '返回對應的類:“pyspark.sql.streaming.StreamingQuery”。不阻塞在這個方法,因為它會阻止您的查詢。”“”通過defonQueryProgress(自我、事件):”“”當有一些狀態更新(攝入率更新等)。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryProgressEvent”可用的屬性是一樣的Scala API。筆記- - - - - -這個方法是異步的。的狀態:類:“pyspark.sql.streaming。StreamingQuery”將永遠是最新的無論何時調用此方法。因此,狀態:類的:“pyspark.sql.streaming.StreamingQuery”。可能會改變之前/當你處理事件。例如,你可能會發現:類:“StreamingQuery”終止當你處理“QueryProgressEvent”。”“”通過defonQueryTerminated(自我、事件):”“”當停止查詢,有或沒有錯誤。參數- - - - - - - - - - -事件::類:“pyspark.sql.streaming.listener.QueryTerminatedEvent”可用的屬性是一樣的Scala API。”“”通過
              
              my_listener = MyListener ()

注意,他們所有的異步工作。

  • StreamingQueryListener.onQueryStarted流媒體查詢時會觸發開始,例如,DataStreamWriter.start
  • StreamingQueryListener.onQueryProgress每個micro-batch執行完成時被調用。
  • StreamingQueryListener.onQueryTerminated被稱為在查詢停止時,例如,StreamingQuery.stop

必須添加偵聽器被激活通過StreamingQueryManager,也可以刪除後如下所示:

spark.streams.addListener (my_listener)spark.streams.removeListener (my_listener)

為了捕捉自定義指標,它們必須通過補充道DataFrame.observe。自定義指標被定義為任意聚合等功能計數(“價值”)如下所示。

df.observe(“名字”,(),…)

錯誤警報場景

在本節中,我們將描述一個真實世界的例子用例與可觀測的API。假設您有一個目錄新的CSV文件不斷從另一個係統,你必須攝取它們以流媒體的方式。在本例中,我們將使用一個本地文件係統API為簡單起見,這樣可以很容易地理解。下麵的代碼片段可以複製粘貼pysparkshell運行和嚐試。

首先,讓我們導入必要的Python類和包,然後創建一個目錄my_csv_dir在這個場景中使用的。

進口操作係統進口shutil進口時間pathlib進口路徑pyspark.sql.functions進口計數,坳,點燃pyspark.sql.streaming進口StreamingQueryListener#注意:“basedir”替換為融合路徑,例如,在磚“/ dbfs / tmp”#筆記本。basedir = os.getcwd ()#“/ dbfs / tmp”#我的CSV文件將被創建在這個目錄後清洗“my_csv_dir”#目錄的情況下,你已經跑了下麵的這個例子。my_csv_dir = os.path.join (basedir,“my_csv_dir”)shutil。rmtree (my_csv_dir ignore_errors =真正的)os.makedirs (my_csv_dir)

接下來,我們定義自己的自定義流查詢偵聽器。偵聽器將警報當有太多的畸形CSV攝入為每個過程中記錄。如果畸形的記錄總數的50%以上的處理記錄,我們會打印出一個日誌消息。然而,在生產環境中,您可以連接到外部係統,而不是簡單地打印出來。

#定義我的聽眾。MyListener(StreamingQueryListener):defonQueryStarted(自我、事件):打印(f”{event.name}”({事件。id})開始!”)defonQueryProgress(自我、事件):行= event.progress.observedMetrics.get (“指標”)如果沒有一個:如果行。畸形/ row.cnt >0.5:打印(“警告!哎喲!有太多的畸形”f”記錄{row.malformed}{row.cnt}!”)其他的:打印(f”{row.cnt}行處理!”)defonQueryTerminated(自我、事件):打印(f”{事件。id}終止了!”)
              #添加我的偵聽器。my_listener = MyListener ()spark.streams.addListener (my_listener)

激活聽者,我們之前將它添加查詢在這個例子。然而,重要的是要注意,您可以添加偵聽器的異步查詢開始和終止,因為他們工作。這允許你連接或分離的運行流查詢沒有阻止他們。

現在我們將開始一個流媒體查詢中攝食的文件my_csv_dir目錄中。在處理過程中,我們也觀察畸形的記錄和處理記錄的數量。CSV數據源存儲在畸形的記錄_corrupt_record,默認情況下,我們將計算列畸形的數量記錄。

#現在,開始流查詢監控“my_csv_dir”目錄中。#每一個時間在那裏CSV文件到達這裏,我們會處理它們。my_csv=spark.readStream.schema (“my_key INT, my_val雙_corrupt_record字符串“). csv(路徑(my_csv_dir) .as_uri ())#“DataFrame。觀察的計算加工過的畸形的記錄,#發送一個事件偵聽器。my_observed_csv=my_csv.observe (“指標”,(點燃(1).alias(“問”),#號加工過的(坳(“_corrupt_record”)) .alias(“畸形”))#號畸形的my_query=my_observed_csv.writeStream.format (“控製台”)。queryName(“我的觀察”)。開始()

現在,我們已經定義了流媒體查詢和報警功能,讓我們創建CSV文件,這樣他們可攝入以流媒體的方式:

#現在,我們將編寫要處理CSV數據以流的方式。#這個CSV文件都是格式良好的。開放(os.path.join (my_csv_dir“my_csv_1.csv”),“w”)作為f:_ = f.write (1.1“\ n”)_ = f.write (“123123 .123 \ n”)time . sleep (5)#假設另一個CSV文件抵達5秒。#哎喲!它有兩個畸形的記錄的3。我的觀察者查詢應該警惕!開放(os.path.join (my_csv_dir“my_csv_error.csv”),“w”)作為f:_ = f.write (1.123“\ n”)_ = f.write (“哎喲!畸形的記錄! \ n”)_ = f.write (“Arrgggh ! \ n”)time . sleep (5)#好吧,全部完成。讓我們停止查詢5秒。my_query.stop ()spark.streams.removeListener (my_listener)

在這裏我們將看到查詢開始,終止和過程是正確記錄。因為有兩個畸形的記錄在CSV文件中,提出的警告是正確使用以下錯誤信息:


警報!哎喲!有太多的畸形記錄2 3 !

結論

PySpark用戶現在可以通過流媒體設置自定義指標和觀察他們查詢偵聽器接口和可觀察到的API。他們可以連接或分離的這種邏輯運行時動態地查詢需要。這個功能地址需要儀表盤,提醒其他外部係統和報告。

流查詢偵聽器接口和可觀察到的API可用DBR 11β,預計可在未來Apache火花。試試這兩個新功能今天磚通過DBR 11β。

流查詢偵聽器接口和可觀察到的API可用DBR 11β,預計可在未來Apache火花。

免費試著磚

相關的帖子

看到所有開源的帖子
Baidu
map