用戶定義聚合函數- Scala
本文包含一個UDAF示例,以及如何注冊它以便在Apache Spark SQL中使用。看到用戶定義聚合函數(UDAFs)為更多的細節。
實現一個UserDefinedAggregateFunction
進口org.apache.火花.sql.表達式.MutableAggregationBuffer進口org.apache.火花.sql.表達式.UserDefinedAggregateFunction進口org.apache.火花.sql.行進口org.apache.火花.sql.類型._類GeometricMean擴展UserDefinedAggregateFunction{//這是聚合函數的輸入字段。覆蓋definputSchema:org.apache.火花.sql.類型.StructType=StructType(StructField(“價值”,倍增式)::零)//這是用於計算聚合的內部字段。覆蓋defbufferSchema:StructType=StructType(StructField(“數”,LongType)::StructField(“產品”,倍增式)::零)//這是聚合函數的輸出類型。覆蓋def數據類型:數據類型=倍增式覆蓋def確定的:布爾=真正的//這是緩衝區模式的初始值。覆蓋def初始化(緩衝:MutableAggregationBuffer):單位={緩衝(0)=0 l緩衝(1)=1.0}這是如何更新你的緩衝區模式給定的輸入。覆蓋def更新(緩衝:MutableAggregationBuffer,輸入:行):單位={緩衝(0)=緩衝.木屐[長](0)+1緩衝(1)=緩衝.木屐[雙](1)*輸入.木屐[雙](0)}這是如何合並兩個bufferSchema類型的對象。覆蓋def合並(buffer1:MutableAggregationBuffer,buffer2:行):單位={buffer1(0)=buffer1.木屐[長](0)+buffer2.木屐[長](0)buffer1(1)=buffer1.木屐[雙](1)*buffer2.木屐[雙](1)}//在給定bufferSchema的最終值的情況下,在這裏輸出最終值。覆蓋def評估(緩衝:行):任何={數學.戰俘(緩衝.用(1),1.toDouble/緩衝.getLong(0))}}
用你UDAF
//創建DataFrame和Spark SQL表進口org.apache.火花.sql.功能._瓦爾id=火花.範圍(1,20.)id.createOrReplaceTempView(“id”)瓦爾df=火花.sql("select id, id % 3 as group_id from ids")df.createOrReplaceTempView(“簡單”)
——使用group_by語句並調用UDAF。選擇group_id,通用汽車(id)從簡單的集團通過group_id
//或者使用DataFrame語法調用聚合函數。//創建UDAF GeometricMean實例。瓦爾通用汽車=新GeometricMean//顯示列id值的幾何平均值。df.groupBy(“group_id”).gg(通用汽車(上校(“id”))。作為(“GeometricMean”))。顯示()//通過指定的名稱調用UDAF。df.groupBy(“group_id”).gg(expr(“通用汽車(id) GeometricMean”))。顯示()