考拉之間的互操作性和Apache火花

PySpark用戶如何有效地使用考拉

考拉是一個開源項目,為大熊貓提供的替代,使有效的擴展了數百名工人節點為日常數據科學和機器學習。經過一年多的發展,因為它是首先介紹了去年,考拉1.0發布

熊貓是一個Python包中常用的數據科學家,但它不規模大數據。當他們的數據變得大,他們必須選擇和學習另一個係統,如Apache火花從一開始為了采用和轉換現有的工作負載。熊貓考拉填補這些差距通過提供等價的api, Apache火花。介紹了很多在前麵博客,也包括使用考拉時的最佳實踐。

熊貓考拉不僅有助於用戶還PySpark用戶因為考拉支持許多功能與PySpark困難。例如,引發用戶可以直接從他們的陰謀數據PySpark DataFrame通過考拉繪圖api,類似於熊貓。PySpark DataFrame更SQL兼容和考拉DataFrame接近Python本身提供更多的直覺與Python在某些情況下。在考拉的文檔,有各種熊貓等效api實現的。

在這篇文章中,我們將重點放在PySpark用戶可以利用他們的知識和本機之間的交互PySpark和考拉更快地編寫代碼。我們包括許多獨立的例子,您可以運行,如果你有火花考拉安裝,或者你正在使用磚運行時。從磚7.1運行時,考拉是打包在一起,這樣你就可以運行它沒有手動安裝。

考拉和PySpark DataFrames

深潛水之前,讓我們看看考拉之間的差異和PySpark DataFrames第一。

外部,它們是不同的。考拉DataFrames無縫地遵循的結構熊貓DataFrames並實現索引/標識符。的PySpark DataFrame,另一方麵,往往更符合關係/表在關係數據庫中,並沒有獨特的行標識符。

在內部,考拉DataFrames是建立在PySpark DataFrames。考拉將熊貓api轉換為邏輯計劃引發的SQL。計劃是優化和執行複雜的SQL引擎和健壯的火花引發社會的不斷改進。考拉也遵循火花讓懶惰的評價語義最大化性能。實現大熊貓DataFrame結構和大熊貓豐富的api,需要一個隱式排序,考拉DataFrames有代表pandas-equivalent指標的內部元數據映射到列和列標簽PySpark DataFrame。

盡管考拉利用PySpark執行引擎,您可能仍然麵臨輕微PySpark相比,性能下降。作為討論的處女Hyperloop遷移經驗通常,主要原因是:

  • 使用默認的索引。構建默認索引的開銷取決於數據大小,集群組成,等。因此,它總是傾向於避免使用默認的索引。它將討論更多關於這下麵的其他部分。
  • 一些api PySpark和熊貓有相同的名字,但不同的語義。例如,考拉DataFrame和PySpark DataFrame計數API。前計數non-NA /空條目的數量為每個列/行,後者計算檢索的行數,包括行包含null。

> > > ks。DataFrame ({a: [1, 2, 3] b: (4、5、6)}) .count () 3 b 3 > > >火花。createDataFrame (…[[1,4],[2、5],[3、6]],模式= [“a”、“b”]) .count () 3

轉換和PySpark DataFrames

PySpark用戶,很高興知道你可以輕易地之間來回一個考拉DataFrame PySpark DataFrame和引擎蓋下麵發生了什麼,這樣你不需要害怕進入世界應用高度可伸縮的熊貓考拉api火花。

to_koalas ()

導入考拉的包時,它會自動把to_koalas()方法來PySpark DataFrames。您可以簡單地使用這種方法將PySpark DataFrames考拉DataFrames。

假設你有一個PySpark DataFrame:

> > >自衛隊=火花。createDataFrame (((1、10.0 ' '), (2, 20.0,“b”), (3, 30.0,“c”)],模式= [' x ', ' y ', ' z ']) > > > sdf.show () + - - - + - - - + - - - + | x y z | | | + - - - + - - - + - - - + | 1 | 10.0 | | | 2 | 20.0 | b | | 3 | 30.0 | c | + - - - + - - - + - - - +

首先,導入考拉包。您通常使用ks作為包的一個別名。

> > >進口磚。考拉是ks

火花DataFrame轉換為一個考拉DataFrame to_koalas()方法如上所述。

> > > kdf = sdf.to_koalas () > > > kdf 10.0 x y z 0 1 1 2 20.0 b 2 3 30.0攝氏度

kdf從PySpark創建的是一個考拉DataFrame DataFrame。懶洋洋地執行計算實際需要的數據時,例如顯示或存儲數據,計算PySpark一樣。

to_spark ()

接下來,您還應該知道如何回到PySpark DataFrame考拉。您可以使用to_spark考拉DataFrame()方法。

> > > sdf_from_kdf = kdf.to_spark () > > > sdf_from_kdf.show () + - - - + - - - + - - - + x y | | z | | + - - - + - - - + - - - + | 1 | 10.0 | | | 2 | 20.0 | b | | 3 | 30.0 | c | + - - - + - - - + - - - +

現在你有一個PySpark DataFrame。注意,不再有考拉DataFrame包含索引列。處理以下指標的最佳實踐將在稍後討論。

指數和index_col

如上所示,考拉在內部管理幾列作“指數”列,以代表大熊貓的指數。“指數”列是用於訪問行loc / iloc索引器或用於sort_index()方法不指定排序鍵列,甚至用於匹配相應的操作結合超過兩行DataFrames或係列,例如df1 + df2,等等。

如果已經有這樣的列PySpark DataFrame,您可以使用index_col參數指定索引列。

> > > kdf_with_index_col = sdf.to_koalas (index_col =“x”) #或index_col = [x] > > > kdf_with_index_col y z x 1 10.0 2 20.0 b 3 30.0攝氏度

這一次列x不視為常規列但該指數之一。

如果你有多個列作為索引,你可以通過列名稱的列表。

> > >自衛隊。to_koalas (index_col = [' x ', ' y ']) z 10.0 x y 1 2 20.0 b 3 30.0攝氏度

當回到PySpark DataFrame,您還使用index_col參數保存索引列。

> > > kdf_with_index_col.to_spark (index_col =“指數”),告訴()#或index_col =(“指數”)+——+ - - - + - - - + | z指數y | | | +——+ - - - + - - - + | 1 | 10.0 | | | 2 | 20.0 | b | | 3 | 30.0 | c | +——+ - - - + - - - +

否則,該指數如下損失。

> > > kdf_with_index_col.to_spark(),告訴()+ - - - + - - - + y z | | | + - - - + - - - + | 10.0 | | | 20.0 | b | | 30.0 | c | +——+ - - - +

列名的數量應與索引列的數量。

> > > kdf。to_spark (index_col = [' index1 ', ' index2 ']),告訴()回溯(最近的電話最後):……ValueError:索引列的長度是1;然而,鑒於“index_col”的長度是2。

默認索引

如您所見,如果你不指定index_col參數,創建一個新的列索引。

> > > sdf.to_koalas () 10.0 x y z 0 1 1 2 20.0 b 2 3 30.0攝氏度

列是從哪裏來的?

答案是“違約指數”。如果沒有指定index_col參數,考拉自動高度DataFrame一列索引。有三種類型的默認指標:“序列”,“distributed-sequence”和“分布式”。每一種都有其獨特的特點和局限性,如性能損失。減少性能開銷,強烈鼓勵通過index_col當從指定索引列PySpark DataFrame。

默認索引時也使用考拉不知道哪個列用於索引。例如,reset_index()沒有任何參數,試圖將所有常規的索引數據列和重新創建索引:

> > > kdf_with_index_col.reset_index () 10.0 x y z 0 1 1 2 20.0 b 2 3 30.0攝氏度

您可以更改默認的索引類型通過設置它作為一個考拉選項“compute.default_index_type”:

ks.set_option (‘compute.default_index_type’,‘序列’)

ks.options.compute.default_index_type =“序列”

序列類型

目前使用的“序列”類型默認情況下在考拉因為它保證了指數不斷增加,像熊貓。然而,它使用一個非分區窗口函數內部,這意味著所有的數據需要收集到一個單獨的節點。如果該節點沒有足夠的內存,性能將顯著退化,或會出現OutOfMemoryError。

> > > ks.set_option (‘compute.default_index_type’,‘序列’)> > > spark.range (5) .to_koalas () id 0 0 1 1 2 2 3 3 4 4

distributed-sequence類型

當使用“distributed-sequence”指數,性能損失明顯不如“序列”類型。計算並生成索引以分布式的方式,但需要另一個額外的火花產生全球序列內部工作。這也並不能保證結果的自然秩序。一般來說,它變成了一個不斷增加。

> > > ks.set_option (“compute.default_index_type”、“distributed-sequence”) > > > spark.range (5) .to_koalas () id 3 3 1 1 2 2 4 4 0 0

分布類型

“分布式”指數幾乎沒有性能損失,總是產生單調遞增的數字。如果該指數隻是需要獨特的數字對於每一行,或者訂單的行,這個索引類型將是最好的選擇。然而,數字有一個非命定論的差距。這意味著該指數類型將不太可能被用作索引操作結合兩個多DataFrames或係列。

> > > ks.set_option (“compute.default_index_type”、“分布式”)> > > spark.range (5) .to_koalas () id 17179869184 0 17179869184 1 17179869184 2 77309411328 3 77309411328 4

比較

如您所見,每個索引類型都有其獨特的特征總結如表所示。選擇默認的索引類型應該仔細考慮你的工作負載。

分布式計算 Map-side操作 持續的增加 性能
序列 不,在一個工作節點 不,需要洗牌 是的 不利於大型數據集
distributed-sequence 是的 是的,但是需要另一個火花工作 是的,在大多數情況下 足夠好
分布式 是的 是的 沒有

另請參閱考拉文檔中默認的索引類型

使用火花I / O

有很多大熊貓的函數來讀取和寫入數據和考拉。

這是熊貓的函數的列表,樹袋熊使用火花I / O。

相對應的api和它們的參數按照api熊貓。然而,在當前行為的細微差別。例如,大熊貓read_csv可以讀取一個文件通過http協議,但考拉仍然不支持自底層引發引擎本身不支持它。

這些考拉函數也有index_col參數來指定應該使用哪些列索引或索引列名稱應該是什麼,類似於to_koalas()或to_spark()函數如上所述。如果沒有指定,默認索引連接或索引列是迷路了。

例如,如果你不指定index_col參數,默認索引連接如下——分布式默認索引用於簡單。

> > > kdf.to_csv(/路徑/ / test.csv) > > > kdf_read_csv = ks.read_csv(/路徑/ / test.csv) > > > kdf_read_csv x y z 0 2 20.0 b 8589934592 3 30.0 c 8589934592 1 10.0

而如果你指定index_col參數,指定的列成了一個索引。

> > > kdf.to_csv(' /道路/ /測試。csv, index_col =“指數”)> > > kdf_read_csv_with_index_col = ks.read_csv(路徑“/ / /測試。csv”, index_col =“指數”)> > > kdf_read_csv_with_index_col 30.0 x y z指數2 3 c 10.0 1 2 20.0 0 1

此外,每個函數關鍵字參數設置選項DataFrameWriter和DataFrameReader火花。給定的鍵直接傳遞給他們的選擇和配置行為。這是非常有用的,當pandas-origin參數不足以操縱數據但PySpark支持缺失的功能。

> > > # nullValue是選擇特定的火花的CSV I / O。> > > ks.read_csv(' /道路/ /測試。csv, index_col =“指數”,nullValue = ' b ') x 30.0 y z指數2 3 c 10.0 20.0 1 2 0 1沒有

考拉特定的I / O功能

除了上述功能從熊貓,考拉有它自己的功能。

首先,DataFrame。to_table和ks。read_table是讀寫火花表隻需指定表名。這是類似於DataFrameWriter。saveAsTable DataFrameReader。分別表火花。

其次,DataFrame。to_spark_io和ks。read_spark_io一般引發I / O。有幾個可選參數使用的方便性,和其他關鍵字參數。你可以自由設置選項用於DataFrameWriter。保存並DataFrameReader。負載的火花。

> > > #“壓縮”是引發特定的選項。> > > kdf.to_spark_io(' /道路/ /測試。或c', format='orc', index_col='index', compression="snappy") >>> kdf_read_spark_io = ks.read_spark_io('/path/to/test.orc', format='orc', index_col='index') >>> kdf_read_spark_io x y z index 1 2 20.0 b 0 1 10.0 a 2 3 30.0 c

獸人格式不支持在上麵的例子中在大熊貓,但是考拉可以寫和讀,因為底層火花的I / O支持它。

最後但並非最不重要,考拉也可以讀寫表如果你有δ湖安裝

三角洲湖是一個開源存儲層,數據可靠性湖泊。三角洲湖提供ACID事務,可擴展的元數據處理,結合流媒體和批量數據處理。

不同於其他文件來源,read_delta函數允許用戶指定表的版本時間旅行。

> > > kdf.to_delta(/路徑/ / test.delta, index_col =“指數”)> > > kdf_read_delta = ks.read_delta(/路徑/ / test.delta, index_col =“指數”)> > > kdf_read_delta x y z指數0 1 1 2 10.0 20.0 30.0 b 2 3 c > > > #更新數據和覆蓋三角洲表> > > kdf [x] = kdf [x] + 10 > > > kdf [y] = kdf [y] * 10 > > > kdf [x] = kdf [x] * 2 > > > kdf.to_delta(/路徑/ / test.delta, index_col =“指數”)> > > > > > #閱讀最新數據ks.read_delta(/路徑/ / test.delta, index_col =“指數”)x y z指數0 22 100.0 1 24 200.0 b 2 26 300.0 c > > > #讀取數據的版本0 > > > ks.read_delta(/路徑/ / test.delta, version = 0, index_col =“指數”)10.0 x y z指數0 1 1 2 20.0 b 2 3 30.0攝氏度

請參閱三角洲湖為更多的細節。

火花訪問器

考拉提供火花為用戶訪問器利用現有PySpark api更容易。

Series.spark。轉換和Series.spark.apply

係列。火花訪問器變換,應用函數來處理潛在的火花列對象。

例如,假設您有以下考拉DataFrame:

> > > kdf = ks。DataFrame ({a: [1、2、3、4]]}) > > > kdf 0 1 1 2 2 3 3 4

你可以用astype鑄型函數,但是如果你不習慣,你可以使用的列使用Series.spark火花。轉換函數:

> > >進口numpy pyspark.sql np > > >。類型進口倍增式> > > > > > kdf [' a_astype_double '] = kdf.a.astype (np.float64) > > > kdf [' a_cast_double '] = kdf.a.spark。變換(λscol: scol.cast(倍增式()))> > > kdf [[' a ', ' a_astype_double ', ' a_cast_double ']]一個a_astype_double a_cast_double 0 1 1.0 3.0 - 3.0 2.0 - 2.0 1.0 - 1 2 2 3 3 4 4.0 - 4.0

用戶函數傳遞給Series.spark。變換函數火花的列使用PySpark函數對象,並可以操縱它。

你也可以使用pyspark.sql的函數。函數變換/應用功能:

從pyspark > > >。sql導入函數作為F > > > > > > kdf [' a_sqrt '] = kdf.a.spark。變換(λscol: F.sqrt (scol)) > > > kdf [' a_log '] = kdf.a.spark。變換(λscol: F.log (scol)) > > > kdf [[' a ', ' a_sqrt ', ' a_log ']]一個a_sqrt a_log 0 1 1.000000 1.732051 - 1.098612 1.414214 - 0.693147 0.000000 - 1 2 2 3 3 4 2.000000 - 1.386294

Series.spark用戶函數。變換應該返回火花列作為輸入的長度相同,而Series.spark。應用可以返回一個不同長度的火花列,如調用聚合函數。

> > > kdf.a.spark。應用(λscol: F.collect_list (scol)) 0(1、2、3、4)名稱:dtype:對象

DataFrame.spark.apply

同樣,DataFrame。火花訪問器有一個應用功能。用戶函數並返回一個火花DataFrame並可以應用任何轉換。如果你想保持火花DataFrame索引列,您可以設置index_col參數。在這種情況下,用戶函數必須包含一個列名稱相同的火花DataFrame返回。

> > > kdf.spark。應用(λ自衛隊:自衛隊。selectExpr(“指數* 10指數”、“+ 1”),index_col =“指數”)一個索引0 2 10 3 20 4 30 5

如果您省略index_col,它將使用默認的索引。

> > > kdf.spark。應用(λ自衛隊:自衛隊。selectExpr (+ 1 ")) 17179869184 2 42949672960 3 42949672960 4 42949672960 5

火花模式

你可以看到當前潛在的火花DataFrame.spark模式。模式和DataFrame.spark.print_schema。他們都把index_col參數如果你想知道模式包括索引列。

> > >進口numpy np > > >熊貓作為pd導入> > > > > > kdf = ks。DataFrame ({a:列表(“abc”),…b:列表(範圍(1、4)),…“c”: np。(3、6).astype不等(i1)……“d”: np.arange (4.0, 7.0, dtype = ' float64 '),…“e”:真的,假的,真正的,…f: pd。date_range(' 20130101 ',時間= 3)},…列= [a, b, c, d, e, f)) > > > #打印模式引發的DDL格式化字符串> > > kdf.spark.schema () .simpleString ()“struct <字符串,b:長整型數字,c:非常小的整數,d:雙e: boolean、f:時間戳>”> > > kdf.spark.schema (index_col =“指數”).simpleString () 'struct' >>> # Print out the schema as same as Spark’s DataFrame.printSchema() >>> kdf.spark.print_schema() root |-- a: string (nullable = false) |-- b: long (nullable = false) |-- c: byte (nullable = false) |-- d: double (nullable = false) |-- e: boolean (nullable = false) |-- f: timestamp (nullable = false) >>> kdf.spark.print_schema(index_col='index') root |-- index: long (nullable = false) |-- a: string (nullable = false) |-- b: long (nullable = false) |-- c: byte (nullable = false) |-- d: double (nullable = false) |-- e: boolean (nullable = false) |-- f: timestamp (nullable = false)

解釋星火計劃

如果你想知道當前星火計劃,您可以使用DataFrame.spark.explain ()。

> > > #火花一樣的DataFrame.explain () > > > kdf.spark.explain() = = = =物理計劃掃描ExistingRDD […]> > > kdf.spark.explain(真正的)= =解析邏輯計劃= =…= = = =分析邏輯計劃……= = = =優化邏輯計劃……= = = =物理計劃掃描ExistingRDD […]>>> # New style of mode introduced from Spark 3.0. >>> kdf.spark.explain(mode="extended") == Parsed Logical Plan == ... == Analyzed Logical Plan == ... == Optimized Logical Plan == ... == Physical Plan == Scan ExistingRDD[...]

緩存

火花訪問器還提供緩存相關功能,緩存,持續下去,unpersist, storage_level財產。您可以使用緩存功能作為上下文經理unpersist緩存。讓我們看看一個例子。

從pyspark進口StorageLevel > > > > > > > > >與kdf.spark.cache緩存():……打印(cached.spark.storage_level)…磁盤內存反序列化1 x > > >複製kdf.spark.persist (StorageLevel.MEMORY_ONLY)緩存:……打印(cached.spark.storage_level)…內存序列化1 x當上下文完成複製,緩存會自動清除。如果你想保持緩存,緩存可以做如下:> > > = kdf.spark.cache() > > >打印(cached.spark.storage_level)磁盤內存反序列化1 x複製

不再需要時,你必須叫DataFrame.spark.unpersist()顯式地從緩存中刪除它。

> > > cached.spark.unpersist ()

提示

有一些連接在考拉的業務,如合並、連接和更新。盡管實際的連接方法取決於底層的火花規劃師引擎蓋下,你仍然可以指定一個提示與ks.broadcast()函數或DataFrame.spark.hint()方法。

> > > kdf1 = ks。DataFrame({“關鍵”:[“foo”、“酒吧”,“記者”,“foo”),…“價值”:[1,2,3,5]},…列=(“關鍵”、“價值”))> > > kdf2 = ks。DataFrame({“關鍵”:[“foo”、“酒吧”,“記者”,“foo”),…“價值”:(5、6、7、8)},…列=[“關鍵”、“價值”])> > > kdf1。合並(kdf2 =“關鍵”).explain物理計劃()= = = =……SortMergeJoin……> > > kdf1.merge (ks.broadcast (kdf2) =“關鍵”).explain物理計劃()= = = =…… BroadcastHashJoin ... ... >>> kdf1.merge(kdf2.spark.hint('broadcast'), on='key').explain() == Physical Plan == ... ... BroadcastHashJoin ... ...

特別是DataFrame.spark.hint()更有用如果底層的火花是3.0或更高版本,因為更多的提示在火花3.0中是可用的。

結論

考拉DataFrame類似PySpark DataFrame因為樹袋熊使用PySpark DataFrame內部。外部,考拉DataFrame好像熊貓DataFrame工作。

為了填補這個缺口,考拉有很多有用的功能,用戶熟悉PySpark工作考拉和PySpark DataFrame容易。雖然有一些額外的護理需要處理該指數在轉換期間,考拉提供PySpark用戶DataFrames之間輕鬆轉換,輸入/輸出PySpark api來讀/寫和火花訪問器暴露PySpark友好內部緩存和探索DataFrame等特性。此外,火花訪問器提供了一種自然的方式來玩考拉係列和PySpark列。

PySpark用戶可以受益於考拉如上所示。嚐試和了解更多的例子磚運行時。

閱讀更多

找到更多關於考拉,看到以下資源:

  1. 試著隨行筆記本
  2. 閱讀前博客10分鍾從熊貓考拉在Apache火花
  3. 火花+人工智能峰會2020交談”考拉,熊貓在Apache火花
  4. 火花+人工智能峰會2020交談”考拉:做一個簡單的從熊貓過渡到Apache的火花

O ' reilly學習引發的書

免費試著磚 開始

報名

Baidu
map