問題
您正在使用追加模式執行聚合,並返回一條異常錯誤消息。
當流數據幀/數據集上有流聚合時,不支持附加輸出模式
導致
不能在沒有水印的聚合數據幀上使用追加模式。這是有意為之。
解決方案
如果要在聚合的數據幀上使用附加模式,則必須向數據幀應用水印。
聚合必須有一個事件時間列,或者事件時間列上有一個窗口。
將數據按窗口和字進行分組,並計算每組的計數。.withWatermark ()必須在與聚合中使用的時間戳列相同的列上調用。示例代碼展示了如何做到這一點。
替換值<類型>您正在處理的元素類型。例如,如果按行處理,則使用Row。
替換值<詞>使用schema {timestamp: timestamp, word: String}的流數據幀。
%java數據集windowedCounts = .withWatermark("timestamp", "10分鍾").groupBy(functions.window(words.col("timestamp"), "10分鍾","5分鍾"),words.col("word")) .count();
%python windowedCounts =\ . withwatermark ("timestamp", "10 minutes") \ . groupby (window(words. withwatermark)時間戳,"10分鍾","5分鍾"),words.word) \ .count()
%scala導入spark.implicit。_ val windowedCounts =.withWatermark("timestamp", "10分鍾").groupBy(window($"timestamp", "10分鍾","5分鍾"),$"word") .count()
你必須打電話.withWatermark ()在進行聚合之前。否則嚐試失敗,並顯示錯誤消息。例如,df.groupBy(“時間”).count()。withWatermark("time", "1 min")返回一個異常。
請參考Apache Spark文檔清除聚合板的水印條件獲取更多信息。