跳轉到主要內容
工程的博客

自動進化你的嵌套列模式,流從三角洲表版本,檢查你的約束

三角洲湖0.8.0使得合並大版本
分享這篇文章

我們最近宣布的釋放三角洲湖0.8.0在合並引入了模式演化和性能改進和操作指標表的曆史。這個版本的主要特點是:

  • 無限的匹配和不匹配的合並操作的條款在Scala中,Java、Python。合並操作現在支持任意數量的whenMatched whenNotMatched條款。此外,合並查詢無條件刪除匹配的行不再扔在多個匹配錯誤。這將是使用SQL和火花3.1支持。看到文檔獲取詳細信息。
  • 合並操作現在支持模式演化的嵌套列。模式演化的嵌套列現在的頂級列具有相同的語義。例如,可以自動添加到新的嵌套列StructType列。看到自動模式演化的合並獲取詳細信息。
  • 現在並入和更新操作解決嵌套的結構體列的名字。更新操作更新和合並成現在命令解決嵌套的結構體列的名字,這意味著當比較或分配列StructType類型,嵌套列的順序並不重要(完全一樣的頂級列)。回複解決的位置,設置以下火花配置錯誤:spark.databricks.delta.resolveMergeUpdateStructsByName.enabled
  • δ表檢查約束。δ現在支持檢查約束。時提供,δ自動驗證數據添加到表中滿足指定的約束表達式。添加檢查約束,使用ALTER TABLE命令添加約束。看到文檔獲取詳細信息。
  • 從一個特定的版本(開始流表# 474)。當使用三角洲作為流媒體來源時,您可以使用選項startingTimestamp或startingVersion開始處理表從一個給定的版本和開始。您還可以設置startingVersion最新跳過表中現有的數據,從新的輸入數據流。看到文檔獲取詳細信息。
  • 與真空能力執行並行刪除(# 395)。當使用“真空”,您可以設置會話配置spark.databricks.delta.vacuum.parallelDelete.enabled真正為了使用火花並行執行刪除文件(基於洗牌分區的數量)。看到文檔獲取詳細信息。
  • 值得一提的是使用Scala簡化api讀寫。你可以進口io.delta.implicits。使用“δ”等引發讀寫api方法spark.read.delta(“/我的/表/路徑”)。看到文檔獲取詳細信息。

此外,我們還強調,現在可以讀一個三角洲表不用通過火花三角洲地區獨立的讀者三角洲鏽API。看到使用三角洲地區獨立的讀者和δ鏽API查詢你的三角洲湖沒有Apache火花™來了解更多信息。

得到的早期預覽O ' reilly的新電子書一步一步的指導你需要開始使用三角洲湖。

自動進化你的嵌套列模式

在以前的db2版本一樣,三角洲湖包括的能力:

與三角洲湖0.8.0,可以自動進化嵌套列在你的δ表更新合並操作。

讓我們展示通過使用一個簡單的咖啡咖啡的例子。我們將創建我們的第一個三角洲表使用下麵的代碼片段。

# espresso1 JSON字符串json_espresso1 = […]#創建抽樣espresso1_rdd = sc.parallelize (json_espresso1)#從抽樣讀取JSONespresso1 = spark.read.json (espresso1_rdd)#寫差值表espresso1.write。格式(“δ”).save (espresso_table_path)

咖啡桌的下麵是一個視圖:
DataFrame表δ8.0.0湖

下麵的代碼片段創建espresso_updates DataFrame:

#創建DataFrameJSON字符串json_espresso2 = […]espresso2_rdd = sc.parallelize (json_espresso2)espresso2 = spark.read.json (espresso2_rdd)espresso2.createOrReplaceTempView (“espresso_updates”)

用這個表格視圖:
DataFrame表δ0.8.0湖

觀察到espresso_updates DataFrame coffee_profile不同列,其中包括一個新的flavor_notes列。

#咖啡δ表“coffee_profile”模式|——coffee_profile:結構(可空=真正的)| |——臨時:(可空=真正的)| |——重量:(可空=真正的)
# espresso_updates DataFrame coffee_profile”模式|——coffee_profile:結構(可空=真正的)| |——flavor_notes:數組(可空=真正的)| | |——元素:字符串(containsNull =真正的)| |——臨時:(可空=真正的)| |——重量:(可空=真正的)

運行這兩個表之間的合並操作,運行以下SQL代碼片段:火花

合並濃縮咖啡作為t使用espresso_updates uu.espresso_id=t.espresso_id匹配然後更新*匹配然後插入*

默認情況下,這段代碼將有以下錯誤,因為coffee_profile列濃咖啡和espresso_updates之間是不同的。

錯誤SQL聲明:AnalysisException:不能把結構體,臨時:重量:>結構體。所有嵌套列必須匹配。

自動合並的救援

為了解決這個問題,使合並使用以下代碼片段;咖啡三角洲表將自動合並兩個表與不同模式包括嵌套列。

——啟用自動模式演化spark.databricks.delta.schema.autoMerge.enabled=真正的;

在一個單一的原子操作,合並執行以下:

  • 更新:espresso_id = 100已經更新的新的嗎flavor_notesespresso_changesDataFrame。
  • espresso_id = (101、102)沒有變化的數據。
  • 插入:espresso_id = 103插入一個新行,從espresso_changesDataFrame。
表格視圖顯示coffee_profile嵌套列的列。
表格視圖顯示coffee_profile嵌套列的列。

值得一提的是簡化讀寫與Scala api

你可以進口io.delta.implicits。使用δ等火花讀寫api方法spark.read.delta(“/我的/表/路徑”)。看到文檔獲取詳細信息。

/ /傳統上,閱讀三角洲表使用Scala,您將執行以下火花.read.format (“δ”).load (“/ tmp /咖啡/”),告訴()
/ /與Scala對內隱格式有點簡單進口io.delta.implicits。火花.read.delta (“/ tmp /咖啡/”),告訴()

檢查約束

你現在可以檢查約束添加到表,這不僅檢查現有數據,但也執行未來數據的修改。例如,以確保espresso_id > = 100運行這個SQL語句:

——確保espresso_id > = 100——這個約束將檢查和執行未來的修改數據表改變濃縮咖啡添加約束idCheck檢查(espresso_id> =One hundred.);
——刪除約束的表如果你不需要它改變濃縮咖啡下降約束idCheck;

以下約束將會失敗,“milk-based_espresso”列有真假值。

- - - - - -檢查如果隻有真正的;注意,這個約束將會失敗。改變濃縮咖啡添加約束milkBaseCheck檢查('牛奶- - - - - -based_espresso”(真正的));
——錯誤輸出錯誤SQL聲明:AnalysisException:1profitecpro.espresso違反檢查約束('牛奶- - - - - -based_espresso”(真正的))

添加或刪除檢查約束也會出現在事務日誌(通過描述曆史濃縮咖啡)你的三角洲表operationalParameters闡明約束條件。

表格視圖顯示約束操作在事務日誌的曆史
表格視圖顯示約束操作在事務日誌的曆史

開始流表從一個特定的版本

當使用三角洲作為流媒體來源時,您可以使用的選項startingTimestampstartingVersion開始處理表從一個給定的版本和開始。你也可以設置startingVersion最新的跳過表中現有的數據,從新的輸入數據流。看到文檔獲取詳細信息。

筆記本,我們將生成一個人工流:

#生成人工流stream_data = spark.readStream。格式(“速度”).option (“rowsPerSecond”,500年).load ()

然後生成一個新的增量表使用這個代碼片段:

=stream_data \.withColumn(“第二”,第二個(坳(“時間戳”)))\.writeStream \.format \(“δ”).option (“checkpointLocation”、“…”) \觸發(processingTime=2秒)\開始(“/δ/ iterator_table”)

中的代碼筆記本將運行大約20秒的流創建以下迭代器下麵的表與事務日誌的曆史。在這種情況下,這個表有10個交易。

——回顧曆史表的路徑描述曆史delta.”/δ/iterator_table/
——通過表名或回顧曆史描述iterator_table曆史;

表格視圖顯示迭代器表事務日誌的曆史

評審輸出迭代器

迭代器表有10個事務的持續時間大約20秒。查看這些數據在一個時間,我們將運行下一個SQL語句,計算每個插入的時間戳迭代器表的第二個(ts)。注意的價值t = 0是最低的時間戳,e想鬥持續時間(ts通過一組)通過運行以下:

選擇ts,(1)作為(選擇價值,(第二個- - - - - -min_second)作為ts(選擇*iterator_table交叉加入(選擇最小值(第二個)作為min_seconditerator_table) xy)z)集團通過ts訂單通過ts

前麵的聲明產生這個條形圖和時間桶(tscount()行)。

注意到20秒鍾流寫十個不同的執行事務,有19 time-buckets截然不同。

注意到20秒的流寫十個不同的執行事務,有19 time-buckets截然不同。

開始三角洲流從一個特定的版本

使用.option (“startingVersion”,“6”),我們可以指定表的哪個版本我們會想開始我們的readStream(包容)。

#開始使用startingVersion readStreamreiterator = spark.readStream。格式(“δ”).option (“startingVersion”,“6”).load (“/δ/ iterator_table /”)
#創建一個臨時視圖流reiterator.createOrReplaceTempView (“reiterator”)

下麵的圖是由重新運行前對新SQL查詢reiterator表。

注意reiterator表,現在有10個不同的time-buckets當我們從後麵的事務表的版本。

注意reiterator表,有10個不同的time-buckets,後來我們從事務表的版本。

開始使用δ0.8.0湖

嚐試三角洲湖與前麵的代碼片段在Apache火花3.1(或更高版本)實例(磚,試試這個DBR 8.0 +)。湖泊三角洲湖使您的數據更可靠,無論你創建一個新的或現有的數據遷移。要了解更多,請參考https://delta.io/,並加入三角洲湖社區通過鬆弛穀歌集團。你可以跟蹤所有的即將推出,計劃特性GitHub的裏程碑並嚐試管理三角洲湖在磚免費帳戶

學分

我們想要感謝下列貢獻者更新,doc變化,和貢獻在三角洲湖0.8.0:亞當Binford,艾倫·金,亞曆克斯·劉,阿裏•Afroozeh安德魯•福格蒂Burak•,大衛·劉易斯Gengliang Wang HyukjinKwon,拉斯科夫斯基Jacek,何塞•托雷斯Kian Ghodoussi,一貫Liu梨紋太陽,馬哈茂德•馬赫迪瑪麗安天雪,邁克爾•時常要邁克·迪亞斯Pranav Anand,拉胡爾從詼諧,斯科特•Sandre Shixiong朱,斯蒂芬妮Bodoff,如來佛Das, Wenchen粉絲,韋斯利·霍夫曼,小李,Yijia崔,任命,紮克時間,contrun ekoifman,義烏。

免費試著磚

相關的帖子

看到所有工程的博客的帖子
Baidu
map