取消
顯示的結果
而不是尋找
你的意思是:

失蹤的行處理記錄時使用foreachbatch火花結構化流從Azure事件中心

sparkstreaming
新的貢獻者三世

我新的實時場景,需要創建一個火花在磚結構化流工作。我想申請一些規則從後端配置驗證基於每個傳入JSON消息。我需要做以下操作傳入的JSON

  1. 我需要把傳入的JSON
  2. 在JSON基於源地產,我需要獲取驗證配置消息源和應用它
  3. 我需要驗證錯誤日誌到突觸表
  4. 如果沒有驗證錯誤的數據需要保存到一個單獨的基於後端配置表。

我試著實現這種使用磚中的foreachBatch筆記本。我的代碼片段看起來如下:

  • 閱讀從EventHub
streamingDF = spark.readStream.format (eventhubs) .options (* * ehConf) .load ()
  • 啟動foreachbatch查詢
streamingDF.writeStream.outputMode(“追加”)\ .option (“checkpointLocation”,“dbfs: / FileStore /檢查點/ rtdqchekpoint”) \ .foreachBatch (processStream) .start () .awaitTermination ()
  • processStream函數
msgDF = df.select (df.body.cast .alias(“字符串”)(“msgBody”)) msgDfVal = msgDF.collect()消息= msgDfVal[0] =[0]屬性df.select (df.properties) .collect () sourceFeedName =屬性[0][0][' sourceFeedName '] ruleConfigDF = getBusinessRulesforSource (sourceFeedName) dfJson = spark.read.json (sc.parallelize([信息]))srcDqDataDf =平(dfJson) errorDetails = applyBusinessRule (srcDataDf ruleConfigDF)如果(errorDetails ! =沒有和len (errorDetails) > 0): errCnt = len (errorDetails) saveErrorDetailsToSynapse (errorDetails) saveDatatoSynapse (srcDataDf)

然而在處理我發現隻有少數進入processStream函數傳入的消息。例如:如果10條消息隻有5或6隨機進入processStream消息。我試著把所有的處理邏輯。仍然隻有幾條消息。

因為我新到實時處理,有人能幫我為什麼這是發生在我做錯什麼。

5回複5

Rishi045
新的貢獻者二世

是你能夠達成任何解決方案如果有你能不能幫幫我。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map