我是相對較新的Scala。在過去,我能夠做以下python:
def foo (p1, p2):進口datetime dt dt。datetime(2014 4, 17日,12日,34)結果= [(1," 1 ",1.1,dt。datetime(2014 4, 17歲,1,0)),(2,“2”,2.2,dt。datetime(2014 4, 17歲,2 0)),(3,“3”,3.3,dt。datetime(2014 4 17 3 0)))返回結果
現在我注冊一個UDF:
從pyspark.sql。類型導入*模式= ArrayType (StructType ([StructField (int, IntegerType(),假),StructField(“字符串”,StringType(),假),StructField(“浮動”,IntegerType(),假),StructField (datetime, TimestampType(),假))))sqlContext。registerFunction (foo,“foo”模式)
最後,我打算如何使用它:
sqlContext。sql(" " "選擇a.foo_output.int f_int, a.foo_output。浮動f_float, a.foo_output。字符串作為f_string a.foo_output。datetime f_datetime從(選擇爆炸(foo (7)) foo_output)”“”),告訴()
這實際工作pyspark如上所示。看到
我沒能得到同樣的事情在scala中工作。誰能告訴我正確的方法在scala中/火花。當我試圖注冊模式:
def foo (p1:整數,p2:整數):數組(Tuple4 [Int、字符串、浮點數、時間戳]]= {val數組結果= ((1“1”1.1 f,新的時間戳(1 2014 4,17日,0,0,0)),(2,“2”,2.2 f,新的時間戳(2 2014,17日,0,0,0)),(3“3”3.3 f,新的時間戳(3 2014,17日,0,0,0)));返回結果;}/ /注冊火花val foo_schema = ArrayType (StructType(數組(StructField (“int”、IntegerType假),StructField (StringType“字符串”,假),StructField(“浮動”,FloatType假),StructField (datetime, TimestampType、虛假))));sql_context.udf。注冊(" foo " foo _);
我得到了一個運行時錯誤:
org.apache.spark.sql。AnalysisException:沒有這種結構體字段int _1, _2、離子、_4;線2 pos 4
從錯誤消息,很明顯我沒有附上正確的模式確實在上麵的代碼中,我告訴火花。所以我試著以下:
sql_context.udf。注冊(“foo”, foo _ foo_schema);
然而,它給了我一個編譯器錯誤:
(錯誤)/用戶/ rykelley /開發/ rovi / IntegralReach-Main / ADW / rovi-master-schedule / src / main / scala /com/rovicorp/adw/RoviMasterSchedule/BuildRoviMasterSchedule。scala: 247:錯誤:重載方法價值登記的替代品:[信息](名字:字符串,f: org.apache.spark.sql.api.java。UDF22 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF21 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF20 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF19 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF18 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF17 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF16 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF15 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF14 [_, _, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF13 [_, _, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF12 [_, _, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF11 [_, _, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF10 [_, _, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF9 [_, _, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF8 [_, _, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF7 [_, _, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF6 [_, _, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF5 [_, _, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF4 [_, _, _, _, _], returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF3 _, _, _, _, returnType org.apache.spark.sql.types.DataType): <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF2 _, _, _, returnType: org.apache.spark.sql.types.DataType) <和>[信息](單位名稱:字符串,f: org.apache.spark.sql.api.java。UDF1 _, _, returnType: org.apache.spark.sql.types.DataType)單元[信息]不能用於(字符串、數組(整數,整數)= >[(整數、字符串、浮點數、java.sql.Timestamp)], org.apache.spark.sql.types.ArrayType)
誰能告訴我正確的方向嗎?
注意:使用火花1.6.1。
謝謝
瑞安
我假設你已經找到了你的答案,但因為這是頂部的結果出現在google上搜索這個問題有待解答,我將添加2美分。
據我所知,所有的元素在你ArrayType必須是相同類型的。
例如,你可以注冊一個簡單的函數返回一個字符串列表,使用下麵的語法:
sqlContext.udf。注冊(“your_func_name your_func_name ArrayType (StringType ()))
我認為PySpark代碼工作的原因是因為defininf數組元素“StructTypes”提供了一個解決方案,這個限製,這可能不是Scala中的相同的工作。
你好,
以防,這裏是一個例子上麵建議的解決方案:
進口org.apache.spark.sql.functions。_進口org.apache.spark.sql.expressions。_進口org.apache.spark.sql.types。_ val data = Seq (“A”, Seq ((3、4), (5、6), (7, 10))), (“B”, Seq ((1,1))))。托德data.printSchema
根|——_1:string (nullable = true) |——_2:數組(nullable = true) | |——元素:結構(containsNull = true) | | | - _1:整數(nullable = false) | | |——_2:整數(nullable = false)
def樂趣(s: Seq[行]):Seq [(Int, Int)] ={年代。過濾器(元組= > tuple.getInt (0) > 0) . map(元組= > (tuple.getInt (0) tuple.getInt (1)))} val funUdf = udf(好玩_)數據。選擇(“_1”_2,funUdf(“_2)作為“過濾”),告訴(假)
+ - - - + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - + | _1 | _2 | |過濾+ - - - + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - + | | [[3,4],[5,6],[7,10]]| [[3,4],[5,6],[7,10]]| | | B [[1]] | [] | + - - - + - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - +
最好的問候,
馬克西姆Gekk