熊貓api函數
熊貓函數api使您能夠直接應用一個Python本機函數,熊貓實例並將其輸出到一個PySpark DataFrame。類似於熊貓用戶定義函數、功能的api也使用Apache箭頭傳輸數據和熊貓的數據;然而,Python類型提示在熊貓函數api中是可選的。
有三種類型的熊貓api函數:
分組的地圖
地圖
Cogrouped地圖
熊貓熊貓函數api利用相同的內部邏輯執行使用UDF。他們分享PyArrow等特點,支持SQL類型和配置。
有關更多信息,請參見博文新的熊貓udf和Python類型提示即將發布的Apache 3.0火花。
分組的地圖
你改變你的分組數據使用.applyInPandas groupBy () ()
實現“split-apply-combine”模式。Split-apply-combine包括三個步驟:
把數據分成組使用
DataFrame.groupBy
。應用一個函數在每個組。函數的輸入和輸出都是
pandas.DataFrame
。輸入數據包含每組的所有行和列。結合成一個新的結果
DataFrame
。
使用.applyInPandas groupBy () ()
,您必須定義如下:
一個Python函數,定義了每一組計算
一個
StructType
對象或一個字符串,該字符串定義了輸出的模式DataFrame
返回的列標簽pandas.DataFrame
必須匹配輸出模式如果定義的字段名稱指定為字符串,或匹配字段數據類型的位置如果不是字符串,例如,整數指數。看到pandas.DataFrame當構建一個如何標簽列pandas.DataFrame
。
所有數據一組應用函數之前被加載到內存中。這可能導致內存不足異常,特別是尺寸傾向。的配置maxRecordsPerBatch不應用於組織和它是由你來確保分組數據符合可用的內存。
下麵的例子展示了如何使用蘋果groupby () ()
從每個值減去均值。
df=火花。createDataFrame(((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)),(“id”,“v”))defsubtract_mean(pdf):# pdf pandas.DataFramev=pdf。v返回pdf。分配(v=v- - - - - -v。的意思是())df。groupby(“id”)。applyInPandas(subtract_mean,模式=“id, v雙”)。顯示()# + - - - + - - - +# | v | | id# + - - - + - - - +# | 1 | -0.5 |# | 1 | 0.5 |# | 2 | -3.0 |# | 2 | -1.0 |# | 2 | 4.0 |# + - - - + - - - +
地圖
你和熊貓執行映射操作實例DataFrame.mapInPandas ()
以變換的迭代器pandas.DataFrame
到另一個迭代器的pandas.DataFrame
代表當前PySpark DataFrame並返回結果作為PySpark DataFrame。
的底層函數和輸出迭代器pandas.DataFrame
。它可以返回輸出的任意長度與熊貓一些udf係列等係列。
下麵的例子展示了如何使用mapInPandas ()
:
df=火花。createDataFrame(((1,21),(2,30.)),(“id”,“年齡”))deffilter_func(迭代器):為pdf在迭代器:收益率pdf(pdf。id= =1]df。mapInPandas(filter_func,模式=df。模式)。顯示()# + - - - + - - - +# | | | id年齡# + - - - + - - - +# | 1 | | 21日# + - - - + - - - +
詳細的用法,看到pyspark.sql.DataFrame.mapInPandas。
Cogrouped地圖
與熊貓cogrouped地圖操作實例,使用.applyInPandas .cogroup DataFrame.groupby () () ()
兩個PySpark cogroupDataFrame
年代,一個共同的關鍵,然後應用一個Python函數每個cogroup如圖所示:
洗牌,這樣的數據組的每個DataFrame cogrouped在一起共享一個關鍵。
一個函數應用於每個cogroup。函數的輸入是2
pandas.DataFrame
(有一個可選的元組表示的關鍵)。的輸出是一個函數pandas.DataFrame
。結合
pandas.DataFrame
從所有組織成一個新的PySpark年代DataFrame
。
使用.applyInPandas .cogroup groupBy () () ()
,您必須定義如下:
一個Python函數,定義了每個cogroup計算。
一個
StructType
對象或一個字符串,該字符串定義了模式的輸出PySparkDataFrame
。
返回的列標簽pandas.DataFrame
必須匹配輸出模式如果定義的字段名稱指定為字符串,或匹配字段數據類型的位置如果不是字符串,例如,整數指數。看到pandas.DataFrame當構建一個如何標簽列pandas.DataFrame
。
cogroup所有數據加載到內存中之前的函數。這可能導致內存不足異常,特別是尺寸傾向。的配置maxRecordsPerBatch不是應用,它是由你來確保cogrouped數據符合可用內存。
下麵的例子展示了如何使用.applyInPandas .cogroup groupby () () ()
執行一個asof加入
兩個數據集。
進口熊貓作為pddf1=火花。createDataFrame(((20000101,1,1.0),(20000101,2,2.0),(20000102,1,3.0),(20000102,2,4.0)),(“時間”,“id”,“v1”))df2=火花。createDataFrame(((20000101,1,“x”),(20000101,2,“y”)),(“時間”,“id”,“v2”))defasof_join(l,r):返回pd。merge_asof(l,r,在=“時間”,通過=“id”)df1。groupby(“id”)。cogroup(df2。groupby(“id”))。applyInPandas(asof_join,模式=“時間int, int id, v1加倍,v2字符串“)。顯示()# + - - - - - - - - - - + - - - + - - - + - - - +# | | v1 v2 | | | id# + - - - - - - - - - - + - - - + - - - + - - - +# | 20000101 | 1 | 1.0 | | x# | 20000102 | 1 | 3.0 | | x# | 20000101 | 2 | 2.0 | |# | 20000102 | 2 | 4.0 | |# + - - - - - - - - - - + - - - + - - - + - - - +