取消
顯示的結果
而不是尋找
你的意思是:

失蹤的行處理記錄時使用foreachbatch火花結構化流從Azure事件中心

sparkstreaming
新的貢獻者三世

我新的實時場景,需要創建一個火花在磚結構化流工作。我想申請一些規則從後端配置驗證基於每個傳入JSON消息。我需要做以下操作傳入的JSON

  1. 我需要把傳入的JSON
  2. 在JSON基於源地產,我需要獲取驗證配置消息源和應用它
  3. 我需要驗證錯誤日誌到突觸表
  4. 如果沒有驗證錯誤的數據需要保存到一個單獨的基於後端配置表。

我試著實現這種使用磚中的foreachBatch筆記本。我的代碼片段看起來如下:

  • 閱讀從EventHub
streamingDF = spark.readStream.format (eventhubs) .options (* * ehConf) .load ()
  • 啟動foreachbatch查詢
streamingDF.writeStream.outputMode(“追加”)\ .option (“checkpointLocation”,“dbfs: / FileStore /檢查點/ rtdqchekpoint”) \ .foreachBatch (processStream) .start () .awaitTermination ()
  • processStream函數
msgDF = df.select (df.body.cast .alias(“字符串”)(“msgBody”)) msgDfVal = msgDF.collect()消息= msgDfVal[0] =[0]屬性df.select (df.properties) .collect () sourceFeedName =屬性[0][0][' sourceFeedName '] ruleConfigDF = getBusinessRulesforSource (sourceFeedName) dfJson = spark.read.json (sc.parallelize([信息]))srcDqDataDf =平(dfJson) errorDetails = applyBusinessRule (srcDataDf ruleConfigDF)如果(errorDetails ! =沒有和len (errorDetails) > 0): errCnt = len (errorDetails) saveErrorDetailsToSynapse (errorDetails) saveDatatoSynapse (srcDataDf)

然而在處理我發現隻有少數進入processStream函數傳入的消息。例如:如果10條消息隻有5或6隨機進入processStream消息。我試著把所有的處理邏輯。仍然隻有幾條消息。

因為我新到實時處理,有人能幫我為什麼這是發生在我做錯什麼。

1接受解決方案

接受的解決方案

Hubert_Dudek1
尊敬的貢獻者三世

嗯也許你不閱讀所有分區/分區鍵。看你去活動中心在Azure和處理數據- >探索和驗證嗎?

在磚可以使用顯示器(streamingDF)做一些驗證。

在生產.collect()不應該被使用。代碼看起來就像你從批處理隻有第一行。所有的邏輯都可以使用火花dataframe行動實現df和轉換。

如果你有自己的函數getBusinessRulesforSource等可以轉化成火花udf比應用於datframe函數和。

在原帖子查看解決方案

5回複5

Hubert_Dudek1
尊敬的貢獻者三世

嗯也許你不閱讀所有分區/分區鍵。看你去活動中心在Azure和處理數據- >探索和驗證嗎?

在磚可以使用顯示器(streamingDF)做一些驗證。

在生產.collect()不應該被使用。代碼看起來就像你從批處理隻有第一行。所有的邏輯都可以使用火花dataframe行動實現df和轉換。

如果你有自己的函數getBusinessRulesforSource等可以轉化成火花udf比應用於datframe函數和。

謝謝你的快速回複。我試過顯示器(streamingDF)和所有的記錄被顯示出來。但隻有隨機消息被傳遞到foreachbatch (processStream)。從文檔我看到所有批處理操作foreachbatch選項。它是更好的處理而不是foreachbatch創建udf。

Rishi045
新的貢獻者二世

任何解決以上問題

SaikatSengupta
新的貢獻者二世

我麵臨著同樣的問題,當我試圖運行forEachbatch Azure事件中心。誰能幫忙嗎?

對於我來說,我一直接收實時訂單azure事件中心,但我總是需要選擇最新的訂單和消除所有相同的曆史交易中心已經可用的內部事件。

為例:我可以接受數據集如下(忽略“a”的字段tx)

圖像所以,在這個例子中,我需要選擇最新訂單最大tx和最大tx,最低process_timestamp有時相同的tx一兩次與兩個不同的過程時間戳。在活動中心你可以對一個特定的順序也有類似的數據集有多個行相同的批處理。

我在foreach批量使用下麵的代碼,但它不處理事件中心的所有行。

def upsertToMT4TradeDelta (microBatchOutputDF batchId): microBatchOutputDF = microBatchOutputDF.orderBy(坳(tx) .desc (), (“process_timestamp”) .asc上校())。dropDuplicates([“實例”、“order_id”]) microBatchOutputDF.createOrReplaceTempView(“更新”)microBatchOutputDF._jdf.sparkSession ()。sql(並入rdh“””。live_mt4_trade t t上使用更新u。實例= u。在stance and t.order_id = u.order_id WHEN MATCHED AND u.tx > t.tx THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """)

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map