客座博客:處女Hyperloop一減少處理時間與考拉從小時分鍾
2019年8月22日 在公司博客上
看點播網絡研討會了解更多:從熊貓考拉:減少對處女Time-to-Insights Hyperloop的數據
在維珍Hyperloop,我們讓Hyperloop現實工作,所以我們可以在航空公司運送乘客和貨物的速度但在空中旅行的成本的一小部分。為了建立一個商業上可行的係統,我們收集和分析大量不同數量的數據,包括Devloop試車跑道運行時,大量的測試平台,以及各種仿真、基礎設施和社會經濟數據。我們大多數的腳本處理數據是使用Python編寫的庫以熊貓為主要數據處理工具,粘在一起的一切。在這篇文章中,我們想要與你分享我們的經驗使用考拉擴展我們的數據分析,實現大規模加速效果與小代碼更改。
隨著我們繼續發展和構建新東西,我們的數據處理的需要。由於我們的數據操作的規模和複雜性增加,我們pandas-based Python腳本過於緩慢,以滿足我們的業務需求。這使我們火花,希望快速處理時間和靈活的數據存儲以及按需可伸縮性。然而,我們在“火花開關”——我們必須做出很多自定義的改變遷移pandas-based PySpark代碼庫。我們需要一個解決方案,不僅要快得多,但也最好不需要重寫代碼。這些挑戰使我們研究其他選項,我們很高興地發現,存在一種簡單的方法來跳過這乏味的一步:考拉包,最近由磚開源。
中描述的那樣考拉的自述,
考拉項目數據科學家與大數據交互時更有效率,通過實現熊貓DataFrameAPI的Apache火花。(…)立即生產與火花,沒有學習曲線,如果你已經熟悉的熊貓。都有一個代碼庫,與熊貓(測試,較小的數據集)和火花(分布式數據集)。
在本文中,我將試圖證明為什麼考拉(大部分)如此值得嚐試。通過改變不到1%的大熊貓,我們能夠運行代碼與考拉和火花。我們可以減少執行時間超過10倍,從幾個小時到幾分鍾,因為環境是可以水平伸縮,我們準備更多的數據。
快速啟動
考拉在安裝之前,確保你有火花用PySpark集群設置,可以使用它。然後,簡單地運行:
pip安裝考拉
或者,對於conda用戶:
考拉- c conda-forge conda安裝
指考拉的自述為更多的細節。
快速安裝後檢查:
進口databricks.koalas作為kskdf = ks.DataFrame ({“column1”:【4.0,8.0},{“column2”:【1.0,2.0]})kdf
如您所見,考拉可以渲染pandas-like互動表。多方便啊!
例子與基本操作
為了這篇文章中,我們產生了一些測試數據由4列和參數化的行數。
進口熊貓作為pd# #生成1 m行測試數據pdf = generate_pd_test_data (1 e6)pdf.head (3)> > >時間戳pod_id trip_id speed_mph07.522523pod_13 trip_679.340006122.029855pod_5 trip_2265.202122221.473178pod_20 trip_10669.901507
- 免責聲明:這是一個隨機生成的測試文件用於績效評估,Hyperloop的主題相關,但不代表我們的數據。完整的測試腳本用於本文可以在這裏找到:https://gist.github.com/patryk-oleniuk/043f97ae9c405cbd13b6977e7e6d6fbc。
我們想所有pod-trips評估一些關鍵的描述性分析,例如:每個pod-trip的旅行時間是什麼?
操作需要:
- 集團
(“pod_id”、“訪問id”)
- 對於每一個旅行,計算
trip_time
最後一個時間戳,第一個時間戳。 - 計算的分布pod-trip時報(意思是,stddev)
短&慢(熊貓)方式:
# 1(片段)
進口熊貓作為pd#分組。馬克斯(去年時間戳)和加入與分組。最小值(第一個時間戳)法國燃氣公司=pdf.groupby ([“pod_id”,“trip_id”]).agg ({“時間戳”:【“最小值”,“馬克斯”]})gdf.columns=(“timestamp_first”,“timestamp_last”]法國燃氣公司(“trip_time_sec”]=法國燃氣公司(“timestamp_last”]- - - - - -法國燃氣公司(“timestamp_first”]法國燃氣公司(“trip_time_hours”]=法國燃氣公司(“trip_time_sec”]/3600.0#計算統計數據在旅行時間pd_result=gdf.describe ()
長和快速(PySpark)的方法:
# 2(片段)
進口pyspark作為火花#進口熊貓df來火花(這條線是不使用為分析)自衛隊=spark.createDataFrame (pdf)#排序通過時間戳和groupby自衛隊=sdf.sort (desc(“時間戳”))自衛隊=自衛隊。groupBy (“pod_id”、“trip_id”) .agg (F。馬克斯(“時間戳”).alias (“timestamp_last”),F。最小值(“時間戳”).alias (“timestamp_first”))#添加另一個列trip_time_sec作為的區別之間的第一個和去年自衛隊=sdf.withColumn (“trip_time_sec”,sdf2 [“timestamp_last”]- - - - - -sdf2 [“timestamp_first”])自衛隊=sdf.withColumn (“trip_time_hours”,sdf3 [“trip_time_sec”]/3600.0)#計算統計數據在旅行時間sdf4.select (F.col (“timestamp_last”),F.col (“timestamp_first”),F.col (“trip_time_sec”),F.col (“trip_time_hours”.toPandas) .summary () ()
短和快(考拉):
# 3(片段)
進口databricks.koalas作為ks#進口熊貓df考拉(因此也引發)(這條線不用於分析)kdf = ks.from_pandas (pdf)#下麵的代碼是一樣的熊貓版本gdf = kdf.groupby ([“pod_id”,“trip_id”]).agg ({“時間戳”:【“最小值”,“馬克斯”]})gdf.columns=(“timestamp_first”,“timestamp_last”]法國燃氣公司(“trip_time_sec”]= gdf [“timestamp_last”]- gdf [“timestamp_first”]法國燃氣公司(“trip_time_hours”]= gdf [“trip_time_sec”)/3600.0.to_pandas ks_result = gdf.describe () ()
注意,# 1和# 3片段,代碼完全相同的,所以“火花開關”是無縫的。對於大多數大熊貓的腳本,你甚至可以嚐試改變進口熊貓磚。考拉是pd
,一些腳本都能良好的運行進行微調,與下麵的一些限製解釋。
結果
所有的片段都返回相同的pod-trip-times結果驗證。的描述
和總結
熊貓和火花的方法略有不同,解釋道在這裏但這應該不是太影響性能。
結果:樣本
先進的例子:udf和複雜的操作
我們現在要解決更複雜的問題dataframe相同,看看熊貓和考拉的實現是不同的。
目的:分析每個pod-trip平均速度:
- 集團
(“pod_id”、“訪問id”)
- 每pod-trip計算總行駛距離找到麵積低於速度(時間)圖(方法解釋道在這裏):
- 分組df的排序
時間戳
列。 - 計算時間戳的差別。
- 繁殖速度的差別,這將導致距離的時間差異。
- 求和
distance_travelled
列——這將給我們每pod-trip總行駛距離。 - 計算
旅行的時間
作為timestamp.last
- - - - - -timestamp.first
(在前麵的段落)。 - 計算
average_speed
作為distance_travelled
/旅行的時間
。 - 計算的分布pod-trip *(意思是,stddev)。
我們決定使用一個定製的應用功能和實現這個任務UDF(用戶定義函數)。
熊貓:
(片段# 4)
進口熊貓作為pddefcalc_distance_from_speed(法國燃氣公司):gdf = gdf.sort_values (“時間戳”)法國燃氣公司(“time_diff”]= gdf [“時間戳”].diff ()返回pd.DataFrame ({“distance_miles”:[(gdf [“time_diff”]* gdf [“speed_mph”])。總和()),“travel_time_sec”:[gdf [“時間戳”].iloc [-1]- gdf [“時間戳”].iloc [0]]})結果= df.groupby ([“pod_id”,“trip_id”])。應用(calculate_distance_from_speed)結果(“distance_km”[]=結果“distance_miles”)*1.609結果(“avg_speed_mph”[]=結果“distance_miles”)/結果(“travel_time_sec”)/60.0結果(“avg_speed_kph”[]=結果“avg_speed_mph”)*1.609results.describe ()
PySpark道:
# 5(片段)
進口databricks.koalas作為ks從pyspark.sql.functions進口pandas_udf, PandasUDFType從pyspark.sql.types進口*進口pyspark.sql.functions作為F模式= StructType ([StructField (“pod_id”StringType ()),StructField (“trip_id”StringType ()),StructField (“distance_miles”,倍增式()),StructField (“travel_time_sec”倍增式())])@pandas_udf (模式,PandasUDFType.GROUPED_MAP)defcalculate_distance_from_speed(法國燃氣公司):gdf = gdf.sort_values (“時間戳”)打印(gdf)法國燃氣公司(“time_diff”]= gdf [“時間戳”].diff ()返回pd.DataFrame ({“pod_id”:[gdf [“pod_id”].iloc [0]],“trip_id”:[gdf [“trip_id”].iloc [0]],“distance_miles”:[(gdf [“time_diff”]* gdf [“speed_mph”])。總和()),“travel_time_sec”:[gdf [“時間戳”].iloc [-1]gdf [“時間戳”].iloc [0]]})自衛隊= spark_df.groupby (“pod_id”,“trip_id”蘋果(calculate_distance_from_speed)自衛隊= sdf.withColumn (“distance_km”F.col (“distance_miles”)*1.609)自衛隊= sdf.withColumn (“avg_speed_mph”F.col (“distance_miles”)/ F.col (“travel_time_sec”)/60.0)自衛隊= sdf.withColumn (“avg_speed_kph”F.col (“avg_speed_mph”)*1.609)自衛隊= sdf.orderBy (sdf.pod_id sdf.trip_id).toPandas sdf.summary () ()#彙總計算幾乎相同的結果描述
考拉的方式:
# 6(片段)
進口databricks.koalas作為ksdefcalc_distance_from_speed_ks(法國燃氣公司)- > ks.DataFrame [str,str,浮動,浮動]:gdf = gdf.sort_values (“時間戳”)法國燃氣公司(“meanspeed”]= (gdf [“時間戳”].diff () * gdf [“speed_mph”])。總和()法國燃氣公司(“triptime”]= (gdf [“時間戳”].iloc [-1]- gdf [“時間戳”].iloc [0])返回gdf [[“pod_id”,“trip_id”,“meanspeed”,“triptime”]].iloc [0:1]
kdf = ks.from_pandas (df)結果= kdf.groupby ([“pod_id”,“trip_id”])。應用(calculate_distance_from_speed_ks)#由於當前包的局限性,groupby.apply()返回c0 . .c3列名結果。列s = [“pod_id”,“trip_id”,“distance_miles”,“travel_time_sec”]#火花groupby不設置groupby關口作為索引,不排序結果= results.set_index ([“pod_id”,“trip_id”]).sort_index ()結果(“distance_km”[]=結果“distance_miles”)*1.609結果(“avg_speed_mph”[]=結果“distance_miles”)/結果(“travel_time_sec”)/60.0結果(“avg_speed_kph”[]=結果“avg_speed_mph”)*1.609results.describe ()
考拉的應用是基於PySpark的實現pandas_udf
這需要模式信息,這就是為什麼函數的定義還必須定義類型提示。包的作者介紹了新的自定義類型提示,ks.DataFrame
和ks.Series
。不幸的是,當前的實現應用
方法很麻煩,花了一點努力到達相同的結果(列名稱改變,groupby鍵返回)。然而,所有的行為是適當的解釋方案文檔。
性能
評估性能的考拉,我們描述為不同的行數的代碼片段。
分析實驗數據磚平台上完成,使用集群配置如下:Beplay体育安卓版本
- 火花司機節點(也用於執行熊貓腳本):8 CPU核心,61 gb的RAM。
15引發工人節點:4個cpu核心,30.5 gb的RAM(金額:60 cpu / 457.5 gb)。
每個實驗重複10次,下麵所示的剪輯指示執行的最小和最大時間。
基本運維
當數據很小,初始化操作和數據傳輸是巨大的計算相比,所以熊貓快得多(標記)。對於大量的數據,大熊貓處理時間超過分布式解決方案(b)標誌。我們可以觀察一些性能擊中考拉,但接近PySpark隨著數據的增加(c)標誌。
udf
UDF剖析,PySpark和考拉文檔中指定,性能大幅降低。這就是為什麼我們需要減少我們測試的行數由100 x vs的基本運維情況。為每個測試用例,考拉和PySpark顯示性能驚人的相似,顯示一致的底層實現。在實驗中,我們發現存在一個更快的方式執行組操作使用PySpark windows功能,但是這不是當前實現考拉隻所以我們決定將UDF的版本進行比較。
討論
考拉似乎是正確的選擇,如果你想讓你的熊貓立即可伸縮和可執行代碼來處理更大的數據集上不可能在單個節點上。快速交換考拉後,通過伸縮火花的集群,您可以允許更大的數據集,大大提高了處理時間。你的表現應該是類似(但5低50%,這取決於數據集規模和集群)PySpark。
另一方麵,考拉API層並導致明顯的性能影響,尤其是相比,本機火花。在一天結束的時候,如果計算性能是你的當務之急,您應該考慮從Python切換到Scala。
限製和差異
與考拉在你前幾小時,你可能會想,“這是為什麼沒有實現? !”Currently, the package is still under development and is missing some pandas API functionality, but much of it should be implemented in the next few months (for examplegroupby.diff ()
或kdf.rename ()
)。
還從我的經驗作為一個貢獻者項目,要麼是太複雜的一些特性來實現火花API還是跳過由於顯著的性能。例如,DataFrame.values
需要實現,整個工作集中在單個節點的內存,所以是次優的,有時甚至不可能。相反如果你需要檢索一些最終結果的司機,你可以調用DataFrame.to_pandas ()
或DataFrame.to_numpy ()
。
另一個更重要的是,考拉“執行鏈不同於熊貓”:對dataframe執行操作時,但放在隊列的操作不執行。隻有當結果是必須的,例如當調用kdf.head ()
或kdf.to_pandas ()
的操作將被執行。可能會誤導那些從來沒有接觸過火花,因為熊貓並逐行一切。
結論
考拉幫助我們減少負擔”Spark-ify“我們的熊貓代碼。如果你還在掙紮伸縮熊貓的代碼,你也應該試一試!如果你是極度缺失與熊貓發現的任何行為或不一致,請打開一個問題作為一個社區,我們可以確保包是積極和持續改進。同時,隨時做出貢獻!