我新的實時場景,需要創建一個火花在磚結構化流工作。我想申請一些規則從後端配置驗證基於每個傳入JSON消息。我需要做以下操作傳入的JSON
我試著實現這種使用磚中的foreachBatch筆記本。我的代碼片段看起來如下:
streamingDF = spark.readStream.format (eventhubs) .options (* * ehConf) .load ()
streamingDF.writeStream.outputMode(“追加”)\ .option (“checkpointLocation”,“dbfs: / FileStore /檢查點/ rtdqchekpoint”) \ .foreachBatch (processStream) .start () .awaitTermination ()
msgDF = df.select (df.body.cast .alias(“字符串”)(“msgBody”)) msgDfVal = msgDF.collect()消息= msgDfVal[0] =[0]屬性df.select (df.properties) .collect () sourceFeedName =屬性[0][0][' sourceFeedName '] ruleConfigDF = getBusinessRulesforSource (sourceFeedName) dfJson = spark.read.json (sc.parallelize([信息]))srcDqDataDf =平(dfJson) errorDetails = applyBusinessRule (srcDataDf ruleConfigDF)如果(errorDetails ! =沒有和len (errorDetails) > 0): errCnt = len (errorDetails) saveErrorDetailsToSynapse (errorDetails) saveDatatoSynapse (srcDataDf)
然而在處理我發現隻有少數進入processStream函數傳入的消息。例如:如果10條消息隻有5或6隨機進入processStream消息。我試著把所有的處理邏輯。仍然隻有幾條消息。
因為我新到實時處理,有人能幫我為什麼這是發生在我做錯什麼。
嗯也許你不閱讀所有分區/分區鍵。看你去活動中心在Azure和處理數據- >探索和驗證嗎?
在磚可以使用顯示器(streamingDF)做一些驗證。
在生產.collect()不應該被使用。代碼看起來就像你從批處理隻有第一行。所有的邏輯都可以使用火花dataframe行動實現df和轉換。
如果你有自己的函數getBusinessRulesforSource等可以轉化成火花udf比應用於datframe函數和。
嗯也許你不閱讀所有分區/分區鍵。看你去活動中心在Azure和處理數據- >探索和驗證嗎?
在磚可以使用顯示器(streamingDF)做一些驗證。
在生產.collect()不應該被使用。代碼看起來就像你從批處理隻有第一行。所有的邏輯都可以使用火花dataframe行動實現df和轉換。
如果你有自己的函數getBusinessRulesforSource等可以轉化成火花udf比應用於datframe函數和。
我麵臨著同樣的問題,當我試圖運行forEachbatch Azure事件中心。誰能幫忙嗎?
對於我來說,我一直接收實時訂單azure事件中心,但我總是需要選擇最新的訂單和消除所有相同的曆史交易中心已經可用的內部事件。
為例:我可以接受數據集如下(忽略“a”的字段tx)
所以,在這個例子中,我需要選擇最新訂單最大tx和最大tx,最低process_timestamp有時相同的tx一兩次與兩個不同的過程時間戳。在活動中心你可以對一個特定的順序也有類似的數據集有多個行相同的批處理。
我在foreach批量使用下麵的代碼,但它不處理事件中心的所有行。
def upsertToMT4TradeDelta (microBatchOutputDF batchId): microBatchOutputDF = microBatchOutputDF.orderBy(坳(tx) .desc (), (“process_timestamp”) .asc上校())。dropDuplicates([“實例”、“order_id”]) microBatchOutputDF.createOrReplaceTempView(“更新”)microBatchOutputDF._jdf.sparkSession ()。sql(並入rdh“””。live_mt4_trade t t上使用更新u。實例= u。在stance and t.order_id = u.order_id WHEN MATCHED AND u.tx > t.tx THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * """)