熊貓api函數

熊貓函數api使您能夠直接應用一個Python本機函數,熊貓實例並將其輸出到一個PySpark DataFrame。類似於熊貓用戶定義函數、功能的api也使用Apache箭頭傳輸數據和熊貓的數據;然而,Python類型提示在熊貓函數api中是可選的。

有三種類型的熊貓api函數:

  • 分組的地圖

  • 地圖

  • Cogrouped地圖

熊貓熊貓函數api利用相同的內部邏輯執行使用UDF。他們分享PyArrow等特點,支持SQL類型和配置。

有關更多信息,請參見博文新的熊貓udf和Python類型提示即將發布的Apache 3.0火花

分組的地圖

你改變你的分組數據使用.applyInPandas groupBy () ()實現“split-apply-combine”模式。Split-apply-combine包括三個步驟:

  • 把數據分成組使用DataFrame.groupBy

  • 應用一個函數在每個組。函數的輸入和輸出都是pandas.DataFrame。輸入數據包含每組的所有行和列。

  • 結合成一個新的結果DataFrame

使用.applyInPandas groupBy () (),您必須定義如下:

  • 一個Python函數,定義了每一組計算

  • 一個StructType對象或一個字符串,該字符串定義了輸出的模式DataFrame

返回的列標簽pandas.DataFrame必須匹配輸出模式如果定義的字段名稱指定為字符串,或匹配字段數據類型的位置如果不是字符串,例如,整數指數。看到pandas.DataFrame當構建一個如何標簽列pandas.DataFrame

所有數據一組應用函數之前被加載到內存中。這可能導致內存不足異常,特別是尺寸傾向。的配置maxRecordsPerBatch不應用於組織和它是由你來確保分組數據符合可用的內存。

下麵的例子展示了如何使用蘋果groupby () ()從每個值減去均值。

df=火花createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))defsubtract_mean(pdf):# pdf pandas.DataFramev=pdfv返回pdf分配(v=v- - - - - -v的意思是())dfgroupby(“id”)applyInPandas(subtract_mean,模式=“id, v雙”)顯示()# + - - - + - - - +# | v | | id# + - - - + - - - +# | 1 | -0.5 |# | 1 | 0.5 |# | 2 | -3.0 |# | 2 | -1.0 |# | 2 | 4.0 |# + - - - + - - - +

詳細的用法,看到pyspark.sql.GroupedData.applyInPandas

地圖

你和熊貓執行映射操作實例DataFrame.mapInPandas ()以變換的迭代器pandas.DataFrame到另一個迭代器的pandas.DataFrame代表當前PySpark DataFrame並返回結果作為PySpark DataFrame。

的底層函數和輸出迭代器pandas.DataFrame。它可以返回輸出的任意長度與熊貓一些udf係列等係列。

下麵的例子展示了如何使用mapInPandas ():

df=火花createDataFrame(((1,21),(2,30.)),(“id”,“年齡”))deffilter_func(迭代器):pdf迭代器:收益率pdf(pdfid= =1]dfmapInPandas(filter_func,模式=df模式)顯示()# + - - - + - - - +# | | | id年齡# + - - - + - - - +# | 1 | | 21日# + - - - + - - - +

詳細的用法,看到pyspark.sql.DataFrame.mapInPandas

Cogrouped地圖

與熊貓cogrouped地圖操作實例,使用.applyInPandas .cogroup DataFrame.groupby () () ()兩個PySpark cogroupDataFrame年代,一個共同的關鍵,然後應用一個Python函數每個cogroup如圖所示:

  • 洗牌,這樣的數據組的每個DataFrame cogrouped在一起共享一個關鍵。

  • 一個函數應用於每個cogroup。函數的輸入是2pandas.DataFrame(有一個可選的元組表示的關鍵)。的輸出是一個函數pandas.DataFrame

  • 結合pandas.DataFrame從所有組織成一個新的PySpark年代DataFrame

使用.applyInPandas .cogroup groupBy () () (),您必須定義如下:

  • 一個Python函數,定義了每個cogroup計算。

  • 一個StructType對象或一個字符串,該字符串定義了模式的輸出PySparkDataFrame

返回的列標簽pandas.DataFrame必須匹配輸出模式如果定義的字段名稱指定為字符串,或匹配字段數據類型的位置如果不是字符串,例如,整數指數。看到pandas.DataFrame當構建一個如何標簽列pandas.DataFrame

cogroup所有數據加載到內存中之前的函數。這可能導致內存不足異常,特別是尺寸傾向。的配置maxRecordsPerBatch不是應用,它是由你來確保cogrouped數據符合可用內存。

下麵的例子展示了如何使用.applyInPandas .cogroup groupby () () ()執行一個asof加入兩個數據集。

進口熊貓作為pddf1=火花createDataFrame(((20000101,1,1.0),(20000101,2,2.0),(20000102,1,3.0),(20000102,2,4.0)),(“時間”,“id”,“v1”))df2=火花createDataFrame(((20000101,1,“x”),(20000101,2,“y”)),(“時間”,“id”,“v2”))defasof_join(l,r):返回pdmerge_asof(l,r,=“時間”,通過=“id”)df1groupby(“id”)cogroup(df2groupby(“id”))applyInPandas(asof_join,模式=“時間int, int id, v1加倍,v2字符串“)顯示()# + - - - - - - - - - - + - - - + - - - + - - - +# | | v1 v2 | | | id# + - - - - - - - - - - + - - - + - - - + - - - +# | 20000101 | 1 | 1.0 | | x# | 20000102 | 1 | 3.0 | | x# | 20000101 | 2 | 2.0 | |# | 20000102 | 2 | 4.0 | |# + - - - - - - - - - - + - - - + - - - + - - - +

詳細的用法,看到pyspark.sql.PandasCogroupedOps.applyInPandas