新的熊貓udf和Python類型提示即將發布的Apache 3.0火花
2020年5月20日 在工程的博客
熊貓用戶定義函數(udf)是一個最重要的增強Apache火花TM對數據的科學。他們帶來很多好處,比如讓用戶使用熊貓api和提高性能。
然而,熊貓udf演變有機地隨著時間的推移,這導致了一些不一致和在用戶中製造混亂。Apache火花的完整版本3.0,預計很快就會公布),將引入一個新的接口利用的熊貓udfPython類型提示解決熊貓UDF的擴散,幫助他們變得更加神諭的,自描述的類型。
這篇文章介紹了新的熊貓udf Python類型提示,和新的熊貓函數api包括分組地圖,地圖,co-grouped地圖。
熊貓udf
熊貓udf在火花2.3中引入的,見也介紹熊貓PySpark UDF。熊貓是眾所周知的科學家和數據無縫集成了許多Python庫和包等NumPy,statsmodel,scikit-learn,熊貓udf允許數據科學家不僅擴展他們的工作負載,還利用熊貓api在Apache火花。
用戶定義的函數執行的:
- Apache箭頭,JVM之間直接交換數據和Python驅動程序/執行人的(反)序列化成本幾乎為零。
- 熊貓在函數內部,與熊貓實例和api。
熊貓熊貓udf使用api函數內部和Apache箭頭交換數據。它允許矢量化操作,可以提高性能100 x,而row-at-a-time Python udf。
下麵的例子展示了一個熊貓UDF之間簡單地添加一個值,它與調用的函數定義pandas_plus_one
裝飾的pandas_udf
與熊貓UDF類型指定為PandasUDFType.SCALAR
。
從pyspark.sql.functions進口pandas_udf, PandasUDFType@pandas_udf (“雙”,PandasUDFType.SCALAR)defpandas_plus_one(v):#“v”是一個熊貓係列返回v.add (1)#輸出一個熊貓係列火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
Python函數和輸出一個熊貓係列。您可以執行一個矢量化操作添加一個值通過使用豐富的熊貓在這個函數api。(反)序列化也自動矢量化利用Apache箭頭。
Python類型提示
Python類型提示被官方介紹PEP 484Python 3.5。靜態類型提示是一個官方的方式顯示在Python中值的類型。看下麵的例子。
def問候(名稱:str)- - - >str:返回“你好”+名字
名稱:str
顯示參數是str的類型和名稱- >
語法表示問候()
函數返回一個字符串。
Python類型提示兩個重要造福PySpark和熊貓UDF上下文。
- 它給一個明確的定義的函數應該做什麼,讓用戶更容易理解的代碼。例如,除非它是記錄,用戶無法知道
問候
可以采取沒有一個
如果沒有提示類型。它可以避免需要文檔這樣微妙的情況下的測試用例和/或為用戶測試和自己弄清楚。 - 它能讓我們更容易執行靜態分析。ide,比如PyCharm和Visual Studio代碼可以利用類型注解提供代碼完成,顯示錯誤,並支持更好的轉向定義功能。
熊貓UDF擴散類型
Apache火花2.3發布以來,一些新熊貓udf已經實現,使得用戶很難學習新規範和如何使用它們。例如,這裏有三個熊貓udf,輸出幾乎相同的結果:
從pyspark.sql.functions進口pandas_udf, PandasUDFType@pandas_udf (“長”,PandasUDFType.SCALAR)defpandas_plus_one(v):#“v”是一個熊貓係列返回v +1#輸出一個熊貓係列火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
從pyspark.sql.functions進口pandas_udf, PandasUDFType
#新型3.0熊貓UDF的火花。@pandas_udf (“長”,PandasUDFType.SCALAR_ITER)defpandas_plus_one(itr):#“迭代器”是一個熊貓係列的迭代器。返回地圖(λv: +1itr)#熊貓係列的輸出迭代器。火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
從pyspark.sql.functions進口pandas_udf, PandasUDFType@pandas_udf (“id”,PandasUDFType.GROUPED_MAP)defpandas_plus_one(pdf):#是一個熊貓DataFrame pdf返回pdf +1#輸出一個熊貓DataFrame#“pandas_plus_one”_only_可以用於“groupby(…)蘋果(…)”火花。範圍(10).groupby (“id”蘋果(pandas_plus_one),告訴()
盡管這些UDF類型有不同的目的,一些可以適用。在這個簡單的例子中,你可以使用任何的三個。然而,每個熊貓udf預計不同的輸入和輸出類型,和工作在不同的方式不同的語義和不同的性能。它使用戶感到困惑關於哪一個使用和學習,以及每個是如何運作的。
此外,pandas_plus_one
在第一和第二的情況下可以使用常規PySpark列。考慮的參數withColumn
或函數等其他表達式的組合pandas_plus_one (" id ") + 1
。然而,過去的pandas_plus_one
隻能使用嗎groupby(…)蘋果(pandas_plus_one)
。
這種級別的複雜性與火花引發了很多討論開發人員,和開車的努力推出新的熊貓通過一個api與Python類型提示正式的求婚。我們的目標是讓用戶自然地表達他們的熊貓udf使用Python類型提示如上問題情況下沒有混亂。例如,上麵的情況下可以寫成如下:
defpandas_plus_one(v: pd.Series)- > pd.Series:返回v +1
defpandas_plus_one(itr:迭代器(pd.Series))- >迭代器(pd.Series):返回地圖(λv: +1itr)
defpandas_plus_one(pdf: pd.DataFrame)- > pd.DataFrame:返回pdf +1
新的熊貓與Python api類型提示
為了解決老熊貓udf的複雜性,從Apache火花與Python 3.6和3.0以上,Python類型提示等pandas.Series
,pandas.DataFrame
,元組
,迭代器
可以用來表達新的熊貓UDF類型。
此外,老熊貓udf被分成兩個API類:熊貓udf和熊貓的API函數。盡管他們在內部以類似的方式工作,有明顯的差異。
你可以以同樣的方式對待熊貓udf,你使用其他PySpark列實例。然而,您不能使用這些列的熊貓api函數實例。這裏有兩個例子:
#熊貓UDF進口熊貓作為pd從pyspark.sql.functions進口pandas_udf、log2坳@pandas_udf (“長”)defpandas_plus_one(s: pd.Series)- > pd.Series:返回s +1# pandas_plus_one (" id ")等於視為SQL expression_內部。#即可以結合其他列、函數和表達式。火花。範圍(10).select (pandas_plus_one(坳(“id”)- - -1)+ log2 (“id”)+1),告訴()
#熊貓API函數從打字進口迭代器進口熊貓作為pd
defpandas_plus_one(迭代器:迭代器(pd.DataFrame))- >迭代器(pd.DataFrame):返回地圖(λv: +1迭代器)
# pandas_plus_one隻是一個常規的Python函數,mapInPandas#邏輯視為_a單獨的SQL查詢plan_而不是SQL表達式。#因此,直接互動與其他表達式是不可能的。火花。範圍(10)。地圖InPandas(pandas_plus_one, schema=“id”),告訴()
同時,注意,熊貓udf需要Python類型的暗示而類型提示在熊貓函數api目前可選的。類型提示計劃在熊貓api和功能可能需要在未來。
新的熊貓udf
而不是手動定義和指定每個熊貓UDF類型,新的熊貓UDF推斷熊貓UDF類型從給定的Python類型暗示Python函數。目前有四個支持Python類型的情況下在熊貓udf提示:
- 係列,係列
- 迭代器係列的迭代器係列
- 迭代器的多個係列的迭代器係列
- 係列標量(單個值)
我們深入了解每個案例之前,讓我們看看三個重點工作與新熊貓udf。
- 盡管Python類型提示Python世界是可選的,您必須指定Python類型提示輸入和輸出,以使用新的熊貓udf。
- 用戶仍然可以使用老方法通過手動指定熊貓UDF類型。然而,鼓勵使用Python類型提示。
- 提示應該使用類型
pandas.Series
在所有情況下。然而,有一個變體pandas.DataFrame
應該用於其輸入或輸出類型提示:當輸入或輸出列的StructType。
看看下麵的例子:進口熊貓作為pd從pyspark.sql.functions進口pandas_udf df = spark.createDataFrame ([[1,“字符串”,(“一個嵌套的字符串”)]],“long_col長string_col字符串,struct_col struct < col1字符串>”)@pandas_udf (“col1字符串,col2長”)defpandas_plus_len(s1: pd。係列,s2: pd。係列,pdf: pd.DataFrame)- > pd.DataFrame:#專欄係列,結構是DataFrame列。pdf (“col2”)= s1 + s2。str。len()返回pdf# struct列預計DataFrame返回df.select (pandas_plus_len (“long_col”,“string_col”,“struct_col”)),告訴()
係列,係列
係列,係列映射到標量熊貓UDF在Apache火花2.3中引入的。提示類型可以表示為pandas.Series,。。。- > pandas.Series
。將一個或多個給定的函數pandas.Series
和輸出一個pandas.Series
。輸出長度預計將與輸入相同。
進口熊貓作為pd從pyspark.sql.functions進口pandas_udf@pandas_udf (“長”)defpandas_plus_one(s: pd.Series)- > pd.Series:返回s +1火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
上麵的例子中可以映射到舊風格與標量熊貓UDF,如下。
從pyspark.sql.functions進口pandas_udf, PandasUDFType@pandas_udf (“長”,PandasUDFType.SCALAR)defpandas_plus_one(v):返回v +1火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
迭代器係列的迭代器係列
這是一種新型的熊貓UDF Apache火花3.0。係列係列的一個變種,提示類型可以表示為迭代器(pd。係列)- >迭代器(pd.Series)
。的函數和輸出迭代器pandas.Series
。
整個輸出的長度必須相同長度的輸入。因此,它可以預取數據從輸入迭代器,隻要整個輸入和輸出的長度是相同的。給定的函數應該接受一個單獨的列作為輸入。
從打字進口迭代器進口熊貓作為pd從pyspark.sql.functions進口pandas_udf@pandas_udf (“長”)defpandas_plus_one(迭代器:迭代器(pd.Series))- >迭代器(pd.Series):返回地圖(λs: s +1迭代器)火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
時也有用UDF執行需要昂貴的一些初始化狀態。下麵的偽代碼演示了這種情況。
@pandas_udf (“長”)def計算(迭代器:迭代器(pd.Series))- >迭代器(pd.Series):#做一些昂貴的初始化狀態狀態= very_expensive_initialization ()為x在迭代器:#負責整個迭代器使用。收益率calculate_with_state (x,狀態)df.select(計算(“價值”)),告訴()
迭代器的級數迭代器的級數也可以映射到老熊貓UDF的風格。看下麵的例子。
從pyspark.sql.functions進口pandas_udf, PandasUDFType@pandas_udf (“長”,PandasUDFType.SCALAR_ITER)defpandas_plus_one(迭代器):返回地圖(λs: s +1迭代器)火花。範圍(10).select (pandas_plus_one (“id”)),告訴()
迭代器的多個係列的迭代器係列
這種類型的熊貓UDF將在Apache 3.0火花,還介紹了與迭代器係列的迭代器係列。提示類型可以表示為迭代器(元組(熊貓。係列中,…]]- >迭代器[pandas.Series]
。
它有類似的特征和限製的迭代器係列的迭代器係列。給定的函數迭代器的一個元組pandas.Series
和輸出迭代器pandas.Series
。時也有用時使用一些州和預取輸入數據。整個輸出的長度也應與整個輸入的長度相同。然而,給定的函數應該多個列作為輸入,與迭代器係列的迭代器係列。
從打字進口迭代器,元組進口熊貓作為pd從pyspark.sql.functions進口pandas_udf@pandas_udf (“長”)defmultiply_two(迭代器:迭代器(元組(pd。係列,pd.Series]])- >迭代器(pd.Series):返回(a * b為a、b在迭代器)火花。範圍(10).select (multiply_two (“id”,“id”)),告訴()
這也可以映射到老熊貓UDF樣式如下。
從pyspark.sql.functions進口pandas_udf, PandasUDFType@pandas_udf (“長”,PandasUDFType.SCALAR_ITER)defmultiply_two(迭代器):返回(a * b為a、b在迭代器)火花。範圍(10).select (multiply_two (“id”,“id”)),告訴()
係列標量
係列標量映射到分組聚合熊貓UDF在Apache火花2.4中引入的。提示表示為類型pandas.Series,。。。- >任何
。函數接受一個或多個熊貓。係列和輸出原始數據類型。返回標量可以是一個Python原始類型,例如,int
,浮動
或NumPy數據類型等numpy.int64
,numpy.float64
等。任何
理想情況下應該相應特定的標量類型。
進口熊貓作為pd從pyspark.sql.functions進口pandas_udf從pyspark.sql進口窗口
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))@pandas_udf (“替身”)defpandas_mean(v: pd.Series)- - - >浮動:返回v。總和()df.select (pandas_mean (df (“v”])),告訴()df.groupby (“id”).agg (pandas_mean (df (“v”])),告訴()df.select (pandas_mean (df (“v”]).over (Window.partitionBy (“id”))),告訴()
上麵的例子可以轉化成分組總熊貓UDF的例子可以看到:
進口熊貓作為pd從pyspark.sql.functions進口pandas_udf, PandasUDFType從pyspark.sql進口窗口
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))@pandas_udf (“替身”,PandasUDFType.GROUPED_AGG)defpandas_mean(v):返回v。總和()df.select (pandas_mean (df (“v”])),告訴()df.groupby (“id”).agg (pandas_mean (df (“v”])),告訴()df.select (pandas_mean (df (“v”]).over (Window.partitionBy (“id”))),告訴()
新的熊貓函數api
這個新類別在Apache 3.0火花使您能夠直接應用一個Python本機函數,以熊貓和輸出實例對PySpark DataFrame。熊貓函數api支持Apache火花3.0:分組地圖,地圖,co-grouped地圖。
注意分組地圖熊貓UDF現在歸類為一組地圖熊貓API函數。如前所述,Python類型提示目前大熊貓api函數是可選的。
分組的地圖
熊貓分組版圖的API函數applyInPandas
在分組DataFrame,例如,df.groupby (…)
。這是映射到分組地圖熊貓UDF在老熊貓UDF類型。它將每一組映射到每一個pandas.DataFrame
的函數。請注意,它不需要為輸出是輸入的長度相同。
進口熊貓作為pd
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))defsubtract_mean(pdf: pd.DataFrame)- > pd.DataFrame:v = pdf.v返回pdf。作為sign(v=v - v.mean())df.groupby (“id”)。applyInPandas (subtract_mean、模式= df.schema),告訴()
分組的分組的映射類型映射到地圖熊貓UDF支持從火花2.3,如下:
進口熊貓作為pd從pyspark.sql.functions進口pandas_udf, PandasUDFType
df = spark.createDataFrame (((1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0),(“id”,“v”))@pandas_udf (df。模式,PandasUDFType.GROUPED_MAP)defsubtract_mean(pdf):v = pdf.v返回pdf。作為sign(v=v - v.mean())df.groupby (“id”蘋果(subtract_mean),告訴()
地圖
熊貓地圖API函數mapInPandas
DataFrame。它在Apache火花3.0是新的。它在每個分區地圖每一個批處理和轉換。該函數的迭代器pandas.DataFrame
和輸出迭代器pandas.DataFrame
。輸出長度不需要匹配輸入的大小。
從打字進口迭代器進口熊貓作為pddf = spark.createDataFrame (((1,21),(2,30.),(“id”,“年齡”))defpandas_filter(迭代器:迭代器(pd.DataFrame))- >迭代器(pd.DataFrame):為pdf在迭代器:收益率pdf [pdf。id= =1]
df。地圖InPandas(pandas_filter, schema=df.schema).show()
Co-grouped地圖
Co-grouped地圖,applyInPandas
在一個co-grouped DataFrame等df.groupby (…) .cogroup (df.groupby (…))
,還將介紹Apache 3.0火花。類似於分組的地圖,它將每一組映射到每一個pandas.DataFrame
函數,但組與另一個DataFrame通過常見的鍵(s),然後函數應用於每個cogroup。同樣,沒有長度限製輸出。
進口熊貓作為pd
df1 = spark.createDataFrame (((1201年,1,1.0),(1201年,2,2.0),(1202年,1,3.0),(1202年,2,4.0)),(“時間”,“id”,“v1”))df2 = spark.createDataFrame (((1201年,1,“x”),(1201年,2,“y”),(“時間”,“id”,“v2”))defasof_join(左:pd。DataFrame吧:pd.DataFrame)- > pd.DataFrame:返回pd。merge_asof(左,右,=“時間”,=“id”)df1.groupby (“id”).cogroup (df2.groupby (“id”)).applyInPandas (asof_join“時間int, int id, v1加倍,v2字符串“),告訴()
結論和未來的工作
即將發布的Apache 3.0(火花閱讀我們的預覽博客了解詳情)。將提供Python類型提示,使它更簡單可供用戶表達熊貓udf和熊貓的api函數。在未來,我們應該考慮添加支持其他類型提示組合在熊貓udf和熊貓的api函數。目前,許多可能的組合的支持的情況下隻有幾個Python類型的提示。還有其他正在進行的討論Apache火花社區。訪問一邊討論和未來的改進要學習更多的知識。