10分鍾從熊貓考拉在Apache火花
2020年3月31日 在工程的博客
這是客人社區從Haejoon李,一個軟件工程師在韓國和Mobigen考拉貢獻者。
熊貓是一個很好的工具來分析小數據集在單個機器上。在需要更大的數據集時,用戶通常選擇PySpark。然而,將代碼從熊貓PySpark並不容易PySpark api從熊貓api是大大不同的。考拉使學習曲線明顯更容易通過提供在PySpark pandas-like api。考拉,用戶可以利用的好處PySpark用最小的努力,從而能更快的找到價值。
許多博客等考拉:簡單從熊貓過渡到Apache的火花,怎麼處女Hyperloop減少處理時間和考拉從小時分鍾嗎,10分鍾考拉考拉官方文檔展示了緩解熊貓和考拉之間的轉換。然而,盡管擁有相同的api,有微妙之處,當工作在一個分布式環境中,用戶可能不明顯的熊貓。此外,隻有~ 70%的大熊貓在考拉api實現。在開源社區積極落實剩下的熊貓考拉api,用戶需要使用PySpark周圍工作。最後,考拉等也提供了自己的apito_spark (),DataFrame.map_in_pandas (),ks.sql ()等等,可以極大地提高用戶的工作效率。
因此,考拉並不意味著完全取代PySpark學習需求。相反,考拉使學習PySpark更容易通過提供pandas-like功能。精通考拉,用戶需要了解基本的火花和一些PySpark api。事實上,我們發現用戶使用考拉和PySpark互換往往從考拉提取最大的價值。
特別是,兩種類型的用戶從考拉最受益:
- 熊貓想要向外擴展的用戶使用PySpark和潛在的代碼遷移到PySpark。考拉是可伸縮的,使學習PySpark容易得多
- 引發用戶希望利用考拉成為更有效率。考拉提供pandas-like功能,這樣用戶不需要在PySpark建立這些函數本身
這篇博客不僅可以展示是多麼容易把代碼寫在熊貓考拉,但也討論使用考拉的最佳實踐;當你使用考拉作為熊貓的替代,如何使用PySpark繞開熊貓api在考拉時,當你申請Koalas-specific api來提高生產力,等。可以找到這個博客中的示例的筆記本在這裏。
分布式和分區考拉DataFrame
即使你可以應用相同的api考拉和熊貓一樣,引擎蓋下麵一個考拉DataFrame非常不同熊貓DataFrame。一個考拉DataFrame分布,這意味著數據分區,計算在不同的工人。另一方麵,所有的數據在一個熊貓DataFrame適合在一個機器。正如您將看到的,這種差異會導致不同的行為。
遷移從熊貓考拉
這一節將描述熊貓考拉,考拉如何支持移民不同的代碼示例。
對象的創建
下麵的包通常進口為了使用考拉。技術上這些包像numpy或熊貓不是必要的,但允許用戶利用考拉更靈活。
進口numpy作為np進口熊貓作為pd進口databricks.koalas作為ks
一係列考拉可以通過創建一個值列表,以同樣的方式作為一個熊貓係列。考拉係列也可以通過創建一個熊貓係列。
#創建一個熊貓係列ps = pd.Series ([1,3,5np.nan,6,8])#創建一個考拉係列ks = ks.Series ([1,3,5np.nan,6,8])#創建一個考拉係列通過熊貓係列ks = ks.Series (ps)ks = ks.from_pandas (ps)
最佳實踐:如下所示,考拉並不能保證指數的順序不像熊貓。這是因為在考拉幾乎所有業務運行在一個分布式的方式。您可以使用Series.sort_index()如果你想要求指標。
> > >ps01.013.025.03南46.058.0dtype: float64> > >ks3南25.013.058.001.046.0名稱:0dtype: float64#應用sort_index考拉係列()> > >kser.sort_index ()01.013.025.03南46.058.0名稱:0dtype: float64
一個考拉DataFrame也可以通過NumPy創建數組,以同樣的方式作為一個熊貓DataFrame。一個考拉DataFrame索引與PySpark DataFrame。因此,熊貓DataFrame指數將保存在考拉DataFrame後創建一個考拉DataFrame通過熊貓DataFrame。
#創建一個熊貓DataFramepdf = pd.DataFrame ({“一個”:np.random.rand (5),“B”:np.random.rand (5)})#創建一個考拉DataFramekdf = ks.DataFrame ({“一個”:np.random.rand (5),“B”:np.random.rand (5)})#創建一個考拉DataFrame通過熊貓DataFramekdf = ks.DataFrame (pdf)kdf = ks.from_pandas (pdf)
同樣地,指數的順序可以按照DataFrame.sort_index ()。
> > >pdf一個B00.0158690.58445510.2243400.63213220.6371260.82049530.8105770.38861140.0370770.876712> > >kdf.sort_index ()一個B00.0158690.58445510.2243400.63213220.6371260.82049530.8105770.38861140.0370770.876712
查看數據
與熊貓DataFrame一樣,考拉的頂行DataFrame可以顯示使用DataFrame.head ()。一般來說,從大熊貓時可能發生混淆轉換PySpark由於不同行為的熊貓和PySpark之間的頭(),但考拉支持這熊貓一樣的使用限製PySpark ()。
> > >kdf.head (2)一個B00.0158690.58445510.2243400.632132
考拉的快速統計摘要DataFrame可以顯示使用DataFrame.describe ()。
> > >kdf.describe ()一個B數5.0000005.000000的意思是0.3449980.660481性病0.3604860.195485最小值0.0158690.38861125%0.0370770.58445550%0.2243400.63213275年%0.6371260.820495馬克斯0.8105770.876712
排序可以使用考拉DataFrame DataFrame.sort_values ()。
> > >kdf.sort_values (=“B”)一個B30.8105770.38861100.0158690.58445510.2243400.63213220.6371260.82049540.0370770.876712
更換一個考拉DataFrame可以通過使用DataFrame.transpose ()。
> > >kdf.transpose ()01234一個0.0158690.2243400.6371260.8105770.037077B0.5844550.6321320.8204950.3886110.876712
最佳實踐:DataFrame.transpose()將失敗時的行數比計算的價值。max_rows,默認設置為1000。這是為了防止用戶在不知情的情況下執行昂貴的操作。考拉,你可以很容易地重置默認compute.max_rows。看官方文檔DataFrame.transpose ()為更多的細節。
> > >從databricks.koalas.config進口set_option, get_option> > >ks.get_option (“compute.max_rows”)1000年> > >ks.set_option (“compute.max_rows”,2000年)> > >ks.get_option (“compute.max_rows”)2000年
選擇或訪問數據
與熊貓DataFrame一樣,選擇單個列的考拉DataFrame返回一個係列。
> > > kdf [“一個”]#或kdf.A00.01586910.22434020.63712630.81057740.037077名稱:dtype: float64
選擇多個列的考拉DataFrame返回一個考拉DataFrame。
> > >kdf [[“一個”,“B”]]一個B00.0158690.58445510.2243400.63213220.6371260.82049530.8105770.38861140.0370770.876712
切從考拉DataFrame可供選擇的行。
> > >kdf.loc [1:2]一個B10.2243400.63213220.6371260.820495
切片行和列也是可用的。
> > >kdf.iloc [:3,1:2]B00.58445510.63213220.820495
最佳實踐:默認情況下,考拉不允許添加列來自不同DataFrames或係列考拉DataFrame因為添加列需要連接操作通常是昂貴的。這個操作可以啟用通過設置計算。ops_on_diff_frames為True。看到可用選項文檔的更多細節。
> > >ks = ks.Series ([One hundred.,200年,300年,400年,500年],指數= [0,1,2,3,4])> > >kdf [“C”]= ks
…ValueError:不能把係列或dataframe因為它從dataframe不同。為了讓這個操作,使“compute.ops_on_diff_frames”選擇。#那些需要管理選項> > >從databricks.koalas.config進口set_option, reset_option> > >set_option (“compute.ops_on_diff_frames”,真正的)> > >kdf [“C”]= ks#重置為默認在未來避免潛在的昂貴的操作> > >reset_option (“compute.ops_on_diff_frames”)> > >kdfA B C00.0158690.584455One hundred.10.2243400.632132200年30.8105770.388611400年20.6371260.820495300年40.0370770.876712500年
考拉DataFrame Python函數的應用
DataFrame.apply()是一個非常強大的函數由許多熊貓用戶青睞。考拉DataFrames也支持這個功能。
> > >kdf.apply (np.cumsum)A B C00.0158690.584455One hundred.10.2402101.216587300年31.0507861.605198700年21.6879132.4256931000年41.7249903.3024041500年
DataFrame.apply()也適用於軸= 1或“列”(0或“指數”是默認的)。
> > >kdf.apply (np。cumsum軸=1)A B C00.0158690.600324100.60032410.2243400.856472200.85647230.8105771.199187401.19918720.6371261.457621301.45762140.0370770.913788500.913788
同時,Python本機函數可以應用到一個考拉DataFrame。
> > >kdf.apply (λx: * *2)A B C00.0002520.34158810000年10.0503290.39959140000年30.6570350.151018160000年20.4059300.67321290000年40.0013750.768623250000年
最佳實踐:雖然沒問題,建議指定返回類型提示火花的返回類型內部考拉DataFrame應用用戶定義函數時。如果沒有指定返回類型提示,考拉函數運行一次一個小樣本推斷出火花返回類型可以是相當昂貴的。
> > >def廣場(x)- > ks.Series [np.float64]:…返回x * *2> > >kdf.apply(廣場)A B C00.4059300.67321290000.010.0013750.768623250000.020.0002520.34158810000.030.6570350.151018160000.040.0503290.39959140000.0
注意,DataFrame.apply()在考拉不支持全球聚合的設計。然而,如果低於計算數據的大小。shortcut_limit,可能工作因為它用熊貓作為快捷方式執行。
#正常工作以來的大小的數據> >ks.DataFrame ({“一個”:範圍(1000年)})蘋果(λ上校:坳。馬克斯())一個999年名稱:0dtype: int64#不正常工作以來大小的數據>計算。shortcut_limit (1000年)> >>ks.DataFrame ({“一個”:範圍(1001年)})蘋果(λ上校:坳。馬克斯())一個165年一個580年一個331年一個497年一個829年一個414年一個746年一個663年一個912年一個1000年一個248年一個82年名稱:0dtype: int64
最佳實踐:考拉,計算。shortcut_limit (default = 1000) computes a specified number of rows in pandas as a shortcut when operating on a small dataset. Koalas uses the pandas API directly in some cases when the size of input data is below this threshold. Therefore, setting this limit too high could slow down the execution or even lead to out-of-memory errors. The following code example sets a higher compute.shortcut_limit, which then allows the previous code to work properly. See the可用選項為更多的細節。
> > >ks.set_option (“compute.shortcut_limit”,1001年)> > >ks.DataFrame ({“一個”:範圍(1001年)})蘋果(λ上校:坳。馬克斯())一個1000年名稱:0dtype: int64
分組數據
分組數據列是一個公共api的熊貓。DataFrame.groupby()有考拉。
> > >kdf.groupby (“一個”)。總和()B C一個0.2243400.632132200年0.6371260.820495300年0.0158690.584455One hundred.0.8105770.388611400年0.0370770.876712500年
參見下麵的分組數據由多個列。
> > >kdf.groupby ([“一個”,“B”])。總和()C一個B0.2243400.632132200年0.0158690.584455One hundred.0.0370770.876712500年0.8105770.388611400年0.6371260.820495300年
策劃和可視化數據
在熊貓,DataFrame。數據可視化的情節是一個很好的解決方案。它可以用來在考拉以同樣的方式。
注意,考拉利用近似為更快的渲染。因此,結果可能會略有不同數量的數據時比plotting.max_rows大。
看下麵的例子,情節考拉DataFrame與DataFrame.plot.bar條形圖()。
> > >速度= [0.1,17.5,40,48,52,69年,88年]> > >壽命= [2,8,70年,1.5,25,12,28]> > >指數= [“蝸牛”,“豬”,“大象”,…“兔子”,“長頸鹿”,“狼”,“馬”]> > >kdf = ks.DataFrame ({“速度”:速度,…“壽命”:壽命},指數=指數)> > >kdf.plot.bar ()
此外,單杠情節支持DataFrame.plot.barh ()
> > >kdf.plot.barh ()
做一個餅圖使用DataFrame.plot.pie ()。
> > >kdf = ks.DataFrame ({“質量”:【0.330,4.87,5.97),…“半徑”:【2439.7,6051.8,6378.1)},…指數= [“水星”,“金星”,“地球”])> > >kdf.plot.pie (y =“質量”)
最佳實踐:對於條形圖、餅圖,隻顯示top-n-rows呈現更有效率,這可以通過設置選項plotting.max_rows。
使情節堆放區域使用DataFrame.plot.area ()。
> > >kdf = ks.DataFrame ({…“銷售”:【3,2,3,9,10,6,3),…“注冊”:【5,5,6,12,14,13,9),…“訪問”:【20.,42,28,62年,81年,50,90年),…},指數= pd.date_range(開始=“2019/08/15”結束=“2020/03/09”,…頻率=“米”))> > >kdf.plot.area ()
使線圖表使用DataFrame.plot.line ()。
> > >kdf = ks.DataFrame ({“豬”:【20.,18,489年,675年,1776年),…“馬”:【4,25,281年,600年,1900年)},…指數= [1990年,1997年,2003年,2009年,2014年])> > >kdf.plot.line ()
最佳實踐:區域和情節,將繪製的數據比例由plotting.sample_ratio可以設置。默認是1000,或plotting.max_rows一樣。看到可用選項獲取詳細信息。
做一個直方圖使用DataFrame.plot.hist ()
> > >kdf = pd.DataFrame (…np.random.randint (1,7,6000年),…列= [“一個”])> > >kdf [“兩個”]= kdf [“一個”)+ np.random.randint (1,7,6000年)> > >kdf = ks.from_pandas (kdf)> > >kdf.plot.hist(垃圾箱=12α=0.5)
做散點圖使用DataFrame.plot.scatter ()
> > >kdf = ks.DataFrame ([[5.1,3.5,0]、[4.9,3.0,0]、[7.0,3.2,1),…(6.4,3.2,1]、[5.9,3.0,2]],…列= [“長度”,“寬度”,“物種”])> > >kdf.plot.scatter (x =“長度”y =“寬度”c =“物種”colormap =“冬青”)
缺失的功能和考拉的工作區
使用考拉時,有幾件事要注意的問題。首先,並不是所有的大熊貓在考拉api目前可用。目前,關於在考拉~ 70%的大熊貓api可用。此外,還有考拉和熊貓之間微妙的行為差異,即使應用相同的api。由於不同,它不會意義實現特定的熊貓考拉的api。本節討論常見的解決方法。
通過轉換使用熊貓api
當處理丟失的熊貓考拉api,一個常見的解決方案是將考拉DataFrames熊貓或PySpark DataFrames,然後應用熊貓或PySpark api。考拉DataFrames之間的轉換和熊貓/ PySpark DataFrames很簡單:DataFrame.to_pandas()和koalas.from_pandas轉換從熊貓/ ();DataFrame.to_spark()和DataFrame.to_koalas轉換從PySpark / ()。然而,如果考拉DataFrame太大了,適合在一個單獨的機器,轉換成熊貓會導致一個內存不足的錯誤。
以下代碼片段顯示了一個簡單的使用DataFrame.to_pandas ()。
> > >kidx = kdf.index> > >kidx.to_list ()
…PandasNotImplementedError:方法pd.Index.to_list ()”是不實現的。如果你想收集你的數據作為一個NumPy數組,使用“to_numpy ()”代替。
最佳實踐:提出了PandasNotImplementedError Index.to_list ()。考拉不支持這個,因為它需要收集所有數據到客戶端(驅動節點)。把一個簡單的解決方法是使用to_pandas熊貓()。
> > >.to_list kidx.to_pandas () ()(0,1,2,3,4]
本機支持大熊貓對象
考拉也為大熊貓提供本機支持對象。熊貓考拉可以直接利用對象如下。
> > >kdf = ks.DataFrame ({“一個”:1。,…“B”:pd.Timestamp (“20130102”),…“C”:pd.Series (1指數=列表(範圍(4)),dtype =“float32”),…' D ':np.array ([3)*4dtype =“int32”),…“F”:“foo”})> > >kdfB C D F01.02013年01-021.03噴火11.02013年01-021.03噴火21.02013年01-021.03噴火31.02013年01-021.03噴火
ks.Timestamp()還沒有實現,ks.Series()不能用於考拉DataFrame的創建。在這些情況下,大熊貓本地對象pd.Timestamp()和pd.Series()可以使用。
發布一個熊貓考拉的函數
此外,考拉提供Koalas-specific api,如DataFrame.map_in_pandas(),本機支持分配給定的熊貓考拉的函數。
> > >我= pd.date_range (“2018-04-09”時間=2000年頻率=“1 d1min”)> > >ts = ks.DataFrame ({“一個”:【“時間戳”]},指數=我)> > >ts.between_time (“0:15”,“0:16”)
…PandasNotImplementedError:方法pd.DataFrame.between_time ()”是不然而實現。
DataFrame.between_time()尚未實現的考拉。如下所示,將一個簡單的解決方法是熊貓DataFrame使用to_pandas(),然後應用功能。
> >>ts.to_pandas () .between_time (“0:15”,“0:16”)一個2018年-04年-24年00:15:00時間戳2018年-04年-25年00:16:00時間戳2022年-04年-04年00:15:00時間戳2022年-04年-05年00:16:00時間戳
然而,DataFrame.map_in_pandas()是一個更好的替代解決方案,因為它不需要移動數據到一個客戶機節點,並可能導致內存不足的錯誤。
> >>ts.map_in_pandas(函數=λpdf: pdf.between_time (“0:15”,“0:16”))一個2022年-04年-04年00:15:00時間戳2022年-04年-05年00:16:00時間戳2018年-04年-24年00:15:00時間戳2018年-04年-25年00:16:00時間戳
最佳實踐:這樣,DataFrame.between_time(),它是一個熊貓的函數,可以在一個分布式考拉DataFrame因為DataFrame.map_in_pandas跨多個節點()執行給定的函數。看到DataFrame.map_in_pandas ()。
使用SQL在考拉
考拉支持標準SQL語法ks.sql()它允許執行火花SQL查詢並返回結果作為考拉DataFrame。
> > >kdf = ks.DataFrame ({“年”:【1990年,1997年,2003年,2009年,2014年),…“豬”:【20.,18,489年,675年,1776年),…“馬”:【4,25,281年,600年,1900年]})> > >ks.sql (“SELECT *從{kdf}豬> 100”)今年豬馬01990年20.411997年182522003年489年281年32009年675年600年42014年1776年1900年
同時,混合考拉DataFrame和熊貓DataFrame支持連接操作。
> > >pdf = pd.DataFrame ({“年”:【1990年,1997年,2003年,2009年,2014年),…“羊”:【22,50,121年,445年,791年),…“雞”:【250年,326年,589年,1241年,2118年]})> > >ks.sql (“‘…選擇ks。豬,pd.chicken…從{kdf} ks內連接{pdf} pd…ks。年= pd.year…ORDER BY ks。豬,pd.chicken”)豬雞018326年120.250年2489年589年3675年1241年41776年2118年
使用PySpark
你也可以申請幾個PySpark考拉DataFrames api。PySpark背景在考拉工作時可以讓你更有效率。如果你知道PySpark,您可以使用PySpark api作為工作區當pandas-equivalent api不是考拉。如果你覺得舒服PySpark,您可以使用許多豐富的特性,如火花UI,曆史服務器,等等。
轉換和PySpark DataFrame
一個考拉DataFrame可以很容易地轉換為PySpark DataFrame使用DataFrame.to_spark(),類似於DataFrame.to_pandas ()。另一方麵,一個PySpark DataFrame可以很容易地轉換為一個考拉DataFrame使用DataFrame.to_koalas(),它擴展了火花DataFrame類。
> > >kdf = ks.DataFrame ({“一個”:【1,2,3,4,5),“B”:【10,20.,30.,40,50]})> > >自衛隊= kdf.to_spark ()> > >類型(sdf)pyspark.sql.dataframe.DataFrame> > >sdf.show ()+ - - - + - - - +| | | B+ - - - + - - - +|1|10||2|20.||3|30.||4|40||5|50|+ - - - + - - - +
注意,從PySpark考拉會導致內存不足錯誤當默認的索引類型序列。默認索引類型可以通過設置compute.default_index_type(默認=序列)。如果默認索引序列必須是在一個大數據集,應該使用distributed-sequence。
> > >從databricks.koalas進口option_context> > >與option_context (…“compute.default_index_type”,“distributed-sequence”):…kdf = sdf.to_koalas ()> > >類型(kdf)databricks.koalas.frame.DataFrame> > >kdf一個B34401220.2330.45500110
最佳實踐:轉換從一個PySpark DataFrame考拉DataFrame可以有一些開銷,因為它需要創建一個新的默認索引內部——PySpark DataFrames沒有指標。你可以避免這種開銷通過指定列可以作為索引列。看到默認索引類型更多的細節。
> > >sdf.to_koalas (index_col =“一個”)B一個110220.330.440550
檢查火花的執行計劃
DataFrame.explain()是一個有用的PySpark API和考拉也可以。它可以顯示引發實際執行前執行計劃。它能幫助你理解和預測實際執行,避免關鍵性能下降。
從databricks.koalas進口option_context與option_context (“compute.ops_on_diff_frames”,真正的,“compute.default_index_type”,“分布式”):df = ks。範圍(10)+ ks。範圍(10)df.explain ()
上麵的命令僅僅添加了兩個DataFrames相同的值。結果如下所示。
= = = =物理計劃* (5)項目[…]+ - SortMergeJoin […),FullOuter:- * (2)[…],假,0:+ -交換hashpartitioning (…), [id =#):+ - * (1)項目[…]:+ - * (1)範圍(0,10一步=1分裂=12)+ - * (4)[…],假,0+ - ReusedExchange […),Exchange hashpartitioning(...), [id=#)
所示的物理計劃,執行將是相當昂貴的,因為它將執行排序合並連接DataFrames結合起來。改善執行性能,您可以重用相同的DataFrame避免合並。看到物理計劃引發SQL來了解更多信息。
與option_context (“compute.ops_on_diff_frames”,假,“compute.default_index_type”,“分布式”):df = ks。範圍(10)df = df + dfdf.explain ()
現在使用相同的DataFrame操作和避免結合不同DataFrames和觸發排序合並連接,由compute.ops_on_diff_frames啟用。
==物理計劃==*(1)項目[…]+- - - - - -*(1)項目[…]+- - - - - -*(1)範圍(0,10,一步=1,分裂=12)
這個操作是比上一個便宜而產生相同的輸出。檢查DataFrame.explain()來幫助提高代碼效率。
緩存DataFrame
DataFrame.cache()是一個有用的PySpark API和有考拉。它用於緩存的輸出一個考拉操作,這樣就不需要重新計算在隨後的執行。這將顯著提高執行速度輸出時需要多次訪問。
與option_context (“compute.default_index_type”,“分布式”):df = ks.range (10)new_df = (df + df) .cache () #“(df + df)”這裏是緩存作為“df”new_df.explain ()
如下物理計劃顯示,new_df將緩存一旦執行。
==物理計劃==*(1)InMemoryTableScan […]+- - - - - -InMemoryRelation […),StorageLevel (…)+- - - - - -*(1)項目[…]+- - - - - -*(1)項目[…]+- - - - - -*(1)項目[…]+- - - - - -*(1)範圍(0,10,一步=1,分裂=12)
InMemoryTableScan和InMemoryRelation意味著new_df將緩存,它不需要執行相同的操作(df + df)時執行下一個時間。
可以不緩存緩存DataFrame DataFrame.unpersist ()。
new_df.unpersist ()
最佳實踐:可以使用緩存DataFrame上下文管理器來確保對DataFrame緩存的範圍。它將被緩存和範圍內未回。
與(df + df) .cache ()作為df:df.explain ()
結論
這個博客中的示例演示如何輕鬆遷移你的熊貓考拉在處理大型數據集的代碼庫。考拉是建立在PySpark之上,提供了相同的API接口,熊貓。雖然熊貓和考拉之間有細微的差別,考拉提供額外Koalas-specific函數,可以簡化工作時在分布式環境中。最後,這個博客顯示在考拉時常見的解決方法和最佳實踐。熊貓用戶需要擴展,考拉很符合他們的需求。
開始使用考拉Apache火花
你可以開始嚐試在這個博客在這個例子筆記本,請訪問考拉的文檔和閱讀的例子,和貢獻考拉GitHub。同時,加入koalas-dev郵件討論和新發布公告列表。