用戶定義的聚合函數(UDAFs)<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#user-defined-aggregate-functions-udafs" title="">

適用於:檢查標記是的磚運行時

用戶定義的聚合函數(UDAFs)可一次例程,作用於多個行,並返回一個聚合值。這個文檔列出了所需的類創建和注冊UDAFs。它還包含示例,演示如何定義和寄存器UDAFs在Scala中,在火花SQL調用它們。

聚合器<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#aggregator" title="">

語法(——聚合器,緩衝區,出)

基類定義的聚合,可用於數據集操作所有的元素的一組,並減少一個值。

  • :輸入類型的聚合。

  • 緩衝區:減少的中間值的類型。

  • :最終的輸出結果的類型。

  • bufferEncoder:編碼器(BUF)

    中間值類型的編碼器。

  • 完成(減少:BUF):

    轉換的輸出。

  • 合並(b1:緩衝區,b2: BUF):緩衝區

    合並兩個中間值。

  • outputEncoder:編碼器(出)

    最終的輸出值類型的編碼器。

  • 減少(BUF,答:):緩衝區

    總輸入值一個到目前的中間值。性能、功能修改b並返回它而不是構建新的對象b

  • 零:緩衝區

    的初始值的中間結果聚合。

例子<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#examples" title="">

類型安全的用戶定義的聚合函數<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#type-safe-user-defined-aggregate-functions" title="">

強類型數據集圍繞著用戶定義的聚合聚合器抽象類。例如,一個用戶定義類型安全的平均可以看起來像:

無類型定義的聚合函數<一個類="headerlink" href="//www.eheci.com/docs.gcp/spark/latest/spark-sql/language-manual/#untyped-user-defined-aggregate-functions" title="">

類型的聚合,如上所述,也可以注冊為無類型的聚合與DataFrames udf使用。例如,一個用戶定義的平均無類型DataFrames可以看起來像:

進口orgapache火花sql{。編碼器,編碼器,SparkSession}進口orgapache火花sql表達式聚合器進口orgapache火花sql功能情況下平均(var總和:,var:)對象MyAverage擴展聚合器(,平均,]{/ /一個零值聚合。應該滿足屬性+ 0 = b嗎def:平均=平均(0 l,0 l)/ /合並兩個值來產生一個新值。針對性能、功能修改“緩衝”/ /並返回它,而不是構造一個新對象def減少(緩衝:平均,數據:):平均={緩衝總和+ =數據緩衝+ =1緩衝}/ /合並兩個中間值def合並(b1:平均,b2:平均):平均={b1總和+ =b2總和b1+ =b2b1}/ /轉換的輸出def完成(減少:平均):=減少總和toDouble/減少/ /中間值類型的編碼器defbufferEncoder:編碼器(平均]=編碼器產品/ /最終的輸出值類型的編碼器defoutputEncoder:編碼器(]=編碼器scalaDouble}/ /注冊函數來訪問它火花udf注冊(“myAverage”,功能udaf(MyAverage))瓦爾df=火花格式(“json”)。負載(“例子/ src / main /資源/ employees.json”)dfcreateOrReplaceTempView(“員工”)df顯示()/ / + - - - - - - - - - - - - - +/ /工資| | |名稱/ / + - - - - - - - - - - - - - +/ / |邁克爾| 3000 |/ / |安迪| 4500 |/ / |賈斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +瓦爾結果=火花sql(“選擇myAverage(工資)作為average_salary從員工”)結果顯示()/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
進口java.io.Serializable;進口org.apache.spark.sql.Dataset;進口org.apache.spark.sql.Encoder;進口org.apache.spark.sql.Encoders;進口org.apache.spark.sql.Row;進口org.apache.spark.sql.SparkSession;進口org.apache.spark.sql.expressions.Aggregator;進口org.apache.spark.sql.functions;公共靜態平均實現了可序列化的{私人總和;私人;/ /構造函數,getter、setter……}公共靜態MyAverage擴展聚合器<,平均,>{/ /一個零值聚合。應該滿足屬性+ 0 = b嗎公共平均(){返回平均(0l,0l);}/ /合並兩個值來產生一個新值。針對性能、功能修改“緩衝”/ /並返回它,而不是構造一個新對象公共平均減少(平均緩衝,數據){newSum=緩衝getSum()+數據;newCount=緩衝getCount()+1;緩衝setSum(newSum);緩衝setCount(newCount);返回緩衝;}/ /合並兩個中間值公共平均合並(平均b1,平均b2){mergedSum=b1getSum()+b2getSum();mergedCount=b1getCount()+b2getCount();b1setSum(mergedSum);b1setCount(mergedCount);返回b1;}/ /轉換的輸出公共完成(平均減少){返回(()減少getSum())/減少getCount();}/ /中間值類型的編碼器公共編碼器<平均>bufferEncoder(){返回編碼器(平均);}/ /最終的輸出值類型的編碼器公共編碼器<>outputEncoder(){返回編碼器();}}/ /注冊函數來訪問它火花udf()。注冊(“myAverage”,功能udaf(MyAverage(),編碼器()));數據集<>df=火花()。格式(“json”)。負載(“例子/ src / main /資源/ employees.json”);dfcreateOrReplaceTempView(“員工”);df顯示();/ / + - - - - - - - - - - - - - +/ /工資| | |名稱/ / + - - - - - - - - - - - - - +/ / |邁克爾| 3000 |/ / |安迪| 4500 |/ / |賈斯汀| 3500 |/ / | Berta | 4000 |/ / + - - - - - - - - - - - - - +數據集<>結果=火花sql(“選擇myAverage(工資)作為average_salary從員工”);結果顯示();/ / + - - - - - - - - - - - - - - - - +/ / | average_salary |/ / + - - - - - - - - - - - - - - - - +/ / | 3750.0 |/ / + - - - - - - - - - - - - - - - - +
——編譯和地方UDAF MyAverage在一個名為“MyAverage的JAR文件。jar在/ tmp。創建函數myAverage作為“MyAverage”使用JAR“/ tmp / MyAverage.jar”;顯示用戶功能;+- - - - - - - - - - - - - - - - - - +|函數|+- - - - - - - - - - - - - - - - - - +|默認的myAverage|+- - - - - - - - - - - - - - - - - - +創建臨時視圖員工使用orgapache火花sqljson選項(路徑“例子/ src / main /資源/ employees.json”);選擇*員工;+- - - - - - - - - - - - - + +|的名字|工資|+- - - - - - - - - - - - - + +|邁克爾|3000年||安迪|4500年||賈斯汀|3500年||貝爾塔|4000年|+- - - - - - - - - - - - - + +選擇myAverage(工資)作為average_salary員工;+- - - - - - - - - - - - - - - - +|average_salary|+- - - - - - - - - - - - - - - - +|3750年0|+- - - - - - - - - - - - - - - - +