熊貓用戶定義函數
一個熊貓用戶定義函數(UDF)也稱為矢量化UDF是一個用戶定義函數,使用Apache箭頭傳輸數據和熊貓來處理數據。熊貓udf允許矢量化操作,可以提高性能比row-at-a-time 100 xPython udf。
博客文章的背景信息,請參閱新的熊貓udf和Python類型提示即將發布的Apache 3.0火花和優化PySpark和熊貓DataFrames之間的轉換。
你定義一個熊貓UDF使用關鍵字pandas_udf
作為裝飾和包裝的功能Python類型提示。本文描述了不同類型的熊貓熊貓udf和展示了如何使用udf型提示。
係列,係列UDF
使用一係列係列熊貓UDF標量進行向量化操作。您可以使用api等選擇
和withColumn
。
Python函數應該采取一個熊貓係列作為輸入並返回一個熊貓係列相同的長度,和你應該在Python中指定這些類型提示。火花運行一個熊貓UDF列分割到批次,調用函數為每個批處理數據的一個子集,然後連接結果。
下麵的例子顯示了如何創建一個熊貓UDF,計算2列的產物。
進口熊貓作為pd從pyspark.sql.functions進口上校,pandas_udf從pyspark.sql.types進口LongType#聲明函數和創建UDFdefmultiply_func(一個:pd。係列,b:pd。係列)- >pd。係列:返回一個*b乘=pandas_udf(multiply_func,returnType=LongType())# pandas_udf應該能夠執行的功能與當地大熊貓數據x=pd。係列([1,2,3])打印(multiply_func(x,x))# 0 14 # 1# 2 9# dtype: int64#創建一個火花DataFrame,“火花”是現有SparkSessiondf=火花。createDataFrame(pd。DataFrame(x,列=(“x”)))#執行函數作為引發矢量化UDFdf。選擇(乘(上校(“x”),上校(“x”)))。顯示()# + - - - - - - - - - - - - - - - - - - - +# | multiply_func (x, x) |# + - - - - - - - - - - - - - - - - - - - +# | 1 |# | 4 |9 # | |# + - - - - - - - - - - - - - - - - - - - +
迭代器係列的迭代器係列的UDF
迭代器UDF是一樣的一個標量熊貓UDF除外:
Python函數
需要一個迭代器的批次,而不是單個輸入批處理作為輸入。
返回一個迭代器的輸出批次而不是單個批處理輸出。
整個輸出迭代器的長度應該與整個輸入的長度相同。
包裝的熊貓UDF星星之火列作為輸入。
你應該指定Python類型提示迭代器(pandas.Series)
- >迭代器(pandas.Series)
。
這熊貓UDF UDF執行需要初始化時有用的一些國家,例如,加載機器學習模型推理應用於每個輸入批處理文件。
下麵的例子顯示了如何創建一個熊貓UDF與迭代器支持。
進口熊貓作為pd從打字進口迭代器從pyspark.sql.functions進口上校,pandas_udf,結構體pdf=pd。DataFrame([1,2,3),列=(“x”])df=火花。createDataFrame(pdf)#當調用UDF的列,#輸入pd.Series底層函數迭代器。@pandas_udf(“長”)defplus_one(batch_iter:迭代器(pd。係列])- >迭代器(pd。係列]:為x在batch_iter:收益率x+1df。選擇(plus_one(上校(“x”)))。顯示()# + - - - - - - - - - - - - +# | plus_one (x) |# + - - - - - - - - - - - - +# | 2 |# | 3 |# | 4 |# + - - - - - - - - - - - - +# UDF,您可以初始化一些國家在處理之前批次。#包裝代碼使用try / finally或使用上下文管理器來確保#最後釋放資源。y_bc=火花。sparkContext。廣播(1)@pandas_udf(“長”)defplus_y(batch_iter:迭代器(pd。係列])- >迭代器(pd。係列]:y=y_bc。價值#初始化狀態試一試:為x在batch_iter:收益率x+y最後:通過#釋放資源,如果任何df。選擇(plus_y(上校(“x”)))。顯示()# + - - - - - - - - - - - - +# | plus_y (x) |# + - - - - - - - - - - - - +# | 2 |# | 3 |# | 4 |# + - - - - - - - - - - - - +
多個係列的迭代器係列UDF的迭代器
迭代器的多個係列的迭代器係列UDF和限製也有類似的特征迭代器係列的迭代器係列的UDF。指定的函數迭代器批次的批次和輸出迭代器。UDF執行需要初始化時也很有用的一些狀態。
的差異是:
底層的Python函數的迭代器元組熊貓係列的。
包裝的熊貓UDF多個火花列作為輸入。
你指定類型的暗示迭代器(元組(pandas.Series,……]
- >迭代器(pandas.Series)
。
從打字進口迭代器,元組進口熊貓作為pd從pyspark.sql.functions進口上校,pandas_udf,結構體pdf=pd。DataFrame([1,2,3),列=(“x”])df=火花。createDataFrame(pdf)@pandas_udf(“長”)defmultiply_two_cols(迭代器:迭代器(元組(pd。係列,pd。係列]])- >迭代器(pd。係列]:為一個,b在迭代器:收益率一個*bdf。選擇(multiply_two_cols(“x”,“x”))。顯示()# + - - - - - - - - - - - - - - - - - - - - - - - - +# | multiply_two_cols (x, x) |# + - - - - - - - - - - - - - - - - - - - - - - - - +# | 1 |# | 4 |9 # | |# + - - - - - - - - - - - - - - - - - - - - - - - - +
係列標量UDF
係列標量熊貓udf類似於引發聚合函數。一係列標量熊貓UDF定義了一個聚合從一個或多個熊貓係列一個標量值,其中每個熊貓係列代表了一個火花列。你用一係列與api等標量熊貓UDF選擇
,withColumn
,groupBy.agg
,pyspark.sql.Window。
你表達類型提示pandas.Series,…
- >任何
。返回類型應該是一個原始的數據類型,並返回標量可以是一個Python原始類型,例如,int
或浮動
或NumPy數據類型等numpy.int64
或numpy.float64
。任何
最好是一個特定的標量類型。
這種類型的UDF不每組支持部分聚合和所有數據加載到內存中。
下麵的例子展示了如何使用這種類型的UDF來計算的意思選擇
,groupBy
,窗口
操作:
進口熊貓作為pd從pyspark.sql.functions進口pandas_udf從pyspark.sql進口窗口df=火花。createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))#聲明函數和創建UDF@pandas_udf(“替身”)defmean_udf(v:pd。係列)- >浮動:返回v。的意思是()df。選擇(mean_udf(df(“v”)))。顯示()# + - - - - - - - - - - - - +# | mean_udf (v) |# + - - - - - - - - - - - - +# | 4.2 |# + - - - - - - - - - - - - +df。groupby(“id”)。gg(mean_udf(df(“v”)))。顯示()# + - - - + - - - - - - - - - - - - +# | | id mean_udf (v) |# + - - - + - - - - - - - - - - - - +# | 1 | 1.5 |# | 2 | 6.0 |# + - - - + - - - - - - - - - - - - +w=窗口\。partitionBy(“id”)\。rowsBetween(窗口。unboundedPreceding,窗口。unboundedFollowing)df。withColumn(“mean_v”,mean_udf(df(“v”])。在(w))。顯示()# + - - - + - - - + - - - +# | | id v | mean_v |# + - - - + - - - + - - - +# | 1 | 1.0 | 1.5 |# | 1 | 2.0 | 1.5 |# | 2 | 3.0 | 6.0 |# | 2 | 5.0 | 6.0 |# | 2 | 10.0 | 6.0 |# + - - - + - - - + - - - +
詳細的用法,看到pyspark.sql.functions.pandas_udf。
使用
設置箭頭批大小
引發的數據分區轉換成箭頭批次記錄,可以暫時導致高JVM的內存使用。為了避免可能的內存不足異常,您可以調整大小的箭頭記錄批次通過設置spark.sql.execution.arrow.maxRecordsPerBatch
配置一個整數決定為每個批處理的最大行數。默認值是10000每批記錄。如果列數很大,應相應調整。使用這個極限,每個數據分區分為1或多個記錄批次進行處理。
時間戳與時區語義
引發內部商店UTC時間戳的值,沒有指定的時區和時間戳數據轉換為本地時間與微秒UTC決議。
時間戳數據導出或顯示在火花時,會話時區是用來定位的時間戳值。會話時區設置的spark.sql.session.timeZone
配置和JVM係統默認為本地時區。大熊貓使用datetime64
納秒精度的類型,datetime64 (ns)
在每列的基礎上,可選的時區。
當時間戳數據從火花轉移到熊貓它轉換為納秒,每列轉換為引發會話時區的時區然後本地化,消除了時區和顯示當地時間值。這發生在調用toPandas ()
或pandas_udf
時間戳列。
當時間戳數據從熊貓轉移到火花,它轉化為UTC微秒。這發生在調用createDataFrame
熊貓DataFrame或當熊貓UDF返回一個時間戳。這些轉換是自動完成,以確保火花在預期的數據格式,所以沒有必要做這些轉換自己。納秒值截斷。
標準的UDF加載時間戳數據作為Python datetime對象,這是不同於一個熊貓時間戳。為了獲得最佳的性能,我們建議您使用熊貓時間序列功能在處理時間戳在熊貓UDF。有關詳細信息,請參見時間序列/日期功能。