跳轉到主要內容
工程的博客

基準:考拉(PySpark)和Dask

2021年4月7日 工程的博客

分享這篇文章

考拉熊貓是一種數據科學圖書館實現api的Apache火花所以數據科學家可以使用自己喜歡的api在各種規模的數據集。這篇文章比較的性能Dask的實現大熊貓PySpark API和考拉。使用一個可重複的基準測試中,我們發現考拉是4倍的速度比Dask在單個節點上,8 x集群上,在某些情況下,多達25 x

首先,我們穿過基準方法,環境和我們的測試結果。然後,我們討論為什麼考拉/火花顯著高於Dask深入火花的優化的SQL引擎,使用先進的技術,如代碼生成和查詢優化。

方法

基準是對2009 - 2013年執行的黃色出租車旅行記錄(157 GB)紐約的出租車和豪華轎車委員會(TLC)旅行記錄數據。我們從熊貓工作量確定通用操作等基本的統計計算,連接,過濾和分組數據集。

本地和分布式執行也考慮為了涵蓋單節點情況下和集群計算情況下全麵。測量的操作/沒有過濾操作和緩存來考慮各種實際工作負載。

因此,我們執行的基準尺寸如下:

  • 標準操作(本地和分布式執行)
  • 操作與過濾(本地和分布式執行)
  • 帶過濾和緩存的操作(本地和分布式執行)

數據集

黃色出租車旅行記錄數據集包含CSV文件,其中包括17列數字和文本類型。字段包括上車和下車日期/時間、上車和下車地點,旅行的距離,分項價格、利率類型、付款類型和driver-reported乘客計數。CSV文件被下載到磚文件係統(DBFS),然後被轉換成拚花文件通過考拉獲得更好的效率。

操作

我們分析了多個現有大熊貓的工作負載和識別常見的幾種模式操作。下麵是一些派生操作的偽代碼。

def操作(df):#複雜算術np。罪……np。因為……np.arctan2#計數len(df)#計算指數len(df.index)# groupby統計df.groupby (=“series_c”).agg (…(“的意思是”,“性病”]…)#加入合並(df2 df1)#連接數len(合並(df2 df1))#的意思df.series_a.mean ()#意味著複雜的運算(np。罪……np。因為……np.arctan2) .mean ()#係列之外的意思(df。series_a + df.series_b) .mean ()#係列的乘法(df。series_a * df.series_b) .mean ()#讀文件read_parquet (…)#係列之外df。series_a + df.series_b#係列乘法df。series_a * df.series_b#標準推導df.series_a.std ()#價值計算df.series_a.value_counts ()

被處決的操作有/沒有過濾和緩存分別考慮懶惰的影響評估、緩存和優化相關的在這兩個係統,如下所示。

  • 標準操作
    操作(df)
  • 操作與過濾
    #計算過濾懶洋洋地一起操作。操作(df ((df。tip_amt > =1)& (df.tip_amt
  • 過濾操作發現之間的記錄,收到小費1 - 5美元,它過濾36%的原始數據。
  • 操作與過濾和緩存
    #考拉df = df [(df。tip_amt > =1)& (df.tip_amt
                    <前># Daskdf = df [(df。tip_amt > =1)& (df.tip_amt
  • 緩存啟用時,完全緩存之前測量的數據操作。

在整個代碼中使用這個基準,請參閱筆記本包含在這個博客的底部。

環境

基準進行單個節點本地執行,以及分布式的集群3工人節點執行。設置環境輕鬆,我們使用磚7.6運行時(Apache火花3.0.1)和磚筆記本。

係統環境

  • 操作係統:Ubuntu 18.04.5 LTS
  • Java:祖魯8.50.0.51-CA-linux64(構建1.8.0_275-b01)
  • Scala:2.12.10
  • Python:3.7.5

Python庫

  • 熊貓:1.1.5
  • PyArrow:1.0.1
  • NumPy:1.19.5
  • 考拉:1.7.0
  • Dask:2021.03.0

本地執行

為當地執行,我們使用一個單一的i3.16xlargeVM的AWS有488 GB的內存和64核25千兆以太網。

機為當地執行規範

分布式執行

分布式執行3工人節點使用i3.4xlarge虛擬機內存容量122 GB和16個核心()10千兆以太網。這種集群的總內存單節點配置。

機規範分布式執行

結果

下麵的基準測試結果包括概述以幾何的方式來解釋的一般性能差異考拉Dask,每一欄顯示了運行時間的比率Dask和考拉(Dask /考拉)之間。因為考拉api編寫PySpark之上,這個基準測試的結果同樣適用於PySpark。

標準操作

在當地執行,考拉是平均1.2倍速度比Dask:

  • 考拉,加入計數(連接數)快17.6倍。
  • 在Dask,計算標準偏差為3.7 x更快。

在分布式執行,考拉是平均2.1倍速度比Dask:

  • 在考拉,計數指數操作快25倍。
  • 在Dask,意味著複雜的算術運算快1.8倍。

操作與過濾

在當地執行,考拉是平均6.4倍速度比Dask在所有情況下:

  • 考拉的計數操作快11.1倍。
  • 複雜的算術運算有最小的考拉快2.7倍的差距。

在分布式執行,考拉是平均9.2倍速度比Dask在所有情況下:

  • 在考拉,計數指數操作快16.7倍。
  • 複雜的算術運算有最小的考拉快3.5倍的差距。

操作與過濾和緩存

在當地執行,考拉是平均1.4倍速度比Dask:

  • 考拉,加入計數(連接數)快5.9倍。
  • 在Dask,Series.value_counts(值計數)快3.6倍。

在分布式執行,考拉是平均5.2倍速度比Dask在所有情況下:

  • 在考拉,計數指數操作快28.6倍。
  • 複雜的算術運算有最小的考拉快1.7倍的差距。

分析

考拉(PySpark)速度大大快於Dask在大多數情況下。原因似乎是直截了當的考拉和PySpark都基於火花,最快的一個分布式計算引擎。引發全優化SQL引擎(SQL)火花爭霸查詢計劃的優化和代碼生成。作為一個粗略的比較,火花SQL幾乎與1600 +一百萬行代碼貢獻者在11年,而Dask的代碼基火花大約10%的400 +貢獻者6年左右。

為了確定哪些因素導致了考拉的性能的許多優化技術在火花SQL,我們分析了這些操作以分布式方式執行過濾當考拉最優於Dask:

  • 統計計算
  • 連接

我們挖到這些操作的執行和計劃優化方麵,能夠識別的兩個最重要的因素:在火花SQL代碼生成和查詢計劃優化。

代碼生成

最重要的一個執行優化在火花SQL代碼生成。火花引擎在運行時為每個查詢生成優化字節碼,大大提高了性能。這種優化大大受影響的統計計算和連接在考拉的基準避免虛擬功能分派,等等,請先閱讀代碼生成介紹博客要學習更多的知識。

例如,相同的基準代碼意味著計算需要大約8.37秒和連接數與代碼生成殘疾人需要約27.5秒磚生產環境。後啟用代碼生成(默認),計算平均需要1.26秒左右,加入計數需要2.27秒。這是一個提高650%和1200%,分別。

代碼生成的性能差異

代碼生成的性能差異

查詢計劃的優化

火花SQL優化器有一個複雜的查詢計劃:催化劑在執行動態優化查詢計劃(自適應查詢執行)。考拉的統計計算和加入過濾、催化劑優化器還極大地提高了性能。

當考拉計算均值沒有利用催化劑查詢優化,原始在火花SQL執行計劃大致如下。它使用蠻力閱讀所有列,然後執行多次投影與中間的過濾之前計算的意思。

聚合(avg(fare_amt)]+- - - - - -項目(fare_amt)+- - - - - -項目(vendor_name, fare_amt, tip_amt,…)+- - - - - -過濾器tip_amt> =1tip_amt
              相當低效的,因為它需要閱讀更多的數據,支出更多時間/O多次執行相同的預測。另一方麵,下麵的計劃優化有效地執行通過催化劑優化器:
              <精準醫療>聚合(avg(fare_amt)]+- - - - - -項目(fare_amt)+- - - - - -關係(fare_amt tip_amt], tip_amt> =1tip_amt
              這個計劃就會簡單得多。現在隻有讀取所需的列計算(修剪),過濾數據- - - - - -水平,節省了內存使用量(過濾器疊加)。作為加入操作數(加入通過PySpark計數)、考拉,創建一個原始執行計劃火花SQL作為下圖:
              <精準醫療>聚合(())+- - - - - -項目(tip_amt,……)+- - - - - -加入:- - - - - -項目(tip_amt,……):+- - - - - -過濾器tip_amt> =1tip_amt
              它也有同樣的問題作為所示平均計算。它不必要的讀取項目數據多次。一個區別數據將被打亂交換執行加入操作,這通常會導致大量網絡I/O負麵的性能影響。催化劑優化器能力刪除洗牌數據一個一邊加入小得多,<強大的>BroadcastHashJoin</強大的>你看下圖:
              <精準醫療>聚合(())+- - - - - -項目+- - - - - -<強大的>BroadcastHashJoin</強大的>:- - - - - -項目[]:+- - - - - -過濾器tip_amt> =1tip_amt BroadcastExchange+- - - - - -項目[]+- - - - - -關係[]

它不僅適用於列修剪和過濾疊加,也消除了洗牌一步廣播DataFrame越小。在內部,它將小DataFrame發送給每個執行者,執行沒有交換數據的連接。這消除了不必要的混亂,極大地提高了性能。

結論

測試的結果表明,考拉(PySpark)明顯優於Dask大多數用例,最大的因素是引發SQL的執行引擎與許多先進的優化技術。

考拉的本地和分布式執行確認操作的速度遠遠超過Dask如下所示:

  • 當地執行:2.1 x(幾何平均數)和4(簡單平均)
  • 分布式執行:4.6 x(幾何平均數)和7.9(簡單平均)

其次,緩存的性能影響Dask和考拉,它大大降低了運行時間。

最後,所示的性能差距最大的是統計計算和連接的分布式執行過濾的考拉(PySpark)快9.2倍確定情況下幾何平均數。

我們包括完整的獨立的筆記本,數據集和操作,所有設置和基準代碼透明。請參考下麵的筆記本:

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map