用戶定義的聚合函數- Scala

本文包含一個UDAF以及如何注冊它使用Apache火花SQL。看到用戶定義的聚合函數(UDAFs)為更多的細節。

實現一個UserDefinedAggregateFunction

進口orgapache火花sql表達式MutableAggregationBuffer進口orgapache火花sql表達式UserDefinedAggregateFunction進口orgapache火花sql進口orgapache火花sql類型_GeometricMean擴展UserDefinedAggregateFunction{/ /這是聚合函數的輸入字段。覆蓋definputSchema:orgapache火花sql類型StructType=StructType(StructField(“價值”,倍增式)::)/ /這是你保持的內部字段計算你的總。覆蓋defbufferSchema:StructType=StructType(StructField(“數”,LongType)::StructField(“產品”,倍增式)::)/ /這是你aggregatation函數的輸出類型。覆蓋def數據類型:數據類型=倍增式覆蓋def確定的:布爾=真正的/ /這是緩衝模式的初始值。覆蓋def初始化(緩衝:MutableAggregationBuffer):單位={緩衝(0)=0 l緩衝(1)=1.0}/ /這是如何更新給定一個輸入緩衝模式。覆蓋def更新(緩衝:MutableAggregationBuffer,輸入:):單位={緩衝(0)=緩衝木屐()(0)+1緩衝(1)=緩衝木屐()(1)*輸入木屐()(0)}/ /這是如何與bufferSchema合並兩個對象類型。覆蓋def合並(buffer1:MutableAggregationBuffer,buffer2:):單位={buffer1(0)=buffer1木屐()(0)+buffer2木屐()(0)buffer1(1)=buffer1木屐()(1)*buffer2木屐()(1)}/ /這是你輸出最終的價值,給你bufferSchema的最終值。覆蓋def評估(緩衝:):任何={數學戰俘(緩衝(1),1toDouble/緩衝getLong(0))}}

注冊與火花UDAF SQL

火花udf注冊(“通用汽車”,GeometricMean)

用你UDAF

/ /創建一個DataFrame並引發SQL表進口orgapache火花sql功能_瓦爾id=火花範圍(1,20.)idcreateOrReplaceTempView(“id”)瓦爾df=火花sql(“選擇id, id % 3從ids group_id”)dfcreateOrReplaceTempView(“簡單”)
——使用UDAF group_by聲明和調用。選擇group_id,通用汽車(id)簡單的集團通過group_id
/ /或者使用DataFrame語法調用聚合函數。/ /創建一個實例的UDAF GeometricMean。瓦爾通用汽車=GeometricMean/ /顯示的列值的幾何平均數“id”。dfgroupBy(“group_id”)。gg(通用汽車(上校(“id”))。作為(“GeometricMean”))。顯示()/ /調用UDAF由其指定的名字。dfgroupBy(“group_id”)。gg(expr(“通用汽車(id) GeometricMean”))。顯示()