我新的實時場景,需要創建一個火花在磚結構化流工作。我想申請一些規則從後端配置驗證基於每個傳入JSON消息。我需要做以下操作傳入的JSON
我試著實現這種使用磚中的foreachBatch筆記本。我的代碼片段看起來如下:
streamingDF = spark.readStream.format (eventhubs) .options (* * ehConf) .load ()
streamingDF.writeStream.outputMode(“追加”)\ .option (“checkpointLocation”,“dbfs: / FileStore /檢查點/ rtdqchekpoint”) \ .foreachBatch (processStream) .start () .awaitTermination ()
msgDF = df.select (df.body.cast .alias(“字符串”)(“msgBody”)) msgDfVal = msgDF.collect()消息= msgDfVal[0] =[0]屬性df.select (df.properties) .collect () sourceFeedName =屬性[0][0][' sourceFeedName '] ruleConfigDF = getBusinessRulesforSource (sourceFeedName) dfJson = spark.read.json (sc.parallelize([信息]))srcDqDataDf =平(dfJson) errorDetails = applyBusinessRule (srcDataDf ruleConfigDF)如果(errorDetails ! =沒有和len (errorDetails) > 0): errCnt = len (errorDetails) saveErrorDetailsToSynapse (errorDetails) saveDatatoSynapse (srcDataDf)
然而在處理我發現隻有少數進入processStream函數傳入的消息。例如:如果10條消息隻有5或6隨機進入processStream消息。我試著把所有的處理邏輯。仍然隻有幾條消息。
因為我新到實時處理,有人能幫我為什麼這是發生在我做錯什麼。