取消
顯示的結果
而不是尋找
你的意思是:

事件流中心提高處理速度

Jreco
貢獻者

你好,

我工作與活動中心和數據磚實時處理和豐富的數據。

做一個“簡單的”測試,我得到一些奇怪的值(輸入速率和處理速率),我覺得我失去數據:

圖像如果你可以看到,有一個峰5 k記錄但從未加工後的5分鍾。

我使用的腳本:

conf配置[' eventhubs = {}。connectionString '] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt (connectionString_bb_stream) conf [' eventhubs。consumerGroup '] = ' adb_jr_tesst配置[' maxEventsPerTrigger '] = 350000年的會議(“maxRatePerPartition”) = ' 350000 '參看[' setStartingPosition '] = sc._jvm.org.apache.spark.eventhubs.EventPosition.fromEndOfStream df =(火花。readStream .format (eventhubs) .options (* * conf) .load json_df = df ())。withColumn(“身體”,from_json(坳(“身體”).cast(字符串),jsonSchema)) Final_df = json_df。選擇([“sequenceNumber”、“抵消”、“enqueuedTime”,上校(“。*”)])Final_df = Final_df。withColumn用戶(“關鍵”,sha2 (concat(坳(EquipmentId)坳(TagId)坳(“時間戳”)),256))Final_df.display ()

你能幫我理解為什麼我“失去”數據或如何可以改善這個過程嗎?

我使用的集群:

圖像

我認為是一個集群配置的問題,但是我不知道怎麼解決。

謝謝你的幫助,夥計們!

14日回複14

謝謝你的回答沃納。

你的意思是當你說“我有設置一個終止”?的腳本的一部分?

我不使用支票點因為我想看到什麼是德行為的過程開始時,試圖找出為什麼我失去信息。

werners1
尊敬的貢獻者三世

在集群中的終止時間設置

(不活動終止後60分鍾)

啊好吧,我隻有這個參數為dev集群。

可能相關的問題是這個嗎?

werners1
尊敬的貢獻者三世

也許,如果你的集群關閉流流動受阻。

但在你的情況中,可能是沒有問題的,因為它似乎你不是運行一個長期運行流查詢。

但是是什麼讓你認為你失蹤的記錄嗎?

你數#記錄傳入和傳出了嗎?

是的,我計算時間的記錄為一個特定的範圍(5分鍾)和有+ 4 k記錄失蹤……並與處理和輸入率的流圖……所以,如果我不失去的數據我不處理接近實時的記錄。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map