你好,
我工作與活動中心和數據磚實時處理和豐富的數據。
做一個“簡單的”測試,我得到一些奇怪的值(輸入速率和處理速率),我覺得我失去數據:
如果你可以看到,有一個峰5 k記錄但從未加工後的5分鍾。
我使用的腳本:
conf配置[' eventhubs = {}。connectionString '] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt (connectionString_bb_stream) conf [' eventhubs。consumerGroup '] = ' adb_jr_tesst配置[' maxEventsPerTrigger '] = 350000年的會議(“maxRatePerPartition”) = ' 350000 '參看[' setStartingPosition '] = sc._jvm.org.apache.spark.eventhubs.EventPosition.fromEndOfStream df =(火花。readStream .format (eventhubs) .options (* * conf) .load json_df = df ())。withColumn(“身體”,from_json(坳(“身體”).cast(字符串),jsonSchema)) Final_df = json_df。選擇([“sequenceNumber”、“抵消”、“enqueuedTime”,上校(“。*”)])Final_df = Final_df。withColumn用戶(“關鍵”,sha2 (concat(坳(EquipmentId)坳(TagId)坳(“時間戳”)),256))Final_df.display ()
你能幫我理解為什麼我“失去”數據或如何可以改善這個過程嗎?
我使用的集群:
我認為是一個集群配置的問題,但是我不知道怎麼解決。
謝謝你的幫助,夥計們!
好的,我唯一注意到你有設置一個終止時間為流媒體沒有必要(如果你在做實時)。
我也注意到你不設置一個檢查點的位置,你可能會考慮的事情。
你也可以嚐試刪除maxEvent maxRate配置。
文檔的片段:
以下是推薦的任務配置的細節。
https://docs.www.eheci.com/spark/latest/structured-streaming/production.html
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html