跳轉到主要內容
工程的博客

介紹熊貓PySpark UDF

如何運行與PySpark本機Python代碼,快。
通過李進

2017年10月30日 工程的博客

分享這篇文章
注意:火花3.0引入了一個新的熊貓UDF。你可以找到更多的細節在接下來的博文:新的熊貓udf和Python類型提示即將發布的Apache 3.0火花

這是一個客人從李進社區後,軟件工程師在兩個σ投資,LP在紐約。這個博客也在兩個σ

在磚試試這個筆記本

更新:本博客更新2月22日,2018年,包括一些變化。

這篇文章介紹了大熊貓udf(也稱為矢量化udf)功能即將到來的Apache火花2.3版本中,顯著地提高了性能和可用性的Python用戶定義函數(udf)。

在過去的幾年中,Python已經成為了默認的語言數據科學家。包等熊貓,numpy,statsmodel,scikit-learn獲得了偉大的收養,成為主流的工具包。與此同時,Apache火花已成為事實上的標準在處理大數據。使數據科學家利用大數據的價值,火花在version 0.7中,添加了一個Python API支持用戶定義函數。這些用戶定義函數操作one-row-at-a-time,因此遭受高序列化和調用開銷。因此,許多數據管道定義在Java udf和Scala,然後從Python調用它們。

熊貓udf之上的Apache箭頭帶給你最好的兩個世界的能力定義低開銷,高性能udf完全在Python中。

在火花2.3中,將會有兩種類型的熊貓udf:標量和分組的地圖。接下來,我們使用四個示例程序說明它們的使用:+ 1,累積概率,減去的意思是,普通最小二乘線性回歸。

標量熊貓udf

標量熊貓udf用於vectorizing標量操作。定義一個標量熊貓UDF,簡單地使用@pandas_udf注釋一個Python函數pandas.Series作為參數,並返回另一個pandas.Series相同的大小。下麵我們使用兩個例子說明:+ 1和累積概率。

+ 1

計算v + 1是一個簡單的示例演示差異row-at-a-time udf和標量熊貓udf。注意,內置列運營商可以執行在這個場景中快得多。

使用row-at-a-time udf:

pyspark.sql.functions進口udf#使用udf來定義一個row-at-a-time udf@udf (“雙”)#輸入/輸出都是一個雙精度值defplus_one(v):返回v +1df.withColumn (“v2”plus_one (df.v))

使用熊貓udf:

pyspark.sql.functions進口pandas_udf, PandasUDFType#使用pandas_udf定義一個熊貓UDF@pandas_udf (“雙”,PandasUDFType.SCALAR)#輸入/輸出都是熊貓。一係列的雙打defpandas_plus_one(v):返回v +1df.withColumn (“v2”pandas_plus_one (df.v))

上麵的示例定義一個row-at-a-time UDF“plus_one”和一個標量熊貓UDF“pandas_plus_one”執行相同的計算“+ 1”。UDF定義是相同的除了函數修飾符:“UDF”與“pandas_udf”。

row-at-a-time版本,用戶定義函數獲取雙“v”並返回的結果作為一個雙“v + 1”。在熊貓的版本中,用戶定義的函數pandas.Series“v”,並返回“v + 1”的結果pandas.Series。因為“v + 1”是矢量化pandas.Series熊貓版本row-at-a-time版本要快得多。

請注意,有兩個重要的需求使用標量熊貓udf時:

  • 輸入和輸出序列必須具有相同的大小。
  • 一列是如何分成多個pandas.Series是內部引發,因此用戶定義函數的結果必須是獨立的分裂。

累積概率

這個例子顯示了一個更實用的使用標量熊貓UDF:計算累積概率一個值的正態分布N(0,1)使用scipy包中。

進口熊貓作為pdscipy進口統計數據@pandas_udf (“雙”)def提供(v):返回pd.Series (stats.norm.cdf (v))
              df.withColumn (“cumulative_probability”,cdf實驗組(df.v))

stats.norm.cdf一個標量值和工作pandas.Series,這個例子可以寫row-at-a-time udf。與前麵的示例相似,熊貓版本運行更快,見後麵的“性能比較”部分。

分組地圖熊貓udf

Python用戶相當熟悉split-apply-combine模式在數據分析。分組地圖熊貓udf設計對於這個場景,他們操作的所有數據組,例如,“對於每一個日期,應用此操作”。

分組地圖熊貓udf第一次分裂火花DataFrame分成小組根據groupby操作符中指定的條件,適用於一個用戶定義的函數(pandas.DataFrame- >pandas.DataFrame每組),結合並返回結果作為一個新的火花DataFrame

分組地圖熊貓udf使用相同的函數修飾符pandas_udf作為標量熊貓udf,但是他們有一些差異:

  • 輸入的用戶定義函數:
    • 標量:pandas.Series
    • 分組的地圖:pandas.DataFrame
  • 用戶定義函數的輸出:
    • 標量:pandas.Series
    • 分組的地圖:pandas.DataFrame
  • 語義分組:
    • 標量:沒有分組語義
    • 分組地圖:定義為“groupby”條款
  • 輸出大小:
    • 標量:作為輸入的大小相同
    • 分組地圖:任何大小
  • 函數的返回類型修飾符:
    • 數量:一個數據類型指定返回的類型pandas.Series
    • 分組的地圖:StructType返回指定每一列的名稱和類型pandas.DataFrame

接下來,讓我們走進兩個例子來說明用例分組地圖熊貓udf。

減去的意思

這個例子顯示了一個簡單的使用分組地圖熊貓udf:從組中的每個值減去的意思。

@pandas_udf (df。模式,PandasUDFType.GROUPED_MAP)#輸入/輸出都是pandas.DataFramedefsubtract_mean(pdf):返回pdf.assign (v = pdf。v - pdf.v.mean ())df.groupby (“id”蘋果(subtract_mean)

在這個例子中,我們從每個值減去均值v v為每個組。分組的語義是“groupby”定義的函數,我。e,每個輸入pandas.DataFrame用戶定義的函數有相同的“id”價值。這個用戶定義函數的輸入和輸出模式是相同的,所以我們通過“df。裝飾模式”pandas_udf用於指定模式。

分組地圖熊貓udf也可以稱為獨立的Python函數驅動程序。這對調試非常有用,例如:

示例= df。過濾器(id= =1).toPandas ()#運行作為一個獨立的函數在一個熊貓。DataFrame並驗證結果subtract_mean.func(樣本)現在#運行與火花df.groupby (“id”蘋果(substract_mean)

在上麵的示例中,我們首先將火花的一個小子集DataFrame到一個pandas.DataFrame,然後運行subtract_mean作為一個獨立的Python函數。驗證邏輯函數後,我們可以調用UDF與火花在整個數據集。

普通最小二乘線性回歸

最後一個例子展示了如何使用statsmodels運行OLS線性回歸為每個組。對於每一組,我們計算βb = (b1, b2) X = (x1, x2)根據統計模型Y = bX + c

進口statsmodels.api作為sm# df有四個列:id, y, x1, x2group_column =“id”y_column =“y”x_columns = [x1的,“x2”]模式= df。選擇(group_column * x_columns) . schema@pandas_udf (模式,PandasUDFType.GROUPED_MAP)#輸入/輸出都是pandas.DataFramedefols(pdf):group_key = pdf [group_column] .iloc [0]y = pdf (y_column)X = pdf (x_columns)X = sm.add_constant (X)= sm模型。OLS (y, X) .fit ()返回pd。DataFrame ([[group_key] + [model.params[我]x_columns]],列= [group_column] + x_columns)
              β= df.groupby (group_column)蘋果(ols)

這個例子表明,分組地圖熊貓udf可以用於任意python函數:pandas.DataFrame- >pandas.DataFrame。返回的pandas.DataFrame可以有不同的行和列數作為輸入。

性能比較

最後,我們想要展示row-at-a-time udf和熊貓udf之間的性能比較。我們跑微基準的三個上麵的示例(+ 1,累積概率和減去的意思)。

配置和方法論

我們跑的基準在單個節點上火花磚community edition的集群。

配置信息:
數據:10 m-row DataFrame Int列和兩列
集群:6.0 GB內存,0.88內核,1 DBU
磚運行時Scala版本:最新RC (4.0, 2.11)

詳細的實施的基準,檢查熊貓UDF筆記本

如圖表所示,熊貓udf執行比row-at-a-time udf,從3 x / 100 x。

結論和未來的工作

即將到來的火花2.3版本放下大幅改善的基礎在Python中用戶自定義函數的功能和性能。在未來,我們打算引入支持大熊貓udf聚合和窗口函數。可以跟蹤相關工作火星- 22216

熊貓udf是一個很好的例子引發社會的努力。我們要感謝布賴恩•卡特勒Hyukjin Kwon傑夫貝克Liang-Chi謝長廷,列夫沃爾什,李進,雷諾鑫,Takuya Ueshin, Wenchen粉絲,韋斯·麥金尼,小李和許多其他人對他們的貢獻。最後,特別感謝Apache箭頭社區使這項工作成為可能。

接下來是什麼

你可以試試的熊貓UDF筆記本現在這個功能是可用的磚4.0運行時β

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map