pyspark.sql.functions.pandas_udf¶
-
pyspark.sql.functions。
pandas_udf
( f=沒有一個,returnType=沒有一個,functionType=沒有一個 ) ¶ -
創建一個熊貓用戶定義函數(又名矢量化用戶定義函數)。
熊貓udf是用戶定義的函數執行的火花使用箭頭來傳輸數據和熊貓來處理數據,使得矢量化操作。一個熊貓UDF使用定義pandas_udf作為裝飾或包裝的功能,不需要額外的配置。熊貓UDF的行為作為常規PySpark API函數。
- 參數
-
- f 功能,可選
-
用戶定義的函數。如果使用python函數作為一個獨立的函數
-
returnType
pyspark.sql.types.DataType
或str,可選 -
用戶定義的函數的返回類型。值可以是
pyspark.sql.types.DataType
對象或DDL-formatted類型字符串。 - functionType int,可選
-
枚舉值
pyspark.sql.functions.PandasUDFType
。默認值:標量。這個參數存在兼容性。鼓勵使用Python類型的暗示。
例子
為了使用這個API,通常低於進口:
> > >進口熊貓作為pd> > >從pyspark.sql.functions進口pandas_udf
從火花與Python 3.6 + 3.0,Python類型提示檢測函數類型如下:
> > >@pandas_udf(IntegerType())…defslen(年代:pd。係列)- >pd。係列:…返回年代。str。len()
火花3.0之前,熊貓UDF使用functionType決定執行類型如下:
> > >從pyspark.sql.functions進口PandasUDFType> > >從pyspark.sql.types進口IntegerType> > >@pandas_udf(IntegerType(),PandasUDFType。標量)…defslen(年代):…返回年代。str。len()
是首選指定類型提示熊貓UDF而不是明確的“熊貓通過UDF類型functionType這將在將來的版本中被棄用。
注意提示應該使用類型pandas.Series在所有情況下,但有一個變體pandas.DataFrame應該用於其輸入或輸出類型提示相反的輸入或輸出列是什麼時候
pyspark.sql.types.StructType
。下麵的示例顯示了一個熊貓UDF將長列,字符串列和結構列,並輸出一個struct列。它需要函數指定的類型pandas.Series和pandas.DataFrame如下:> > >@pandas_udf(“col1字符串,col2長”)> > >def函數(s1:pd。係列,s2:pd。係列,s3:pd。DataFrame)- >pd。DataFrame:…s3(“col2”]=s1+s2。str。len()…返回s3…> > >#創建一個火花DataFrame三列包括一個struct列。…df=火花。createDataFrame(…[[1,“字符串”,(“一個嵌套的字符串”)]],…“long_col長string_col字符串,struct_col struct < col1: string >”)> > >df。printSchema()根|——long_column:長(可空= true)|——string_column:字符串(nullable = true)|——struct_column:結構(可空= true)| |——col1:字符串(nullable = true)> > >df。選擇(函數(“long_col”,“string_col”,“struct_col”))。printSchema()|——func (long_col string_col struct_col):結構(可空= true)| |——col1:字符串(nullable = true)| | - col2:長(可空= true)
在下麵幾節中,它描述了組合支持類型的提示。為簡單起見,pandas.DataFrame變異是省略了。
-
- 係列,係列
-
pandas.Series,…- >pandas.Series
函數接受一個或多個pandas.Series和輸出一個pandas.Series。函數的輸出應該是相同的長度作為輸入。
> > >@pandas_udf(“字符串”)…defto_upper(年代:pd。係列)- >pd。係列:…返回年代。str。上()…> > >df=火花。createDataFrame(((“John Doe”),(“名稱”,))> > >df。選擇(to_upper(“名稱”))。顯示()+ - - - - - - - - - - - - - - - - +| | to_upper(名稱)+ - - - - - - - - - - - - - - - - +| JOHN DOE |+ - - - - - - - - - - - - - - - - +
> > >@pandas_udf(“姓字符串,字符串”)…defsplit_expand(年代:pd。係列)- >pd。DataFrame:…返回年代。str。分裂(擴大=真正的)…> > >df=火花。createDataFrame(((“John Doe”),(“名稱”,))> > >df。選擇(split_expand(“名稱”))。顯示()+ - - - - - - - - - - - - - - - - - - +| | split_expand(名稱)+ - - - - - - - - - - - - - - - - - - +(約翰,Doe) | |+ - - - - - - - - - - - - - - - - - - +
請注意
輸入的長度不是整個輸入的列,但是內部的長度是批用於每個調用函數。
-
- 迭代器係列的迭代器係列
-
迭代器(pandas.Series)- >迭代器(pandas.Series)
該函數的迭代器pandas.Series和輸出迭代器pandas.Series。在這種情況下,熊貓UDF創建實例時,需要一個輸入列這叫做PySpark列。整個輸出函數的長度應該是整個輸入的長度相同;因此,它可以預取數據從輸入迭代器隻要長度是相同的。
時也很有用的UDF執行需要初始化一些州雖然內部工作原理相同係列,係列案件。下麵的偽代碼演示了例子。
@pandas_udf(“長”)def計算(迭代器:迭代器(pd。係列])- >迭代器(pd。係列]:#做一些昂貴的初始化狀態狀態=very_expensive_initialization()為x在迭代器:#為整個迭代器使用狀態。收益率calculate_with_state(x,狀態)df。選擇(計算(“價值”))。顯示()
> > >從打字進口迭代器> > >@pandas_udf(“長”)…defplus_one(迭代器:迭代器(pd。係列])- >迭代器(pd。係列]:…為年代在迭代器:…收益率年代+1…> > >df=火花。createDataFrame(pd。DataFrame([1,2,3),列=(“v”)))> > >df。選擇(plus_one(df。v))。顯示()+ - - - - - - - - - - - - +| | plus_one (v)+ - - - - - - - - - - - - +| 2 || 3 || 4 |+ - - - - - - - - - - - - +
請注意
每個係列的長度的長度是一批內部使用。
-
- 迭代器的多個係列的迭代器係列
-
迭代器(元組(熊貓。係列,…]]- >迭代器(pandas.Series)
tuple函數迭代器的多pandas.Series和輸出迭代器pandas.Series。在這種情況下,熊貓UDF創建實例時,需要輸入列係列多達這叫做PySpark列。否則,它具有相同的特性和限製的迭代器係列的迭代器係列。
> > >從打字進口迭代器,元組> > >從pyspark.sql.functions進口結構體,上校> > >@pandas_udf(“長”)…def乘(迭代器:迭代器(元組(pd。係列,pd。DataFrame]])- >迭代器(pd。係列]:…為s1,df在迭代器:…收益率s1*df。v…> > >df=火花。createDataFrame(pd。DataFrame([1,2,3),列=(“v”)))> > >df。withColumn(“輸出”,乘(上校(“v”),結構體(上校(“v”))))。顯示()+ - - - + - - - +| | | v輸出+ - - - + - - - +| 1 | 1 || 2 | 4 || 3 | 9 |+ - - - + - - - +
請注意
每個係列的長度的長度是一批內部使用。
-
- 係列標量
-
pandas.Series,…- >任何
的函數pandas.Series並返回標量值。的returnType應該是一個基本數據類型和返回標量可以是一個python原始類型,例如,int或浮或numpy數據類型,例如,numpy.int64或numpy.float64。任何理想情況下應該相應特定的標量類型。
> > >@pandas_udf(“替身”)…defmean_udf(v:pd。係列)- >浮動:…返回v。的意思是()…> > >df=火花。createDataFrame(…((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))> > >df。groupby(“id”)。gg(mean_udf(df(“v”)))。顯示()+ - - - + - - - - - - - - - - - - +| | | id mean_udf (v)+ - - - + - - - - - - - - - - - - +| 1 | 1.5 || 2 | 6.0 |+ - - - + - - - - - - - - - - - - +
這個UDF也可以用作窗口函數如下:
> > >從pyspark.sql進口窗口> > >@pandas_udf(“替身”)…defmean_udf(v:pd。係列)- >浮動:…返回v。的意思是()…> > >df=火花。createDataFrame(…((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))> > >w=窗口。partitionBy(“id”)。orderBy(“v”)。rowsBetween(- - - - - -1,0)> > >df。withColumn(“mean_v”,mean_udf(“v”)。在(w))。顯示()+ - - - + - - - + - - - +v | | | id mean_v |+ - - - + - - - + - - - +| 1 | 1.0 | 1.0 || 1 | 2.0 | 1.5 || 2 | 3.0 | 3.0 || 2 | 5.0 | 4.0 || 2 | 10.0 | 7.5 |+ - - - + - - - + - - - +
請注意
由於性能原因,輸入係列窗函數並不是複製。因此,變異輸入係列是不允許的,會導致不正確的結果。出於同樣的原因,用戶也不應該依賴於輸入的指數係列。
請注意
用戶定義的函數不支持條件表達式或短路的布爾表達式,它最終被執行所有的內部。如果在特殊行,函數可以失敗的解決方法是將條件的函數。
請注意
用戶定義的函數不帶關鍵字參數調用。
請注意
返回的數據類型pandas.Series從用戶定義函數應符合定義returnType(見
types.to_arrow_type ()
和types.from_arrow_type ()
)。他們之間不匹配時,火花可能會對返回的數據進行轉換。轉換是不能保證是正確的用戶應該檢查和結果的準確性。請注意
目前,
pyspark.sql.types.ArrayType
的pyspark.sql.types.TimestampType
和嵌套pyspark.sql.types.StructType
目前不支持作為輸出類型。另請參閱
pyspark.sql.UDFRegistration.register ()