用戶定義聚合函數(udaf)<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#user-defined-aggregate-functions-udafs" title="">
適用於:磚運行時
用戶定義的聚合函數(UDAFs)是用戶可編程的例程,它同時作用於多行並返回單個聚合值。本文檔列出了創建和注冊udaf所需的類。它還包含了演示如何在Scala中定義和注冊udaf並在Spark SQL中調用它們的示例。
聚合器<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#aggregator" title="">
語法(——聚合器,緩衝區,出)
用戶定義聚合的基類,可在Dataset操作中使用,以獲取組的所有元素並將它們縮減為單個值。
在:聚合的輸入類型。
緩衝區:還原中間值的類型。
出:最終輸出結果的類型。
bufferEncoder:編碼器(BUF)
中間值類型的編碼器。
完成(還原:BUF): OUT
變換約簡的輸出。
merge(b1: BUF, b2: BUF): BUF
合並兩個中間值。
outputEncoder:編碼器(出)
最終輸出值類型的編碼器。
reduce(b: BUF, a: IN): BUF
總輸入值
一個
轉換為當前中間值。為了性能,函數可以修改b
並返回它,而不是構造新的對象b
.零:緩衝區
此聚合的中間結果的初始值。
例子<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#examples" title="">
類型安全的用戶定義聚合函數<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#type-safe-user-defined-aggregate-functions" title="">
強類型數據集的用戶定義聚合圍繞聚合器
抽象類。例如,一個類型安全的用戶定義平均值可以是這樣的:
無類型的用戶定義聚合函數<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#untyped-user-defined-aggregate-functions" title="">
如上所述,類型化聚合也可以注冊為非類型化聚合udf,以便與dataframe一起使用。例如,用戶定義的無類型dataframe的平均值可以是這樣的:
進口org.apache.火花.sql{。編碼器,編碼器,SparkSession}進口org.apache.火花.sql.表達式.聚合器進口org.apache.火花.sql.功能情況下類平均(var總和:長,var數:長)對象MyAverage擴展聚合器[長,平均,雙]{//該聚合的零值。滿足任意b + 0 = b的性質嗎def零:平均=平均(0 l,0 l)//將兩個值合並為一個新值。為了性能,函數可以修改' buffer '//並返回它,而不是構造一個新對象def減少(緩衝:平均,數據:長):平均={緩衝.總和+ =數據緩衝.數+ =1緩衝}//合並兩個中間值def合並(b1:平均,b2:平均):平均={b1.總和+ =b2.總和b1.數+ =b2.數b1}//轉換約簡的輸出def完成(減少:平均):雙=減少.總和.toDouble/減少.數//中間值類型的編碼器defbufferEncoder:編碼器[平均]=編碼器.產品//最終輸出值類型的編碼器defoutputEncoder:編碼器[雙]=編碼器.scalaDouble}//注冊函數來訪問它火花.udf.注冊(“myAverage”,功能.udaf(MyAverage))瓦爾df=火花.讀.格式(“json”).負載(“例子/ src / main /資源/ employees.json”)df.createOrReplaceTempView(“員工”)df.顯示()// +-------+------+// |姓名|工資| . txt// +-------+------+// |Michael| 3000|// | Andy| 4500| . //// |賈斯汀| 3500|// | Berta| 4000| .使用實例// +-------+------+瓦爾結果=火花.sql(SELECT myAverage(salary) as average_salary FROM employees)結果.顯示()// +--------------+/ / | average_salary |// +--------------+// | 3750.0| .// +--------------+
進口java.io.Serializable;進口org.apache.spark.sql.Dataset;進口org.apache.spark.sql.Encoder;進口org.apache.spark.sql.Encoders;進口org.apache.spark.sql.Row;進口org.apache.spark.sql.SparkSession;進口org.apache.spark.sql.expressions.Aggregator;進口org.apache.spark.sql.functions;公共靜態類平均實現了可序列化的{私人長總和;私人長數;//構造函數,getter, setter…}公共靜態類MyAverage擴展聚合器<長,平均,雙>{//該聚合的零值。滿足任意b + 0 = b的性質嗎公共平均零(){返回新平均(0l,0l);}//將兩個值合並為一個新值。為了性能,函數可以修改' buffer '//並返回它,而不是構造一個新對象公共平均減少(平均緩衝,長數據){長newSum=緩衝.getSum()+數據;長newCount=緩衝.getCount()+1;緩衝.setSum(newSum);緩衝.setCount(newCount);返回緩衝;}//合並兩個中間值公共平均合並(平均b1,平均b2){長mergedSum=b1.getSum()+b2.getSum();長mergedCount=b1.getCount()+b2.getCount();b1.setSum(mergedSum);b1.setCount(mergedCount);返回b1;}//轉換約簡的輸出公共雙完成(平均減少){返回((雙)減少.getSum())/減少.getCount();}//中間值類型的編碼器公共編碼器<平均>bufferEncoder(){返回編碼器.豆(平均.類);}//最終輸出值類型的編碼器公共編碼器<雙>outputEncoder(){返回編碼器.雙();}}//注冊函數來訪問它火花.udf().注冊(“myAverage”,功能.udaf(新MyAverage(),編碼器.長()));數據集<行>df=火花.讀().格式(“json”).負載(“例子/ src / main /資源/ employees.json”);df.createOrReplaceTempView(“員工”);df.顯示();// +-------+------+// |姓名|工資| . txt// +-------+------+// |Michael| 3000|// | Andy| 4500| . //// |賈斯汀| 3500|// | Berta| 4000| .使用實例// +-------+------+數據集<行>結果=火花.sql(SELECT myAverage(salary) as average_salary FROM employees);結果.顯示();// +--------------+/ / | average_salary |// +--------------+// | 3750.0| .// +--------------+
——編譯並將UDAF MyAverage放在/tmp目錄下名為MyAverage. JAR的JAR文件中。創建函數myAverage作為“MyAverage”使用JAR“/ tmp / MyAverage.jar”;顯示用戶功能;+------------------+|函數|+------------------+|默認的.myAverage|+------------------+創建臨時視圖員工使用org.apache.火花.sql.json選項(路徑“例子/ src / main /資源/ employees.json”);選擇*從員工;+-------+------+|的名字|工資|+-------+------+|邁克爾|3000||安迪|4500||賈斯汀|3500||貝爾塔|4000|+-------+------+選擇myAverage(工資)作為average_salary從員工;+--------------+|average_salary|+--------------+|3750.0|+--------------+