用戶定義聚合函數(udaf)<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#user-defined-aggregate-functions-udafs" title="">

適用於:勾選“是”磚運行時

用戶定義的聚合函數(UDAFs)是用戶可編程的例程,它同時作用於多行並返回單個聚合值。本文檔列出了創建和注冊udaf所需的類。它還包含了演示如何在Scala中定義和注冊udaf並在Spark SQL中調用它們的示例。

聚合器<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#aggregator" title="">

語法(——聚合器,緩衝區,出)

用戶定義聚合的基類,可在Dataset操作中使用,以獲取組的所有元素並將它們縮減為單個值。

  • :聚合的輸入類型。

  • 緩衝區:還原中間值的類型。

  • :最終輸出結果的類型。

  • bufferEncoder:編碼器(BUF)

    中間值類型的編碼器。

  • 完成(還原:BUF): OUT

    變換約簡的輸出。

  • merge(b1: BUF, b2: BUF): BUF

    合並兩個中間值。

  • outputEncoder:編碼器(出)

    最終輸出值類型的編碼器。

  • reduce(b: BUF, a: IN): BUF

    總輸入值一個轉換為當前中間值。為了性能,函數可以修改b並返回它,而不是構造新的對象b

  • 零:緩衝區

    此聚合的中間結果的初始值。

例子<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#examples" title="">

類型安全的用戶定義聚合函數<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#type-safe-user-defined-aggregate-functions" title="">

強類型數據集的用戶定義聚合圍繞聚合器抽象類。例如,一個類型安全的用戶定義平均值可以是這樣的:

無類型的用戶定義聚合函數<一個類="headerlink" href="//www.eheci.com/docs/sql/language-manual/#untyped-user-defined-aggregate-functions" title="">

如上所述,類型化聚合也可以注冊為非類型化聚合udf,以便與dataframe一起使用。例如,用戶定義的無類型dataframe的平均值可以是這樣的:

進口orgapache火花sql{。編碼器編碼器SparkSession進口orgapache火花sql表達式聚合器進口orgapache火花sql功能情況下平均var總和var對象MyAverage擴展聚合器平均//該聚合的零值。滿足任意b + 0 = b的性質嗎def平均平均0 l0 l//將兩個值合並為一個新值。為了性能,函數可以修改' buffer '//並返回它,而不是構造一個新對象def減少緩衝平均數據):平均緩衝總和+ =數據緩衝+ =1緩衝//合並兩個中間值def合並b1平均b2平均):平均b1總和+ =b2總和b1+ =b2b1//轉換約簡的輸出def完成減少平均):減少總和toDouble/減少//中間值類型的編碼器defbufferEncoder編碼器平均編碼器產品//最終輸出值類型的編碼器defoutputEncoder編碼器編碼器scalaDouble//注冊函數來訪問它火花udf注冊“myAverage”功能udafMyAverage))瓦爾df火花格式“json”).負載“例子/ src / main /資源/ employees.json”dfcreateOrReplaceTempView“員工”df顯示()// +-------+------+// |姓名|工資| . txt// +-------+------+// |Michael| 3000|// | Andy| 4500| . //// |賈斯汀| 3500|// | Berta| 4000| .使用實例// +-------+------+瓦爾結果火花sqlSELECT myAverage(salary) as average_salary FROM employees結果顯示()// +--------------+/ / | average_salary |// +--------------+// | 3750.0| .// +--------------+
進口java.io.Serializable進口org.apache.spark.sql.Dataset進口org.apache.spark.sql.Encoder進口org.apache.spark.sql.Encoders進口org.apache.spark.sql.Row進口org.apache.spark.sql.SparkSession進口org.apache.spark.sql.expressions.Aggregator進口org.apache.spark.sql.functions公共靜態平均實現了可序列化的私人總和私人//構造函數,getter, setter…公共靜態MyAverage擴展聚合器<平均>//該聚合的零值。滿足任意b + 0 = b的性質嗎公共平均()返回平均0l0l);//將兩個值合並為一個新值。為了性能,函數可以修改' buffer '//並返回它,而不是構造一個新對象公共平均減少平均緩衝數據newSum緩衝getSum()+數據newCount緩衝getCount()+1緩衝setSumnewSum);緩衝setCountnewCount);返回緩衝//合並兩個中間值公共平均合並平均b1平均b2mergedSumb1getSum()+b2getSum();mergedCountb1getCount()+b2getCount();b1setSummergedSum);b1setCountmergedCount);返回b1//轉換約簡的輸出公共完成平均減少返回((減少getSum())/減少getCount();//中間值類型的編碼器公共編碼器<平均>bufferEncoder()返回編碼器平均);//最終輸出值類型的編碼器公共編碼器<>outputEncoder()返回編碼器();//注冊函數來訪問它火花udf().注冊“myAverage”功能udafMyAverage(),編碼器()));數據集<>df火花().格式“json”).負載“例子/ src / main /資源/ employees.json”);dfcreateOrReplaceTempView“員工”);df顯示();// +-------+------+// |姓名|工資| . txt// +-------+------+// |Michael| 3000|// | Andy| 4500| . //// |賈斯汀| 3500|// | Berta| 4000| .使用實例// +-------+------+數據集<>結果火花sqlSELECT myAverage(salary) as average_salary FROM employees);結果顯示();// +--------------+/ / | average_salary |// +--------------+// | 3750.0| .// +--------------+
——編譯並將UDAF MyAverage放在/tmp目錄下名為MyAverage. JAR的JAR文件中。創建函數myAverage作為“MyAverage”使用JAR“/ tmp / MyAverage.jar”顯示用戶功能+------------------+|函數|+------------------+|默認的myAverage|+------------------+創建臨時視圖員工使用orgapache火花sqljson選項路徑“例子/ src / main /資源/ employees.json”);選擇員工+-------+------+|的名字|工資|+-------+------+|邁克爾|3000||安迪|4500||賈斯汀|3500||貝爾塔|4000|+-------+------+選擇myAverage工資作為average_salary員工+--------------+|average_salary|+--------------+|37500|+--------------+