用戶定義的聚合函數(UDAFs)<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#user-defined-aggregate-functions-udafs" title="">
適用於:磚運行時
用戶定義的聚合函數(UDAFs)可一次例程,作用於多個行,並返回一個聚合值。這個文檔列出了所需的類創建和注冊UDAFs。它還包含示例,演示如何定義和寄存器UDAFs在Scala中,在火花SQL調用它們。
聚合器<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#aggregator" title="">
語法(——聚合器,緩衝區,出)
基類定義的聚合,可用於數據集操作所有的元素的一組,並減少一個值。
在:輸入類型的聚合。
緩衝區:減少的中間值的類型。
出:最終的輸出結果的類型。
bufferEncoder:編碼器(BUF)
中間值類型的編碼器。
完成(減少:BUF):
轉換的輸出。
合並(b1:緩衝區,b2: BUF):緩衝區
合並兩個中間值。
outputEncoder:編碼器(出)
最終的輸出值類型的編碼器。
減少(BUF,答:):緩衝區
總輸入值
一個
到目前的中間值。性能、功能修改b
並返回它而不是構建新的對象b
。零:緩衝區
的初始值的中間結果聚合。
例子<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#examples" title="">
類型安全的用戶定義的聚合函數<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#type-safe-user-defined-aggregate-functions" title="">
強類型數據集圍繞著用戶定義的聚合聚合器
抽象類。例如,一個用戶定義類型安全的平均可以看起來像:
無類型定義的聚合函數<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#untyped-user-defined-aggregate-functions" title="">
類型的聚合,如上所述,也可以注冊為無類型的聚合與DataFrames udf使用。例如,一個用戶定義的平均無類型DataFrames可以看起來像:
進口org。apache。火花。sql{。編碼器,編碼器,SparkSession}進口org。apache。火花。sql。表達式。聚合器進口org。apache。火花。sql。功能情況下類平均(var總和:長,var數:長)對象MyAverage擴展聚合器(長,平均,雙]{/ /一個零值聚合。應該滿足屬性+ 0 = b嗎def零:平均=平均(0 l,0 l)/ /合並兩個值來產生一個新值。針對性能、功能修改“緩衝”/ /並返回它,而不是構造一個新對象def減少(緩衝:平均,數據:長):平均={緩衝。總和+ =數據緩衝。數+ =1緩衝}/ /合並兩個中間值def合並(b1:平均,b2:平均):平均={b1。總和+ =b2。總和b1。數+ =b2。數b1}/ /轉換的輸出def完成(減少:平均):雙=減少。總和。toDouble/減少。數/ /中間值類型的編碼器defbufferEncoder:編碼器(平均]=編碼器。產品/ /最終的輸出值類型的編碼器defoutputEncoder:編碼器(雙]=編碼器。scalaDouble}/ /注冊函數來訪問它火花。udf。注冊(“myAverage”,功能。udaf(MyAverage))瓦爾df=火花。讀。格式(“json”)。負載(“例子/ src / main /資源/ employees.json”)df。createOrReplaceTempView(“員工”)df。顯示()/ / + - - - - - - - - - - - - - +/ /工資| | |名稱/ / + - - - - - - - - - - - - - +/ / |邁克爾| 3000 |/ / |安迪| 4500 |/ / |賈斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +瓦爾結果=火花。sql(“選擇myAverage(工資)作為average_salary從員工”)結果。顯示()/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
進口java.io.Serializable;進口org.apache.spark.sql.Dataset;進口org.apache.spark.sql.Encoder;進口org.apache.spark.sql.Encoders;進口org.apache.spark.sql.Row;進口org.apache.spark.sql.SparkSession;進口org.apache.spark.sql.expressions.Aggregator;進口org.apache.spark.sql.functions;公共靜態類平均實現了可序列化的{私人長總和;私人長數;/ /構造函數,getter、setter……}公共靜態類MyAverage擴展聚合器<長,平均,雙>{/ /一個零值聚合。應該滿足屬性+ 0 = b嗎公共平均零(){返回新平均(0l,0l);}/ /合並兩個值來產生一個新值。針對性能、功能修改“緩衝”/ /並返回它,而不是構造一個新對象公共平均減少(平均緩衝,長數據){長newSum=緩衝。getSum()+數據;長newCount=緩衝。getCount()+1;緩衝。setSum(newSum);緩衝。setCount(newCount);返回緩衝;}/ /合並兩個中間值公共平均合並(平均b1,平均b2){長mergedSum=b1。getSum()+b2。getSum();長mergedCount=b1。getCount()+b2。getCount();b1。setSum(mergedSum);b1。setCount(mergedCount);返回b1;}/ /轉換的輸出公共雙完成(平均減少){返回((雙)減少。getSum())/減少。getCount();}/ /中間值類型的編碼器公共編碼器<平均>bufferEncoder(){返回編碼器。豆(平均。類);}/ /最終的輸出值類型的編碼器公共編碼器<雙>outputEncoder(){返回編碼器。雙();}}/ /注冊函數來訪問它火花。udf()。注冊(“myAverage”,功能。udaf(新MyAverage(),編碼器。長()));數據集<行>df=火花。讀()。格式(“json”)。負載(“例子/ src / main /資源/ employees.json”);df。createOrReplaceTempView(“員工”);df。顯示();/ / + - - - - - - - - - - - - - +/ /工資| | |名稱/ / + - - - - - - - - - - - - - +/ / |邁克爾| 3000 |/ / |安迪| 4500 |/ / |賈斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +數據集<行>結果=火花。sql(“選擇myAverage(工資)作為average_salary從員工”);結果。顯示();/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
——編譯和地方UDAF MyAverage在一個名為“MyAverage的JAR文件。jar在/ tmp。創建函數myAverage作為“MyAverage”使用JAR“/ tmp / MyAverage.jar”;顯示用戶功能;+- - - - - - - - - - - - - - - - - - +|函數|+- - - - - - - - - - - - - - - - - - +|默認的。myAverage|+- - - - - - - - - - - - - - - - - - +創建臨時視圖員工使用org。apache。火花。sql。json選項(路徑“例子/ src / main /資源/ employees.json”);選擇*從員工;+- - - - - - - - - - - - - + +|的名字|工資|+- - - - - - - - - - - - - + +|邁克爾|3000年||安迪|4500年||賈斯汀|3500年||貝爾塔|4000年|+- - - - - - - - - - - - - + +選擇myAverage(工資)作為average_salary從員工;+- - - - - - - - - - - - - - - - +|average_salary|+- - - - - - - - - - - - - - - - +|3750年。0|+- - - - - - - - - - - - - - - - +