介紹熊貓PySpark UDF
注意:火花3.0引入了一個新的熊貓UDF。你可以找到更多的細節在接下來的博文:新的熊貓udf和Python類型提示即將發布的Apache 3.0火花
這是一個客人從李進社區後,軟件工程師在兩個σ投資,LP在紐約。這個博客也在兩個σ
更新:本博客更新2月22日,2018年,包括一些變化。
這篇文章介紹了大熊貓udf(也稱為矢量化udf)功能即將到來的Apache火花2.3版本中,顯著地提高了性能和可用性的Python用戶定義函數(udf)。
在過去的幾年中,Python已經成為了默認的語言數據科學家。包等熊貓,numpy,statsmodel,scikit-learn獲得了偉大的收養,成為主流的工具包。與此同時,Apache火花已成為事實上的標準在處理大數據。使數據科學家利用大數據的價值,火花在version 0.7中,添加了一個Python API支持用戶定義函數。這些用戶定義函數操作one-row-at-a-time,因此遭受高序列化和調用開銷。因此,許多數據管道定義在Java udf和Scala,然後從Python調用它們。
熊貓udf之上的Apache箭頭帶給你最好的兩個世界的能力定義低開銷,高性能udf完全在Python中。
在火花2.3中,將會有兩種類型的熊貓udf:標量和分組的地圖。接下來,我們使用四個示例程序說明它們的使用:+ 1,累積概率,減去的意思是,普通最小二乘線性回歸。
標量熊貓udf
標量熊貓udf用於vectorizing標量操作。定義一個標量熊貓UDF,簡單地使用@pandas_udf
注釋一個Python函數pandas.Series
作為參數,並返回另一個pandas.Series
相同的大小。下麵我們使用兩個例子說明:+ 1和累積概率。
+ 1
計算v + 1是一個簡單的示例演示差異row-at-a-time udf和標量熊貓udf。注意,內置列運營商可以執行在這個場景中快得多。
使用row-at-a-time udf:
從pyspark.sql.functions進口udf#使用udf來定義一個row-at-a-time udf@udf (“雙”)#輸入/輸出都是一個雙精度值defplus_one(v):返回v +1df.withColumn (“v2”plus_one (df.v))
使用熊貓udf:
從pyspark.sql.functions進口pandas_udf, PandasUDFType#使用pandas_udf定義一個熊貓UDF@pandas_udf (“雙”,PandasUDFType.SCALAR)#輸入/輸出都是熊貓。一係列的雙打defpandas_plus_one(v):返回v +1df.withColumn (“v2”pandas_plus_one (df.v))
上麵的示例定義一個row-at-a-time UDF“plus_one”和一個標量熊貓UDF“pandas_plus_one”執行相同的計算“+ 1”。UDF定義是相同的除了函數修飾符:“UDF”與“pandas_udf”。
row-at-a-time版本,用戶定義函數獲取雙“v”並返回的結果作為一個雙“v + 1”。在熊貓的版本中,用戶定義的函數pandas.Series
“v”,並返回“v + 1”的結果pandas.Series
。因為“v + 1”是矢量化pandas.Series
熊貓版本row-at-a-time版本要快得多。
請注意,有兩個重要的需求使用標量熊貓udf時:
- 輸入和輸出序列必須具有相同的大小。
- 一列是如何分成多個
pandas.Series
是內部引發,因此用戶定義函數的結果必須是獨立的分裂。
累積概率
這個例子顯示了一個更實用的使用標量熊貓UDF:計算累積概率一個值的正態分布N(0,1)使用scipy包中。
進口熊貓作為pd從scipy進口統計數據@pandas_udf (“雙”)def提供(v):返回pd.Series (stats.norm.cdf (v))
df.withColumn (“cumulative_probability”,cdf實驗組(df.v))
stats.norm.cdf
一個標量值和工作pandas.Series
,這個例子可以寫row-at-a-time udf。與前麵的示例相似,熊貓版本運行更快,見後麵的“性能比較”部分。
分組地圖熊貓udf
Python用戶相當熟悉split-apply-combine模式在數據分析。分組地圖熊貓udf設計對於這個場景,他們操作的所有數據組,例如,“對於每一個日期,應用此操作”。
分組地圖熊貓udf第一次分裂火花DataFrame
分成小組根據groupby操作符中指定的條件,適用於一個用戶定義的函數(pandas.DataFrame
- >pandas.DataFrame
每組),結合並返回結果作為一個新的火花DataFrame
。
分組地圖熊貓udf使用相同的函數修飾符pandas_udf
作為標量熊貓udf,但是他們有一些差異:
- 輸入的用戶定義函數:
- 標量:
pandas.Series
- 分組的地圖:
pandas.DataFrame
- 標量:
- 用戶定義函數的輸出:
- 標量:
pandas.Series
- 分組的地圖:
pandas.DataFrame
- 標量:
- 語義分組:
- 標量:沒有分組語義
- 分組地圖:定義為“groupby”條款
- 輸出大小:
- 標量:作為輸入的大小相同
- 分組地圖:任何大小
- 函數的返回類型修飾符:
- 數量:一個
數據類型
指定返回的類型pandas.Series
- 分組的地圖:
StructType
返回指定每一列的名稱和類型pandas.DataFrame
- 數量:一個
接下來,讓我們走進兩個例子來說明用例分組地圖熊貓udf。
減去的意思
這個例子顯示了一個簡單的使用分組地圖熊貓udf:從組中的每個值減去的意思。
@pandas_udf (df。模式,PandasUDFType.GROUPED_MAP)#輸入/輸出都是pandas.DataFramedefsubtract_mean(pdf):返回pdf.assign (v = pdf。v - pdf.v.mean ())df.groupby (“id”蘋果(subtract_mean)
在這個例子中,我們從每個值減去均值v v為每個組。分組的語義是“groupby”定義的函數,我。e,每個輸入pandas.DataFrame
用戶定義的函數有相同的“id”價值。這個用戶定義函數的輸入和輸出模式是相同的,所以我們通過“df。裝飾模式”pandas_udf
用於指定模式。
分組地圖熊貓udf也可以稱為獨立的Python函數驅動程序。這對調試非常有用,例如:
示例= df。過濾器(id= =1).toPandas ()#運行作為一個獨立的函數在一個熊貓。DataFrame並驗證結果subtract_mean.func(樣本)現在#運行與火花df.groupby (“id”蘋果(substract_mean)
在上麵的示例中,我們首先將火花的一個小子集DataFrame
到一個pandas.DataFrame
,然後運行subtract_mean作為一個獨立的Python函數。驗證邏輯函數後,我們可以調用UDF與火花在整個數據集。
普通最小二乘線性回歸
最後一個例子展示了如何使用statsmodels運行OLS線性回歸為每個組。對於每一組,我們計算βb = (b1, b2) X = (x1, x2)根據統計模型Y = bX + c。
進口statsmodels.api作為sm# df有四個列:id, y, x1, x2group_column =“id”y_column =“y”x_columns = [x1的,“x2”]模式= df。選擇(group_column * x_columns) . schema@pandas_udf (模式,PandasUDFType.GROUPED_MAP)#輸入/輸出都是pandas.DataFramedefols(pdf):group_key = pdf [group_column] .iloc [0]y = pdf (y_column)X = pdf (x_columns)X = sm.add_constant (X)= sm模型。OLS (y, X) .fit ()返回pd。DataFrame ([[group_key] + [model.params[我]為我在x_columns]],列= [group_column] + x_columns)
β= df.groupby (group_column)蘋果(ols)
這個例子表明,分組地圖熊貓udf可以用於任意python函數:pandas.DataFrame- >pandas.DataFrame
。返回的pandas.DataFrame
可以有不同的行和列數作為輸入。
性能比較
最後,我們想要展示row-at-a-time udf和熊貓udf之間的性能比較。我們跑微基準的三個上麵的示例(+ 1,累積概率和減去的意思)。
配置和方法論
我們跑的基準在單個節點上火花磚community edition的集群。
配置信息:
數據:10 m-row DataFrame Int列和兩列
集群:6.0 GB內存,0.88內核,1 DBU
磚運行時Scala版本:最新RC (4.0, 2.11)
詳細的實施的基準,檢查熊貓UDF筆記本。
如圖表所示,熊貓udf執行比row-at-a-time udf,從3 x / 100 x。
結論和未來的工作
即將到來的火花2.3版本放下大幅改善的基礎在Python中用戶自定義函數的功能和性能。在未來,我們打算引入支持大熊貓udf聚合和窗口函數。可以跟蹤相關工作火星- 22216。
熊貓udf是一個很好的例子引發社會的努力。我們要感謝布賴恩•卡特勒Hyukjin Kwon傑夫貝克Liang-Chi謝長廷,列夫沃爾什,李進,雷諾鑫,Takuya Ueshin, Wenchen粉絲,韋斯·麥金尼,小李和許多其他人對他們的貢獻。最後,特別感謝Apache箭頭社區使這項工作成為可能。