自動進化你的嵌套列模式,流從三角洲表版本,檢查你的約束
我們最近宣布的釋放三角洲湖0.8.0在合並引入了模式演化和性能改進和操作指標表的曆史。這個版本的主要特點是:
- 無限的匹配和不匹配的合並操作的條款在Scala中,Java、Python。合並操作現在支持任意數量的whenMatched whenNotMatched條款。此外,合並查詢無條件刪除匹配的行不再扔在多個匹配錯誤。這將是使用SQL和火花3.1支持。看到文檔獲取詳細信息。
- 合並操作現在支持模式演化的嵌套列。模式演化的嵌套列現在的頂級列具有相同的語義。例如,可以自動添加到新的嵌套列StructType列。看到自動模式演化的合並獲取詳細信息。
- 現在並入和更新操作解決嵌套的結構體列的名字。更新操作更新和合並成現在命令解決嵌套的結構體列的名字,這意味著當比較或分配列StructType類型,嵌套列的順序並不重要(完全一樣的頂級列)。回複解決的位置,設置以下火花配置錯誤:
spark.databricks.delta.resolveMergeUpdateStructsByName.enabled
。 - δ表檢查約束。δ現在支持檢查約束。時提供,δ自動驗證數據添加到表中滿足指定的約束表達式。添加檢查約束,使用ALTER TABLE命令添加約束。看到文檔獲取詳細信息。
- 從一個特定的版本(開始流表# 474)。當使用三角洲作為流媒體來源時,您可以使用選項startingTimestamp或startingVersion開始處理表從一個給定的版本和開始。您還可以設置startingVersion最新跳過表中現有的數據,從新的輸入數據流。看到文檔獲取詳細信息。
- 與真空能力執行並行刪除(# 395)。當使用“真空”,您可以設置會話配置spark.databricks.delta.vacuum.parallelDelete.enabled真正為了使用火花並行執行刪除文件(基於洗牌分區的數量)。看到文檔獲取詳細信息。
- 值得一提的是使用Scala簡化api讀寫。你可以進口
io.delta.implicits。
使用“δ”等引發讀寫api方法spark.read.delta(“/我的/表/路徑”)
。看到文檔獲取詳細信息。
此外,我們還強調,現在可以讀一個三角洲表不用通過火花三角洲地區獨立的讀者和三角洲鏽API。看到使用三角洲地區獨立的讀者和δ鏽API查詢你的三角洲湖沒有Apache火花™來了解更多信息。
得到的早期預覽O ' reilly的新電子書一步一步的指導你需要開始使用三角洲湖。
自動進化你的嵌套列模式
在以前的db2版本一樣,三角洲湖包括的能力:
- 執行合並操作,
- 簡化你的插入/更新/刪除操作在一個單一的原子操作,
- 實施和發展模式(也可以發現更多細節該技術討論),
- 發展模式在合並操作。
與三角洲湖0.8.0,可以自動進化嵌套列在你的δ表更新和合並操作。
讓我們展示通過使用一個簡單的咖啡咖啡的例子。我們將創建我們的第一個三角洲表使用下麵的代碼片段。
# espresso1 JSON字符串json_espresso1 = […]#創建抽樣espresso1_rdd = sc.parallelize (json_espresso1)#從抽樣讀取JSONespresso1 = spark.read.json (espresso1_rdd)#寫差值表espresso1.write。格式(“δ”).save (espresso_table_path)
下麵的代碼片段創建espresso_updates DataFrame:
#創建DataFrame從JSON字符串json_espresso2 = […]espresso2_rdd = sc.parallelize (json_espresso2)espresso2 = spark.read.json (espresso2_rdd)espresso2.createOrReplaceTempView (“espresso_updates”)
觀察到espresso_updates DataFrame coffee_profile不同列,其中包括一個新的flavor_notes列。
#咖啡δ表“coffee_profile”模式|——coffee_profile:結構(可空=真正的)| |——臨時:雙(可空=真正的)| |——重量:雙(可空=真正的)
# espresso_updates DataFrame coffee_profile”模式|——coffee_profile:結構(可空=真正的)| |——flavor_notes:數組(可空=真正的)| | |——元素:字符串(containsNull =真正的)| |——臨時:雙(可空=真正的)| |——重量:雙(可空=真正的)
運行這兩個表之間的合並操作,運行以下SQL代碼片段:火花
合並成濃縮咖啡作為t使用espresso_updates u在u.espresso_id=t.espresso_id當匹配然後更新集*當不匹配然後插入*
默認情況下,這段代碼將有以下錯誤,因為coffee_profile列濃咖啡和espresso_updates之間是不同的。
錯誤在SQL聲明:AnalysisException:不能把結構體,臨時:雙重量:雙>來結構體。所有嵌套列必須匹配。
自動合並的救援
為了解決這個問題,使合並使用以下代碼片段;咖啡三角洲表將自動合並兩個表與不同模式包括嵌套列。
——啟用自動模式演化集spark.databricks.delta.schema.autoMerge.enabled=真正的;
在一個單一的原子操作,合並執行以下:
更新:espresso_id = 100
已經更新的新的嗎flavor_notes
從espresso_changes
DataFrame。espresso_id = (101、102)
沒有變化的數據。插入:espresso_id = 103
插入一個新行,從espresso_changes
DataFrame。
值得一提的是簡化讀寫與Scala api
你可以進口io.delta.implicits。
使用δ
等火花讀寫api方法spark.read.delta(“/我的/表/路徑”)
。看到文檔獲取詳細信息。
/ /傳統上,閱讀三角洲表使用Scala,您將執行以下火花.read.format (“δ”).load (“/ tmp /咖啡/”),告訴()
/ /與Scala對內隱格式有點簡單進口io.delta.implicits。火花.read.delta (“/ tmp /咖啡/”),告訴()
檢查約束
你現在可以檢查約束添加到表,這不僅檢查現有數據,但也執行未來數據的修改。例如,以確保espresso_id > = 100
運行這個SQL語句:
——確保espresso_id > = 100——這個約束將檢查和執行未來的修改數據表改變表濃縮咖啡添加約束idCheck檢查(espresso_id> =One hundred.);
——刪除約束的表如果你不需要它改變表濃縮咖啡下降約束idCheck;
以下約束將會失敗,“milk-based_espresso”列有真假值。
- - - - - -檢查如果列有隻有真正的值;注意,這個約束將會失敗。改變表濃縮咖啡添加約束milkBaseCheck檢查('牛奶- - - - - -based_espresso”在(真正的));
——錯誤輸出錯誤在SQL聲明:AnalysisException:1行在profitecpro.espresso違反新檢查約束('牛奶- - - - - -based_espresso”在(真正的))
添加或刪除檢查約束也會出現在事務日誌(通過描述曆史濃縮咖啡)你的三角洲表operationalParameters闡明約束條件。
開始流表從一個特定的版本
當使用三角洲作為流媒體來源時,您可以使用的選項startingTimestamp
或startingVersion
開始處理表從一個給定的版本和開始。你也可以設置startingVersion
來最新的
跳過表中現有的數據,從新的輸入數據流。看到文檔獲取詳細信息。
在筆記本,我們將生成一個人工流:
#生成人工流stream_data = spark.readStream。格式(“速度”).option (“rowsPerSecond”,500年).load ()
然後生成一個新的增量表使用這個代碼片段:
流=stream_data \.withColumn(“第二”,第二個(坳(“時間戳”)))\.writeStream \.format \(“δ”).option (“checkpointLocation”、“…”) \。觸發(processingTime=2秒)\。開始(“/δ/ iterator_table”)
中的代碼筆記本將運行大約20秒的流創建以下迭代器下麵的表與事務日誌的曆史。在這種情況下,這個表有10個交易。
——回顧曆史表的路徑描述曆史delta.”/δ/iterator_table/”
——通過表名或回顧曆史描述iterator_table曆史;
評審輸出迭代器
迭代器表有10個事務的持續時間大約20秒。查看這些數據在一個時間,我們將運行下一個SQL語句,計算每個插入的時間戳迭代器表的第二個(ts
)。注意的價值t = 0
是最低的時間戳,e想鬥持續時間(ts
通過一組)通過運行以下:
選擇ts,數(1)作為問從(選擇價值,(第二個- - - - - -min_second)作為ts從(選擇*從iterator_table交叉加入(選擇最小值(第二個)作為min_second從iterator_table) xy)z)集團通過ts訂單通過ts
前麵的聲明產生這個條形圖和時間桶(ts
count()行問
)。
注意到20秒的流寫十個不同的執行事務,有19 time-buckets截然不同。
開始三角洲流從一個特定的版本
使用.option (“startingVersion”,“6”)
,我們可以指定表的哪個版本我們會想開始我們的readStream(包容)。
#開始使用startingVersion readStreamreiterator = spark.readStream。格式(“δ”).option (“startingVersion”,“6”).load (“/δ/ iterator_table /”)
#創建一個臨時視圖流reiterator.createOrReplaceTempView (“reiterator”)
下麵的圖是由重新運行前對新SQL查詢reiterator表。
注意reiterator表,有10個不同的time-buckets,後來我們從事務表的版本。
開始使用δ0.8.0湖
嚐試三角洲湖與前麵的代碼片段在Apache火花3.1(或更高版本)實例(磚,試試這個DBR 8.0 +)。湖泊三角洲湖使您的數據更可靠,無論你創建一個新的或現有的數據遷移。要了解更多,請參考https://delta.io/,並加入三角洲湖社區通過鬆弛和穀歌集團。你可以跟蹤所有的即將推出,計劃特性GitHub的裏程碑並嚐試管理三角洲湖在磚免費帳戶。
學分
我們想要感謝下列貢獻者更新,doc變化,和貢獻在三角洲湖0.8.0:亞當Binford,艾倫·金,亞曆克斯·劉,阿裏•Afroozeh安德魯•福格蒂Burak•,大衛·劉易斯Gengliang Wang HyukjinKwon,拉斯科夫斯基Jacek,何塞•托雷斯Kian Ghodoussi,一貫Liu梨紋太陽,馬哈茂德•馬赫迪瑪麗安天雪,邁克爾•時常要邁克·迪亞斯Pranav Anand,拉胡爾從詼諧,斯科特•Sandre Shixiong朱,斯蒂芬妮Bodoff,如來佛Das, Wenchen粉絲,韋斯利·霍夫曼,小李,Yijia崔,任命,紮克時間,contrun ekoifman,義烏。