插入表使用合並成三角洲湖
您可以插入數據從源表,視圖,或者DataFrame目標三角洲表使用合並
SQL操作。三角洲湖支持插入、更新和刪除合並
,它支持擴展語法之外的SQL標準促進先進的用例。
假設您有一個源表命名people10mupdates
或一個源路徑/ tmp /δ/ people-10m-updates
,包含一個目標表命名的新數據people10m
或一個目標路徑/ tmp /δ/ people-10m
。其中一些新記錄可能已經出現在目標數據。合並的新數據,你想更新人的行id
已經存在並插入新行,不匹配id
是禮物。您可以運行下麵的查詢:
合並成people10m使用people10mupdates在people10m。id=people10mupdates。id當匹配然後更新集id=people10mupdates。id,firstName=people10mupdates。firstName,middleName=people10mupdates。middleName,姓=people10mupdates。姓,性別=people10mupdates。性別,生日=people10mupdates。生日,ssn=people10mupdates。ssn,工資=people10mupdates。工資當不匹配然後插入(id,firstName,middleName,姓,性別,生日,ssn,工資)值(people10mupdates。id,people10mupdates。firstName,people10mupdates。middleName,people10mupdates。姓,people10mupdates。性別,people10mupdates。生日,people10mupdates。ssn,people10mupdates。工資)
從delta.tables進口*deltaTablePeople=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)deltaTablePeopleUpdates=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m-updates”)dfUpdates=deltaTablePeopleUpdates。toDF()deltaTablePeople。別名(“人”)\。合並(dfUpdates。別名(“更新”),”的人。id=更新年代。id')\。whenMatchedUpdate(集={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\。whenNotMatchedInsert(值={“id”:“updates.id”,“firstName”:“updates.firstName”,“middleName”:“updates.middleName”,“姓”:“updates.lastName”,“性別”:“updates.gender”,“生日”:“updates.birthDate”,“ssn”:“updates.ssn”,“工資”:“updates.salary”})\。執行()
進口io。δ。表。_進口org。apache。火花。sql。功能。_瓦爾deltaTablePeople=DeltaTable。forPath(火花,“/ tmp /δ/ people-10m”)瓦爾deltaTablePeopleUpdates=DeltaTable。forPath(火花,“tmp /δ/ people-10m-updates”)瓦爾dfUpdates=deltaTablePeopleUpdates。toDF()deltaTablePeople。作為(“人”)。合並(dfUpdates。作為(“更新”),”的人。id=更新年代。id")。whenMatched。updateExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))。whenNotMatched。insertExpr(地圖(“id”- >“updates.id”,“firstName”- >“updates.firstName”,“middleName”- >“updates.middleName”,“姓”- >“updates.lastName”,“性別”- >“updates.gender”,“生日”- >“updates.birthDate”,“ssn”- >“updates.ssn”,“工資”- >“updates.salary”))。執行()
看到三角洲湖API文檔Scala和Python語法細節。SQL語法細節,請參閱合並成
使用合並修改所有無與倫比的行
在磚SQL和磚的運行時12.1及以上,可以使用當不匹配通過源
條款更新
orgydF4y2Ba刪除
記錄目標表中沒有對應的源表中的記錄。磚建議添加一個可選的條件條款,以避免完全重寫目標表。
下麵的代碼示例顯示了基本語法使用這個刪除,覆蓋目標表與源表的內容和刪除目標表中的無與倫比的記錄。更可伸縮模式表,更新和刪除源是有時限的,看到的增量同步三角洲與源表。
(targetDF。合並(sourceDF,”源。關鍵=目標。關鍵")。whenMatchedUpdateAll()。whenNotMatchedInsertAll()。whenNotMatchedBySourceDelete()。執行())
targetDF。合並(sourceDF,”源。關鍵=目標。關鍵")。whenMatched()。updateAll()。whenNotMatched()。insertAll()。whenNotMatchedBySource()。刪除()。執行()
合並成目標使用源在源。關鍵=目標。關鍵當匹配然後更新集*當不匹配然後插入*當不匹配通過源然後刪除
下麵的例子將條件添加到當不匹配通過源
條款和指定值更新在無與倫比的目標行。
(targetDF。合並(sourceDF,”源。關鍵=目標。關鍵")。whenMatchedUpdate(集={“target.lastSeen”:“source.timestamp”})。whenNotMatchedInsert(值={“target.key”:“source.key”,“target.lastSeen”:“source.timestamp”,“target.status”:“活躍”})。whenNotMatchedBySourceUpdate(條件=”目標。lastSeen> =(當前日期()- - - - - -時間間隔“5”一天)",集={“target.status”:“不活躍”})。執行())
targetDF。合並(sourceDF,”源。關鍵=目標。關鍵")。whenMatched()。updateExpr(地圖(“target.lastSeen”- >“source.timestamp”))。whenNotMatched()。insertExpr(地圖(“target.key”- >“source.key”,“target.lastSeen”- >“source.timestamp”,“target.status”- >“活躍”,))。whenNotMatchedBySource(”目標。lastSeen> =(當前日期()- - - - - -時間間隔“5”一天)")。updateExpr(地圖(“target.status”- >“不活躍”))。執行()
合並成目標使用源在源。關鍵=目標。關鍵當匹配然後更新集目標。lastSeen=源。時間戳當不匹配然後插入(關鍵,lastSeen,狀態)值(源。關鍵,源。時間戳,“活躍”)當不匹配通過源和目標。lastSeen> =(當前日期()- - - - - -時間間隔“5”一天)然後更新集目標。狀態=“不活躍”
合並操作語義
下麵是詳細的描述合並
編程操作語義。
可以有任意數量的
whenMatched
和whenNotMatched
條款。whenMatched
條款執行時源行匹配目標表行根據匹配條件。這些條款有以下語義。whenMatched
條款最多隻能有一個更新
和一個刪除
行動。的更新
行動合並
僅更新(類似於指定的列更新
操作)匹配的目標行。的刪除
刪除匹配的行。每一個
whenMatched
條款可以有一個可選的條件。如果這一條款條件存在,更新
orgydF4y2Ba刪除
行動執行任何匹配的源-目標行隻有當條款條件為真。如果有多個
whenMatched
條款,然後他們評估的順序指定。所有whenMatched
條款,除了最後一個,必須有條件。如果沒有一個
whenMatched
條件評估為true的源和目標匹配的行對合並條件,那麼目標行左不變。更新目標三角洲表的所有列的相應列源數據集,使用
whenMatched (…) .updateAll ()
。這相當於:whenMatched(…)。updateExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))
對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。
請注意
這種行為變化時自動啟用模式遷移。看到自動模式演化獲取詳細信息。
whenNotMatched
條款執行時源行不匹配任何行基於目標匹配條件。這些條款有以下語義。whenNotMatched
條款可以隻有插入
行動。生成新行根據指定的列和相應的表達式。你不需要指定目標表中的所有列。對於未指定的目標列,零
被插入。每一個
whenNotMatched
條款可以有一個可選的條件。如果條款條件存在,源行插入隻有當條件為真行。否則,源列將被忽略。如果有多個
whenNotMatched
條款,然後他們評估的順序指定。所有whenNotMatched
條款,除了最後一個,必須有條件。插入目標三角洲表的所有列的相應列源數據集,使用
whenNotMatched (…) .insertAll ()
。這相當於:whenNotMatched(…)。insertExpr(地圖(“col1”- >“source.col1”,“col2”- >“source.col2”,…))
對所有目標三角洲表的列。因此,這一行動假定源表具有相同的列的目標表,否則查詢拋出一個分析錯誤。
請注意
這種行為變化時自動啟用模式遷移。看到自動模式演化獲取詳細信息。
whenNotMatchedBySource
條款執行當目標行不匹配任何源行基於合並條件。這些條款有以下語義。whenNotMatchedBySource
條款可以指定刪除
和更新
行動。每一個
whenNotMatchedBySource
條款可以有一個可選的條件。如果條款條件存在,目標行修改隻對這一行,如果條件為真。否則,目標行是不變的。如果有多個
whenNotMatchedBySource
條款,然後他們評估的順序指定。所有whenNotMatchedBySource
條款,除了最後一個,必須有條件。根據定義,
whenNotMatchedBySource
條款沒有將列值從一個源行,所以不能引用源列。對於每一列要修改,您可以指定一個文字或目標列上執行操作,如集target.deleted_count=target.deleted_count+1
。
重要的
一個
合並
源數據集的操作就會失敗,如果多行匹配和合並嚐試更新相同的目標三角洲表行。根據SQL合並的語義,等更新操作是模棱兩可的尚不清楚應該使用哪個源行更新匹配的目標行。源表可以進行預處理來消除多個匹配的可能性。看到變化數據捕獲的例子——顯示了如何預處理改變數據集(即源數據集)隻保留最新的改變對於每個關鍵申請前三角洲到目標表。你可以申請一個SQL
合並
操作在一個SQL視圖隻有視圖已被定義為創建視圖viewName作為選擇*從deltaTable
。
重複數據刪除在編寫到三角洲表中
常見的ETL的用例是收集日誌到三角洲表通過添加一個表。然而,通常來源可以生成重複的日誌記錄和下遊重複數據刪除步驟需要照顧他們。與合並
,你可以避免插入重複的記錄。
合並成日誌使用newDedupedLogs在日誌。uniqueId=newDedupedLogs。uniqueId當不匹配然後插入*
deltaTable。別名(“日誌”)。合並(newDedupedLogs。別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)\。whenNotMatchedInsertAll()\。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)。whenNotMatched()。insertAll()。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs.uniqueId”)。whenNotMatched()。insertAll()。執行();
請注意
包含新日誌的數據集需要刪除處理內部本身。通過SQL合並的語義,它匹配和刪除處理新數據與現有的數據表中,但是如果有新的數據集內重複數據,插入。因此,在合並之前刪除處理新的數據表。
如果你知道你可能會重複的記錄隻有幾天,您可以優化您的查詢進一步通過分區表的日期,然後指定目標表的日期範圍相匹配。
合並成日誌使用newDedupedLogs在日誌。uniqueId=newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天當不匹配和newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天然後插入*
deltaTable。別名(“日誌”)。合並(newDedupedLogs。別名(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")\。whenNotMatchedInsertAll(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")\。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")。whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")。insertAll()。執行()
deltaTable。作為(“日誌”)。合並(newDedupedLogs。作為(“newDedupedLogs”),“日誌。uniqueId = newDedupedLogs。uniqueId和日誌。日期>當前日期()- - - - - -時間間隔7天")。whenNotMatched(“newDedupedLogs。日期>當前日期()- - - - - -時間間隔7天")。insertAll()。執行();
這是更有效的比前麵的命令作為副本看起來隻有在過去的7天的日誌,而不是整個表。此外,您可以使用此純插入合並結構化流執行連續重複數據刪除的日誌。
在流媒體查詢,您可以使用合並操作
foreachBatch
不斷編寫任何流數據與重複數據刪除三角洲表。看到下麵的流的例子的更多信息foreachBatch
。在另一個流媒體查詢,你可以不斷地從δ讀取刪除處理數據表。這是有可能的,因為一個純插入合並隻有δ表添加新數據。
慢慢地改變數據(SCD) 2型操作到三角洲表中
三角洲住表原生支持跟蹤和應用SCD 2型。看到改變數據獲取與三角洲生活表。
更改數據寫入一個增量表
類似的化合物,另一個常見的用例,通常被稱為變化數據捕獲(CDC),是適用於所有數據更改來自外部數據庫到三角洲表。換句話說,一組更新,刪除,插入應用到一個外部表需要應用於三角洲表。您可以使用合並
如下。
瓦爾deltaTable:DeltaTable=…/ / DeltaTable模式(關鍵字,值)/ / DataFrame變化後列/ /——關鍵:關鍵的變化/ / -時間:時間之間的變化順序變化(可以取代其他順序id)/ / - newValue:更新或插入的值如果鍵不刪除/ /刪除:真如果鍵被刪除,假如果鍵插入或更新瓦爾changesDF:DataFrame=…/ /找到最新的改變基於時間戳的每個鍵/ /注意:嵌套結構,馬克斯在結構計算/ / max第一結構體字段,如果回到第二個字段,等等。瓦爾latestChangeForEachKey=changesDF。selectExpr(“關鍵”,“結構(時間、newValue刪除)otherCols”)。groupBy(“關鍵”)。gg(馬克斯(“otherCols”)。作為(“最新”))。selectExpr(“關鍵”,“最新*”。)deltaTable。作為(“t”)。合並(latestChangeForEachKey。作為(“s”),”年代。關鍵= t.key”)。whenMatched(“s.deleted = true”)。刪除()。whenMatched()。updateExpr(地圖(“關鍵”- >“s.key”,“價值”- >“s.newValue”))。whenNotMatched(“s.deleted = false”)。insertExpr(地圖(“關鍵”- >“s.key”,“價值”- >“s.newValue”))。執行()
deltaTable=…# DeltaTable模式(關鍵字,值)# DataFrame變化後列#鍵:關鍵的變化#——時間:時間之間的變化順序變化(可以取代其他順序id)# - newValue:更新或插入的值如果鍵不刪除# -刪除:真如果鍵被刪除,假如果鍵插入或更新changesDF=火花。表(“改變”)#找到最新的改變基於時間戳的每個鍵#注意:嵌套結構,馬克斯在結構計算#馬克斯第一結構體字段,如果回到第二個字段,等等。latestChangeForEachKey=changesDF\。selectExpr(“關鍵”,“結構(時間、newValue刪除)otherCols”)\。groupBy(“關鍵”)\。gg(馬克斯(“otherCols”)。別名(“最新”))\。選擇(“關鍵”,“最新*”。)\deltaTable。別名(“t”)。合並(latestChangeForEachKey。別名(“s”),”年代。關鍵= t.key”)\。whenMatchedDelete(條件=“s.deleted = true”)\。whenMatchedUpdate(集={“關鍵”:“s.key”,“價值”:“s.newValue”})\。whenNotMatchedInsert(條件=“s.deleted = false”,值={“關鍵”:“s.key”,“價值”:“s.newValue”})。執行()
增量同步三角洲與源表
在磚SQL和磚的運行時12.1及以上,您可以使用當不匹配通過源
創建任意條件自動刪除和替換表的一部分。這是特別有用,當你有一個源表記錄可能會改變或被刪除後幾天初始數據輸入,但最終解決最終狀態。
下麵的查詢顯示了使用這種模式從源選擇5天的記錄,更新目標匹配記錄,插入新記錄從源到目標,並刪除所有無與倫比的記錄從過去5天的目標。
合並成目標作為t使用(選擇*從源在哪裏created_at> =(當前日期()- - - - - -時間間隔“5”一天))作為年代在t。關鍵=年代。關鍵當匹配然後更新集*當不匹配然後插入*當不匹配通過源和created_at> =(當前日期()- - - - - -時間間隔“5”一天)然後刪除
通過提供相同的布爾濾源表和目標表,你可以動態地傳播變化從源到目標表,包括刪除。
請注意
雖然這種模式可以使用沒有任何條件條款,這將導致完全重寫目標表可以是昂貴的。