熊貓用戶定義函數

一個熊貓用戶定義函數(UDF)也稱為矢量化UDF是一個用戶定義函數,使用Apache箭頭傳輸數據和熊貓來處理數據。熊貓udf允許矢量化操作,可以提高性能比row-at-a-time 100 xPython udf

博客文章的背景信息,請參閱新的熊貓udf和Python類型提示即將發布的Apache 3.0火花優化PySpark和熊貓DataFrames之間的轉換

你定義一個熊貓UDF使用關鍵字pandas_udf作為裝飾和包裝的功能Python類型提示。本文描述了不同類型的熊貓熊貓udf和展示了如何使用udf型提示。

係列,係列UDF

使用一係列係列熊貓UDF標量進行向量化操作。您可以使用api等選擇withColumn

Python函數應該采取一個熊貓係列作為輸入並返回一個熊貓係列相同的長度,和你應該在Python中指定這些類型提示。火花運行一個熊貓UDF列分割到批次,調用函數為每個批處理數據的一個子集,然後連接結果。

下麵的例子顯示了如何創建一個熊貓UDF,計算2列的產物。

進口熊貓作為pdpyspark.sql.functions進口上校,pandas_udfpyspark.sql.types進口LongType#聲明函數和創建UDFdefmultiply_func(一個:pd係列,b:pd係列)- >pd係列:返回一個*b=pandas_udf(multiply_func,returnType=LongType())# pandas_udf應該能夠執行的功能與當地大熊貓數據x=pd係列([1,2,3])打印(multiply_func(x,x))# 0 14 # 1# 2 9# dtype: int64#創建一個火花DataFrame,“火花”是現有SparkSessiondf=火花createDataFrame(pdDataFrame(x,=(“x”)))#執行函數作為引發矢量化UDFdf選擇((上校(“x”),上校(“x”)))顯示()# + - - - - - - - - - - - - - - - - - - - +# | multiply_func (x, x) |# + - - - - - - - - - - - - - - - - - - - +# | 1 |# | 4 |9 # | |# + - - - - - - - - - - - - - - - - - - - +

迭代器係列的迭代器係列的UDF

迭代器UDF是一樣的一個標量熊貓UDF除外:

  • Python函數

    • 需要一個迭代器的批次,而不是單個輸入批處理作為輸入。

    • 返回一個迭代器的輸出批次而不是單個批處理輸出。

  • 整個輸出迭代器的長度應該與整個輸入的長度相同。

  • 包裝的熊貓UDF星星之火列作為輸入。

你應該指定Python類型提示迭代器(pandas.Series)- >迭代器(pandas.Series)

這熊貓UDF UDF執行需要初始化時有用的一些國家,例如,加載機器學習模型推理應用於每個輸入批處理文件。

下麵的例子顯示了如何創建一個熊貓UDF與迭代器支持。

進口熊貓作為pd打字進口迭代器pyspark.sql.functions進口上校,pandas_udf,結構體pdf=pdDataFrame([1,2,3),=(“x”])df=火花createDataFrame(pdf)#當調用UDF的列,#輸入pd.Series底層函數迭代器。@pandas_udf(“長”)defplus_one(batch_iter:迭代器(pd係列])- >迭代器(pd係列]:xbatch_iter:收益率x+1df選擇(plus_one(上校(“x”)))顯示()# + - - - - - - - - - - - - +# | plus_one (x) |# + - - - - - - - - - - - - +# | 2 |# | 3 |# | 4 |# + - - - - - - - - - - - - +# UDF,您可以初始化一些國家在處理之前批次。#包裝代碼使用try / finally或使用上下文管理器來確保#最後釋放資源。y_bc=火花sparkContext廣播(1)@pandas_udf(“長”)defplus_y(batch_iter:迭代器(pd係列])- >迭代器(pd係列]:y=y_bc價值#初始化狀態試一試:xbatch_iter:收益率x+y最後:通過#釋放資源,如果任何df選擇(plus_y(上校(“x”)))顯示()# + - - - - - - - - - - - - +# | plus_y (x) |# + - - - - - - - - - - - - +# | 2 |# | 3 |# | 4 |# + - - - - - - - - - - - - +

多個係列的迭代器係列UDF的迭代器

迭代器的多個係列的迭代器係列UDF和限製也有類似的特征迭代器係列的迭代器係列的UDF。指定的函數迭代器批次的批次和輸出迭代器。UDF執行需要初始化時也很有用的一些狀態。

的差異是:

  • 底層的Python函數的迭代器元組熊貓係列的。

  • 包裝的熊貓UDF多個火花列作為輸入。

你指定類型的暗示迭代器(元組(pandas.Series,……]- >迭代器(pandas.Series)

打字進口迭代器,元組進口熊貓作為pdpyspark.sql.functions進口上校,pandas_udf,結構體pdf=pdDataFrame([1,2,3),=(“x”])df=火花createDataFrame(pdf)@pandas_udf(“長”)defmultiply_two_cols(迭代器:迭代器(元組(pd係列,pd係列]])- >迭代器(pd係列]:一個,b迭代器:收益率一個*bdf選擇(multiply_two_cols(“x”,“x”))顯示()# + - - - - - - - - - - - - - - - - - - - - - - - - +# | multiply_two_cols (x, x) |# + - - - - - - - - - - - - - - - - - - - - - - - - +# | 1 |# | 4 |9 # | |# + - - - - - - - - - - - - - - - - - - - - - - - - +

係列標量UDF

係列標量熊貓udf類似於引發聚合函數。一係列標量熊貓UDF定義了一個聚合從一個或多個熊貓係列一個標量值,其中每個熊貓係列代表了一個火花列。你用一係列與api等標量熊貓UDF選擇,withColumn,groupBy.agg,pyspark.sql.Window

你表達類型提示pandas.Series,- >任何。返回類型應該是一個原始的數據類型,並返回標量可以是一個Python原始類型,例如,int浮動或NumPy數據類型等numpy.int64numpy.float64任何最好是一個特定的標量類型。

這種類型的UDF每組支持部分聚合和所有數據加載到內存中。

下麵的例子展示了如何使用這種類型的UDF來計算的意思選擇,groupBy,窗口操作:

進口熊貓作為pdpyspark.sql.functions進口pandas_udfpyspark.sql進口窗口df=火花createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))#聲明函數和創建UDF@pandas_udf(“替身”)defmean_udf(v:pd係列)- >浮動:返回v的意思是()df選擇(mean_udf(df(“v”)))顯示()# + - - - - - - - - - - - - +# | mean_udf (v) |# + - - - - - - - - - - - - +# | 4.2 |# + - - - - - - - - - - - - +dfgroupby(“id”)gg(mean_udf(df(“v”)))顯示()# + - - - + - - - - - - - - - - - - +# | | id mean_udf (v) |# + - - - + - - - - - - - - - - - - +# | 1 | 1.5 |# | 2 | 6.0 |# + - - - + - - - - - - - - - - - - +w=窗口\partitionBy(“id”)\rowsBetween(窗口unboundedPreceding,窗口unboundedFollowing)dfwithColumn(“mean_v”,mean_udf(df(“v”])(w))顯示()# + - - - + - - - + - - - +# | | id v | mean_v |# + - - - + - - - + - - - +# | 1 | 1.0 | 1.5 |# | 1 | 2.0 | 1.5 |# | 2 | 3.0 | 6.0 |# | 2 | 5.0 | 6.0 |# | 2 | 10.0 | 6.0 |# + - - - + - - - + - - - +

詳細的用法,看到pyspark.sql.functions.pandas_udf

使用

設置箭頭批大小

引發的數據分區轉換成箭頭批次記錄,可以暫時導致高JVM的內存使用。為了避免可能的內存不足異常,您可以調整大小的箭頭記錄批次通過設置spark.sql.execution.arrow.maxRecordsPerBatch配置一個整數決定為每個批處理的最大行數。默認值是10000每批記錄。如果列數很大,應相應調整。使用這個極限,每個數據分區分為1或多個記錄批次進行處理。

時間戳與時區語義

引發內部商店UTC時間戳的值,沒有指定的時區和時間戳數據轉換為本地時間與微秒UTC決議。

時間戳數據導出或顯示在火花時,會話時區是用來定位的時間戳值。會話時區設置的spark.sql.session.timeZone配置和JVM係統默認為本地時區。大熊貓使用datetime64納秒精度的類型,datetime64 (ns)在每列的基礎上,可選的時區。

當時間戳數據從火花轉移到熊貓它轉換為納秒,每列轉換為引發會話時區的時區然後本地化,消除了時區和顯示當地時間值。這發生在調用toPandas ()pandas_udf時間戳列。

當時間戳數據從熊貓轉移到火花,它轉化為UTC微秒。這發生在調用createDataFrame熊貓DataFrame或當熊貓UDF返回一個時間戳。這些轉換是自動完成,以確保火花在預期的數據格式,所以沒有必要做這些轉換自己。納秒值截斷。

標準的UDF加載時間戳數據作為Python datetime對象,這是不同於一個熊貓時間戳。為了獲得最佳的性能,我們建議您使用熊貓時間序列功能在處理時間戳在熊貓UDF。有關詳細信息,請參見時間序列/日期功能

例如筆記本電腦

以下筆記本說明了性能改進與熊貓udf可以實現:

熊貓udf基準筆記本

在新標簽頁打開筆記本