跳轉到主要內容
工程的博客

簡單,可靠的插入和刪除三角洲湖表上使用Python api

三角洲湖0.4.0包括Python api和就地轉化鋪三角洲湖表

2019年10月3日 工程的博客

分享這篇文章

試試這個Jupyter筆記本

我們興奮地宣布釋放三角洲湖0.4.0介紹了Python api用於操縱和管理數據在三角洲表。這個版本的主要特點是:

  • Python api DML和實用程序操作(# 89)- - -現在可以使用Python api來更新/刪除/合並三角洲湖表中的數據和實用程序運行操作(即。、真空、曆史)。這些都是很好的構建複雜工作負載在Python中,例如,緩慢變化維度(SCD)操作,合並更改數據複製,插入從流媒體查詢。看到文檔為更多的細節。
  • Convert-to-Delta(# 78)- - -現在你可以拚花表的轉換為三角洲湖表沒有修改任何數據。這是偉大的轉換非常大的鑲花表,成本會很高,重寫三角洲表。此外,這個過程是可逆的,可以拚花表轉換為三角洲湖表,操作(例如,刪除或合並),且容易將其轉換回拚花表。看到文檔為更多的細節。
  • SQL工具操作——你現在可以使用SQL運行實用操作真空和曆史。看到文檔更多細節關於如何配置火花來執行這些Delta-specific SQL命令。

有關更多信息,請參考三角洲湖0.4.0發行說明三角洲湖文檔>表刪除、更新和合並

https://www.youtube.com/watch?v=R4f6SKOetB4

在這個博客中,我們將演示Apache火花™2.4.3如何使用Python和新的Python api在上下文中三角洲湖0.4.0準時飛行性能場景。我們將介紹如何插入和刪除數據,查詢舊版本的數據與時間旅行和真空清理舊版本。


得到的早期預覽O ' reilly的新電子書一步一步的指導你需要開始使用三角洲湖。

如何開始使用三角洲湖

三角洲湖包是可用的——包選擇。在我們的示例中,我們還將展示真空文件的能力和執行SQL命令三角洲湖在Apache火花。因為這是一個簡短的演示中,我們還將啟用以下配置:

  • spark.databricks.delta.retentionDurationCheck.enabled = false允許我們真空文件比默認保留7天時間短。請注意,這隻是SQL命令所需的真空。
  • spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension在Apache的火花,使三角洲湖SQL命令;這不是需要Python或Scala API調用。
#使用火花包。// pyspark——包io.delta: delta-core_2:0.4。0——設計“spark.databricks.delta.retentionDurationCheck.enabled = false”——設計“spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension”

加載和保存我們的三角洲湖數據

這個場景將使用準時飛行性能或延誤產生的數據集麗塔BTS飛行離開統計;這個數據在行動包括的一些示例通過d3 2014航班起飛性能。js Crossfilter準時飛行性能與GraphFrames Apache火花™。這個數據集可以在本地下載github的位置。在pyspark,首先閱讀數據集。

#位置變量tripdelaysFilePath =“/根/數據/ departuredelays.csv”pathToEventsTable =“/根/ deltalake / departureDelays.delta”#讀取航班延誤數據departureDelays =火花。讀\.option (“頭”,“真正的”)\.option (“inferSchema”,“真正的”)\. csv (tripdelaysFilePath)

接下來,讓我們來拯救我們departureDelays數據集到三角洲湖表。三角洲湖存儲通過保存此表,我們將能夠利用其特性包括ACID事務,統一批處理和流和時間旅行。

#將航班延誤數據保存到三角洲湖格式departureDelays \.write \格式(“δ”)\.mode (“覆蓋”)\.save (“departureDelays.delta”)

注意,這種方法類似於拚花你通常如何保存數據;而不是指定格式(“鋪”)現在,您將指定格式(“δ”)。如果你看看底層文件係統,您將注意到四個文件的創建departureDelays三角洲湖表。

/ departureDelays.delta $ ls - l。。_delta_log部分- 00000 df6f69ea e6aa - 424 b - bc0e f3674c4f1906 c000.snappy.parquet- 00001 - 711 -一部分bcce3 fe9e - 466 e - a22c - 8256 f8b54930 c000.snappy.parquet- 00002 - 778 -一部分ba97d - 89 - b8 - 4942 a495 - 5 - f6238830b68 c000.snappy.parquet部分- 00003 - 1 - a791c4a - 6 - f11 - 49 - a8 - 8837 - 8093 - a3220581 c000.snappy.parquet
請注意,_delta_log文件夾包含三角洲湖事務日誌。有關更多信息,請參閱深入三角洲湖:開箱事務日誌。

現在,讓我們重新加載數據但這次DataFrame將由三角洲湖。

#負載在三角洲湖航班延誤數據格式delays_delta =火花\.read \格式(“δ”)\.load (“departureDelays.delta”)#創建臨時視圖delays_delta.createOrReplaceTempView (“delays_delta”)#有多少航班在西雅圖和舊金山之間spark.sql (“select count(1)從delays_delta起源=‘海’和目的地=“舊金山”),告訴()

最後,讓我們確定來自西雅圖的航班到舊金山的數量;在這個數據集,有1698個航班。

就地轉換三角洲湖

如果你有現有的鑲花表,你有能力執行就地轉換表三角洲湖因此不需要重寫你的表。轉換表,您可以運行下麵的命令。

三角洲。表s import*#轉換非分區拚花路徑“/道路/ /表”deltaTable=DeltaTable。convertToDelta(火花,“parquet. /道路/ /表”)#轉換分區的拚花路徑“/道路/ /表”分區通過整數命名“部分”partitionedDeltaTable=DeltaTable。convertToDelta(火花,“拚花。/道路/ /表”,“int”部分)

更多信息,包括如何做這個轉換在Scala和SQL,請參考轉換為三角洲湖

刪除我們的飛行數據

從傳統數據湖表刪除數據,您需要:

  1. 從你選擇的所有數據表不包括你想要刪除的行
  2. 基於前麵的查詢創建一個新表
  3. 刪除原始表
  4. 將新表重命名為下遊的原始表名稱依賴關係。

執行所有這些步驟,而是與三角洲湖,我們可以簡化這個過程通過運行一個DELETE語句。要展示這一點,讓我們刪除所有的航班提前或準時(即到達。延遲)。

/ departureDelays.delta $ ls - l - 00000 a2a19ba4 _delta_log部分- 17 - e9 - 4931 - 9 - bbf 3 - c9d4997780b c000.snappy。拚花一部分- 00000 df6f69ea e6aa - 424 b - bc0e f3674c4f1906 c000.snappy。拚花一部分- 00001 - 711 - bcce3 fe9e - 466 e - a22c - 8256 f8b54930 c000.snappy。拚花一部分- 00001 - a0423a18 - 62 eb - 46 - b3 - a82f ca9aac1f1e93 c000.snappy。拚花一部分- 00002 - 778 - ba97d - 89 - b8 - 4942 a495 - 5 - f6238830b68 c000.snappy。拚花一部分- 00002 - bfaa0a2a - 0 - a31 4 -沛富aa63 - 162402 - f802cc c000.snappy。拚花一部分- 00003 - 1 - a791c4a - 6 - f11 - 49 - a8 - 8837 - 8093 - a3220581 c000.snappy。拚花一部分- 00003 - b0247e1d f5ce - 4 - b45 - 91 - cd - 16413 - c784a66 c000.snappy.parquet

在傳統數據湖泊,刪除通過重寫整個表不包括執行值刪除。三角洲湖,刪除而不是由選擇性地寫新版本的文件被刪除,隻包含數據標誌著之前的文件刪除。這是因為三角洲湖使用多版本並發控製原子操作表:例如,當一個用戶刪除數據,另一個用戶可以查詢之前版本的表。這多版本的模型也使我們能夠穿越時間(即。時間旅行)和查詢以前的版本我們會看到。

更新我們的飛行數據

從傳統數據湖表更新數據,您需要:

  1. 從你選擇的所有數據表不包括您想修改的行
  2. 修改行需要更新/改變
  3. 合並這兩個表來創建一個新表
  4. 刪除原始表
  5. 將新表重命名為下遊的原始表名稱依賴關係。

而不是執行所有這些步驟,三角洲湖,我們可以簡化這個過程通過運行一個更新語句。顯示這個,讓我們更新所有的航班來自底特律到西雅圖。

#更新所有航班始發底特律現在,來自西雅圖deltaTable。更新("origin = 'DTW'", { "origin": "'SEA'" } )#有多少航班之間的西雅圖舊金山spark.sql (“select count(1)從delays_delta起源=‘海’和目的地=“舊金山”)。顯示()

底特律的航班現在標記為西雅圖的航班,我們現在有986個航班始發從西雅圖到舊金山。如果你列出你的文件係統departureDelays文件夾(即。美元. ./ departureDelays / ls - l),你會發現現在有11個文件(而不是8後刪除文件和四個文件在創建表)。

我們的飛行數據合並

一個常見的場景在處理數據時湖是不斷追加數據到你的桌子上。這往往導致重複數據(表你不想插入行),需要插入新行,一些需要更新的行。三角洲湖,所有這一切都可以通過使用merge操作(類似於SQL merge語句)。

讓我們從一個示例數據集,你會想要更新,插入或刪除處理以下查詢。

#什麼航班之間的舊金山這些日期spark.sql ("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").顯示()

這個查詢的輸出看起來像下表所示。注意,使用已經被添加到這個博客清楚地識別哪些行刪除處理(藍色)、更新(黃色),插入(綠色)。

接下來,讓我們來創造我們自己的merge_table包含數據,我們將插入、更新或減少重複與下麵的代碼片段。

項= [(1010710,31日,590年,“海”,“舊金山”),(1010521,10,590年,“海”,“舊金山”),(1010822,31日,590年,“海”,“舊金山”)]關口= [“日期”,“延遲”,“距離”,“起源”,“目的地”]merge_table =火花。createDataFrame(項目、峽路)merge_table.toPandas ()

在前麵的表(merge_table),有三排,獨特的日期值:

  1. 1010521:這一行需要更新航班表與一個新的延遲值(黃色)
  2. 1010710:這一行重複的(藍色)
  3. 1010822:這是一個新行插入(綠色)

三角洲湖,這可以很容易地獲得通過合並下麵的代碼片段聲明中提到。

#合並merge_table航班deltaTable.alias \(“飛行”)合並(merge_table.alias(“更新”)、“航班。日期= updates.date ") \.whenMatchedUpdate (={“延遲”:“updates.delay”}) \.whenNotMatchedInsertAll () \執行()#什麼航班之間的舊金山這些日期spark.sql ("select * from delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").顯示()

所有這三個動作的重複數據刪除、更新和插入有效地完成了一個聲明。

查看表的曆史

如前所述,在每個交易後(刪除、更新),有多個文件中創建文件係統。這是因為對於每個事務,有不同版本的三角洲湖表。這可以被使用DeltaTable.history ()方法如下。

deltaTable.history ()。顯示()+- - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - + - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +|版本|時間戳|用戶標識|用戶名|操作|operationParameters|工作|筆記本|clusterId|readVersion|isolationLevel|isBlindAppend|+- - - - - - - + - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - + - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +|2|2019年-09年-29年15:41:22|||更新|(謂語- - - - - ->(或…||||1||||1|2019年-09年-29年15:40:45|||刪除|(謂語- - - - - ->[" (…|零|零|零|0|零|假|| 0 | 2019-09-29 15:40:14零零| | | |寫[模式- > Overwrit……零| |空零零零錯誤| | | | |+ - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - + - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
請注意,您還可以執行相同的任務使用SQL:火花。sql(“描述曆史”+ pathToEventsTable +“”),告訴()

如您所見,有三個代表不同版本的表行(下麵是刪節版幫助使它易於閱讀)的每個操作(創建表,刪除和更新):

版本 時間戳 操作 operationParameters
2 2019-09-29 15:41:22 更新 (謂詞- >(或…
1 2019-09-29 15:40:45 刪除 (謂詞- > [" (…
0 2019-09-29 15:40:14 (模式- > Overwrit……

穿越時間與表的曆史

通過時間旅行,你可以看到三角洲湖審查表的版本或時間戳。有關更多信息,請參閱三角洲湖文檔>讀舊版本的數據使用時間旅行。要查看曆史數據,指定版本時間戳選擇;在下麵的代碼片段中,我們將指定版本選項

#負載DataFrames每一個版本dfv0=spark.read.format(“δ”).option (“versionAsOf”,0).load (“departureDelays.delta”)dfv1=spark.read.format(“δ”).option (“versionAsOf”,1).load (“departureDelays.delta”)dfv2=spark.read.format(“δ”).option (“versionAsOf”,2).load (“departureDelays.delta”)#計算大海舊金山的航班數量每一個版本曆史cnt0=dfv0。的地方(“起源=‘海’”)。在哪裏(“目的地”=“舊金山”)。()cnt1=dfv1。在哪裏("origin = 'SEA'").在哪裏(“目的地”=“舊金山”)。()cnt2=dfv2。在哪裏("origin = 'SEA'").在哪裏(“目的地”=“舊金山”)。()#打印價值打印(“海- > SFO計數:創建表:% s,刪除:% s,更新:% s”%(cnt0 cnt1, cnt2))
              # #輸出- - - - - ->SFO計數:創建:1698年,刪除:837年更新:986年

無論是治理、風險管理和合規(GRC)或回滾錯誤,三角洲湖表同時包含元數據(例如記錄刪除發生這一事實與這些運營商)和數據(如實際行刪除)。但是我們怎麼刪除數據文件為合規或大小原因嗎?

清理舊版本與真空表

三角洲湖真空方法將刪除所有的行(和文件)默認7天以上(參考:三角洲湖真空)。如果你查看文件係統,您將注意到的11個文件表。

/ departureDelays.delta $ ls - l_delta_log部分- 00000 - 5 - e52736b - 0 - e63 - 48 - f3 - 8 d56 - 50 - f7cfa0494d c000.snappy.parquet- 00000 - 69 -一部分eb53d5 - 34 - b4 - 408 f - a7e4 - 86 - e000428c37 c000.snappy.parquet部分- 00000 f8edaf04 - 712 e - 4 - ac4 - 8 b42 - 368 - d0bbdb95b c000.snappy.parquet部分- 00001 - 20893 -速度- 9 - d4f - 4 - c1f b619 - 3 - e6ea1fdd05f c000.snappy.parquet部分- 00001 - 9 - b68b9f6 bad3 - 434 f - 9498 - f92dc4f503e3 c000.snappy.parquet部分- 00001 d4823d2e - 8 f9d - 42 - e3 - 918 d - 4060969 - e5844 c000.snappy.parquet部分- 00002 - 24 - da7f4e 7 - e8d - 40 - d1 - b664 - 95 - bf93ffeadb c000.snappy.parquet部分- 00002 - 3027786 - c - 20 - a9 4 - b19 - 868 d - dc7586c275d4 c000.snappy.parquet部分- 00002 f2609f27 - 3478 4 - bf9 aeb7 - 2 - c78a05e6ec1 c000.snappy.parquet部分- 00003 - 850436 - a6 c4dd - 4535 a1c0 - 5 - dc0f01d3d55 c000.snappy.parquet部分- 00003 b9292122 - 99 a7 - 4223 - aaa9 - 8646 - c281f199 c000.snappy.parquet

刪除所有的文件,隻保留當前的快照數據,您將指定一個小值真空方法(而不是默認的保留7天)。

#刪除所有文件比0小時。deltaTable.vacuum (0)
注意,您執行相同的任務通過SQL語法:¸#刪除所有文件比0小時火花。sql(“真空”+ pathToEventsTable +“保留0小時”)

真空已完成後,當你回顧你會注意到更少的文件的文件係統曆史數據已被刪除。

/ departureDelays.delta $ ls - l_delta_log部分- 00000 f8edaf04 - 712 e - 4 - ac4 - 8 b42 - 368 - d0bbdb95b c000.snappy.parquet部分- 00001 - 9 - b68b9f6 bad3 - 434 f - 9498 - f92dc4f503e3 c000.snappy.parquet部分- 00002 - 24 - da7f4e 7 - e8d - 40 - d1 - b664 - 95 - bf93ffeadb c000.snappy.parquet部分- 00003 b9292122 - 99 a7 - 4223 - aaa9 - 8646 - c281f199 c000.snappy.parquet
注意,時間旅行的能力超過保存期丟失一個版本在運行真空。

接下來是什麼

嚐試嚐試今天三角洲湖的前麵的代碼片段在Apache火花2.4.3(或更大)的實例。通過使用三角洲湖,您可以使您的數據湖泊更可靠(不管你創建一個新的或現有的數據遷移湖)。要了解更多,請參考https://delta.io/並加入三角洲湖社區通過鬆弛穀歌集團。你可以跟蹤所有的即將推出,計劃特性github的裏程碑

到來,我們也很興奮火花AI歐洲峰會10月15日至17日。在峰會上,我們會有一個訓練專用的三角洲湖

學分

我們想要感謝下列貢獻者更新,doc變化,在三角洲湖0.4.0和貢獻:Andreas Neumann Burak•,何塞•托雷斯Jules Damji Jungtaek Lim梨紋太陽,邁克爾•Armbrust成員Mukul沒吃,Pranav Anand,拉胡爾從詼諧,Shixiong朱,如來佛Das,特裏•金Wenchen粉絲,韋斯利·霍夫曼Yishuang,育才Yu, lys0716。

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map