簡化變化數據獲取與數據磚三角洲
得到的早期預覽O ' reilly的新電子書一步一步的指導你需要開始使用三角洲湖
注意:我們也推薦你閱讀高效的插入到數據與磚湖泊三角洲這解釋了使用MERGE命令做高效的插入和刪除。
一個常見的用例,我們遇到在磚是顧客執行變化數據捕獲(CDC)從一個或多個來源為一組磚三角洲的表。beplay体育app下载地址這些來源可能是本地或在雲中,操作性事務存儲,或者數據倉庫。普通膠水,將他們都是變更集生成:
- 使用ETL工具像Oracle GoldenGate或Informatica PowerExchange,
- 從供應商提供的變化表(例如,甲骨文變化數據捕獲),或
- 用戶維護數據庫表,捕獲變更集使用插入/更新/刪除觸發器
和他們希望合並這些變更集成磚三角洲。基於我們的經驗執行這個用例在公共和私人部門的客戶,我們提供一個參考體係結構來執行中心使用特性今天在磚三角洲。beplay体育app下载地址
背景
變化數據捕獲或疾病預防控製中心,簡言之,是指捕捉變化的過程一組數據源和合並在一組目標表,通常在一個數據倉庫。這些通常是刷新每晚每小時,或者在某些情況下,設置(例如,每15分鍾)。我們稱這一時期為刷新周期。
給定表的組改變記錄在一個刷新周期稱為一個變更集。最後,我們將記錄在一個變更集的集合作為一個記錄集具有相同的主鍵。直覺這是指不同的變化最終表中相同的記錄。
國旗 | ID | 價值 | CDC_TIMESTAMP |
---|---|---|---|
我 | 1 | 10 | 2018-01-01 16:02:00 |
U | 1 | 11 | 2018-01-01 16:02:01 |
D | 1 | 11 | 2018-01-01 16:02:03 |
U | 2 | 20. | 2018-01-01 16:02:00 |
D | 3 | 30. | 2018-01-01 16:02:00 |
表1:更改集C表2018-01-01 17:00:00時刻T
表1顯示了一個示例更改集C表T在給定的一段時間。更改集C有四個列:
- 一個標誌指示是否變化類型的I / U / D(插入/更新/刪除),
- 一個ID列惟一地標識記錄集,
- 一個值列更改記錄被更新時,和
- CDC_TIMESTAMP顯示插入/更新/刪除記錄時。目標表T除了國旗列有相同的模式。
在這種變更集,記錄ID 1插入、更新和刪除(行1、2和3)。這樣的記錄設置為ID = 1有三個記錄。記錄ID 2隻是更新,記錄ID 3被刪除。它是安全的假設插入記錄ID 2和3在某種程度上。
CDC磚前三角洲
前三角洲,樣本疾病預防控製中心管道我們的一些客戶是:甲骨文Informatica = > = >引發beplay体育app下载地址夜間批處理作業= >磚。
在這個場景中,Informatica推動變更集30多個不同的數據來源並鞏固他們在Oracle數據倉庫。大約一天一次,磚的工作從Oracle檢索這些變更集,通過JDBC、磚和刷新表。雖然這計劃成功productionized,它有兩個主要的缺點:
- 它已經超載的Oracle實例添加載荷,導致限製,這些ETL作業如何運行時,和
- 刷新率最好是在夜間,由於並發局限性的香草拚花表(磚前三角洲)。
疾病預防控製中心與磚δ
與磚三角洲,CDC管道現在是流線型的,可以更頻繁地刷新:Informatica = > S3 = >δ= >火花每批工作。在這個場景中,Informatica寫變更集直接使用Informatica S3的鑲花的作家。磚工作運行所需的sub-nightly刷新率(例如,每隔15分鍾,每小時,每3小時,等等)來讀取這些變更集和更新目標磚三角洲的表。
微小的變化,這管道也適應閱讀來自卡夫卡疾控中心記錄,因此,管道就像卡夫卡= > = >δ火花。在本節的其餘部分,我們詳細說明這個過程,以及我們如何使用磚三角洲作為沉疾控中心工作流程。
與我們的一個客戶,我們實現了這些beplay体育app下载地址疾病預防控製中心技術最大和最頻繁刷新ETL管道。在這個客戶場景中,Informatica寫道一個變更集S3為65表,有任何變化每15分鍾。而變更集本身是相當小的(使用插入覆蓋
這種方法的基本思想是保持暫存表,對於一個給定的記錄集的所有更新和積累最後表,其中包含當前最新的快照,用戶可以查詢。
圖1:插入覆蓋流從源Informatica磚三角洲的雲存儲
對於每一個刷新周期,火花作業將運行兩個INSERT語句。
- 插入(插入1):閱讀S3的變更集或卡夫卡在這個刷新周期,這些更改並將其插入暫存表。
- 插入覆蓋(插入2):獲取當前版本的每一個從暫存表記錄集和最終的表中覆蓋這些記錄。
圖2:插入覆蓋流從源到卡夫卡結構化流磚三角洲
一個熟悉的分類方案中心從業人員是不同的類型ala處理更新緩慢變化維度(SCDs)。我們暫存表映射接近SCD 2型方案而最終表地圖最接近一個SCD 1型方案。
實現
讓我們深入了解兩個步驟,首先第一個插入。
%scalaval變更集=數組(file1 file2,…)spark.read。拚花(變更集:_*).createOrReplaceTempView(“增量”)%sql插入成T_STAGING分區(CREATE_DATE_YEAR)選擇ID,價值,CDC_TIMESTAMP從增量
這裏,第一個單元格定義一個臨時視圖在美聯儲的變更集插入
在第二單元。的插入
相當簡單的異常分區
條款,讓我們花一些時間來打開。
回想一下,在雲數據存儲和HDFS,記錄存儲在文件和一個更新的單位為一個文件。磚的δ,這是檢查機關文件,提出了帖子。記錄需要更新時,火花需要讀取和改寫整個文件。因此,重要的是本地化更新盡可能少的文件。因此,我們分區分段和最後一個表的列,最小化的行數感動在疾控中心,並提供分區的分區列規範(Azure|AWS),這樣磚三角洲可以插入的記錄在正確的分區T_STAGING
。
接下來,我們看看第二個插入。
%sql插入覆蓋表T_FINAL分區(CREATE_DATE_YEAR)選擇ID,價值,CDC_TIMESTAMP從(選擇一個。*,排名()在(分區通過ID訂單通過CDC_TIMESTAMPDESC)作為RNK從T_STAGING。*在哪裏CREATE_DATE_YEAR在(2018年,2016年,2015年)B)在哪裏B.RNK=1和B.FLAG' D '
讓我們從內部開始查詢讀取T_STAGING
。回想一下,staging表可能有任意數量的插入、更新和刪除操作對於一個給定的記錄集。這些變化可能來自一個給定的變更集(例如,ID = 1
表1中有3個變化),或者它可能遇到變更集,因為它們插入分級表跨多個刷新時間。內排名
隨著外過濾器B.RNK=1和B.FLAG' D '
確保:
- 我們隻選擇最近的變化對於給定的記錄集,和
- 最近的變化是一個在哪裏
' D '
,我們排除整個紀錄被插入在最後的表,從而實現刪除記錄的目的。
接下來,注意到在CREATE_DATE_YEAR (…)
條款。這與分區(CREATE_DATE_YEAR)
在外層查詢確保磚δ隻會覆蓋這些分區,即2018年,2016年,2015年,其餘被擱淺。值得一提的是,雖然我們提供硬編碼值以上分區清醒,在實際的實現中,這些分區提供了一個Scala從查詢列表動態生成的變更集,等
val partitionsToOverwrite = spark.sql (“選擇年(to_date (create_date。”MM / dd / yyyy“從增量))”)…spark.sql (s”“”插入覆蓋T_FINAL…在($ {CREATE_DATE_YEAR partitionsToOverwrite.mkString (", "))…”“”)
性能
正如上麵提到的,磚三角洲CDC管道可以並發運行與用戶查詢一致的數據視圖。在這裏,我們顯示兩個特性在磚三角洲,可用於優化讀者和作家。
- 分區修剪:在第二個插入(即以上。,the writers), the query optimizer in Databricks Delta looks at the
分區
規範和在
列表中在哪裏
條款隻讀取和改寫那些需要更新的分區。在實踐中,這可以很容易地減少表的部分接觸到一半或者,通常情況下,低得多,從而幫助第二插入,同時,本地化更新T_FINAL
,選擇
查詢T_STAGING
。 - 數據跳/ ZORDER索引:用戶查詢
T_FINAL
的範圍可以從BI工具特別的SQL查詢。在這裏,可能有也可能沒有分區列的查詢CREATE_DATE_YEAR
在在哪裏
條款。例如,
%sql選擇…從T_FINAL在哪裏COL1=瓦爾和COL2=瓦爾
在這種情況下,既不COL1
也不COL2
是分區規範的一部分。然而,用戶可以創建一個z值指數這兩個列:
優化T_FINAL ZORDER (COL1, COL2)
下麵,磚三角洲集群鑲花的文件的z值,查詢等上麵的聯係隻有那些文件,可能包含COL1 =瓦爾
和COL2 =瓦爾
。
我們注意到兩個細節的z值指數擴大查詢的列表,他們可以使用
- 在上麵的案例中,隻有過濾查詢
COL1
(或者,,COL2
)也可以受益於該指數以來,不同,複合索引的RDBMS, z值指數並不偏向查詢過濾器在前綴索引列的列表。 - 不像上麵,如果查詢分區列上也有一個過濾器,然後分區修剪和z順序索引可以大大減少文件數量在查詢時。
我們稱這個優秀的讀者帖子有關數據為什麼以及如何跳過和z值索引有或沒有分區修剪工作。
並發性
提出了在早些時候帖子,磚三角洲將事務支持添加到雲存儲。我們依靠這種支持在以下方式。而覆蓋分區,磚三角洲將確保除了創建新的鑲花文件,離開舊鋪文件在用戶查詢並發運行數據。查詢開始覆蓋完成後將新數據。三角洲可靠地使用事務日誌查詢指向一致的數據版本。
壓實和清理
隨著時間的推移,這兩個T_STAGING
和T_FINAL
積累過期和未使用的記錄。例如,任何記錄T_STAGING
在哪裏等級> 1
,或任何文件T_FINAL
標誌著陳舊的覆蓋該文件。而這並不影響查詢的正確性,降低疾病預防控製中心和查詢性能。值得慶幸的是,這些維護任務在磚三角洲的簡化。清除舊的文件T_FINAL
例如,是那麼簡單
%sql真空T_FINAL
沒有保留參數(見真空文檔:Azure|AWS),該清洗所有過期文件,不再是在事務日誌和7天以上,這是足夠的時間來確保沒有並發讀者訪問這些文件。
打掃屋子上T_STAGING
,另一方麵,包括刪除所有記錄等級> 1
。最簡單的方法就是複製T_FINAL
成T_STAGING
%sql插入覆蓋T_STAGING選擇*從T_FINAL
上麵的命令和前麵所示優化
命令可以組織成一個筆記本維護任務和調度運行磚的工作。
作為即將到來的磚運行時的5.0發行版的一部分,使用並入可能成為另一個偉大的方法由於磚三角洲的性能改進和支持刪除(D)記錄。
Productionizing管道
磚作為一個平台不僅有助於開發和構Beplay体育安卓版本建ETL productionizing這些管道管道也加速時間。在這裏,我們描述了兩個特性和使能技術在Apache火花幫助我們productionize CDC管道。
配置驅動編程
構建大型應用程序中的一個常見設計模式是使用配置驅動軟件行為(例如,YAML或基於json配置文件)。火花支持SQL +通用編程語言像Scala和Python是適合這個設計模式,因為配置可以存儲在表和動態SQL構建使用它。讓我們看看這將在疾控中心工作背景。
首先,回想一下,我們疾控中心管道有65表。我們保持一個配置
表,每一行是一個65年的表,和領域幫助我們構建CDC SQL語句。
表 | PARTITION_COLUMN_EXPRESSION | PARTITION_COLUMN_ALIAS | RANK_EXPRESSION | IS_INSERT_ONLY |
---|---|---|---|---|
T1 | (to_date (create_date MM / dd / yyyy)) | create_date_year | 分區通過ID CDC_TIMESTAMP DESC秩序 | N |
T2 | (to_date (transaction_date MM / dd / yyyy)) | transaction_date_year | 分區通過ID1、ID2 CDC_TIMESTAMP DESC秩序 | N |
T3 | 零 | 零 | 零 | Y |
T4 | … | … | … | … |
表2 -配置表為推動疾控中心管道的一組表
為一個特定的配置信息表和執行CDC邏輯表,使用以下代碼。
val hiveDb =“mydb”val CONFIG_TABLE =“配置”/ /表是一個筆記本輸入小部件val表=年代" " " $ {dbutils.widgets.get (wTable)}“”“瓦爾(partitionColumnExpression partitionColumnAlias、rankExpression isInsertOnly) = spark.sql (s”“”選擇PARTITION_COLUMN_EXPRESSION、PARTITION_COLUMN_ALIAS RANK_EXPRESSION IS_INSERT_ONLY從$ {hiveDb}。$ {CONFIG_TABLE}TABLE_NAME =低(“美元表”)”“”)。作為[(字符串,字符串,字符串,布爾)].head
…
/ **插入1上麵的樣子。在這裏,表*變量是集對T1或T2從配置表* /spark.sql (s”“”表插入$ {}_STAGING分區($ {partitionColumnAlias)選擇$ {projectListFromIncremental}從增量”“”)
…/ /插入2可能看起來像val partitionsToOverwrite = spark.sql (s”“”選擇不同的$ {partitionColumnExpression}從增量”“)。作為[String] .collectspark.sql (s”“”表插入覆蓋表$ {}_FINAL分區($ {partitionColumnAlias})選擇$ {projectListFromIncremental}(選擇一個。*,排名()在(${rankExpression}) AS RNK從${表}_STAGING a *在$ {partitionColumnAlias} ($ {partitionsToOverwrite.mkString (", "))B)在B。RNK=1和B.FLAG' D '”“”)
筆記本工作流程和工作
說,上麵的是實現一個筆記本ProcessIncremental
。我們可以使用筆記本的工作流和有一個控製器筆記本通過的65個表,發現優秀的變更集,和電話ProcessIncremental
在他們身上。
val startDate可以=“20180101”val表= spark.sql (s”“”選擇TABLE_NAME從hiveDb。CONFIG_TABLE美元”“”)。作為[String] .collect。地圖(_.toLowerCase)
表。foreach{台= >val processTheseChangeSets = dbutils.notebook.run (“GetNextChangeSets”,0地圖(“wHiveDb”- > hiveDb,“wTable”- >資源,“wStartDate”- > startDate可以))如果(! processTheseChangeSets.isEmpty) {val統計= dbutils.notebook.run (“ProcessIncremental”,0地圖(“wHiveDb”- > hiveDb,“wIncrFiles”- > processTheseChangeSets,“wTable”- >(資源)))}
控製器的筆記本可以很容易地計劃作為一個工作在磚CDC管道運行所需的頻率。最後,盡管上述循環係列,它可以很容易地更改為一個平行的循環使用,說,.par成語把串行的收藏並行集合,或者使用Scala的未來。
結論
在這個博客中,我們提出了一個參考架構合並成磚三角洲,變更集捕獲通過疾控中心的工具(例如,Oracle GoldenGate或Informatica PowerExchange),或通過改變表維護的供應商(如Oracle變化數據捕獲),或通過改變表維護的用戶使用插入/更新/刪除觸發器。我們引入了火花SQL用於反映這些記錄在磚三角洲,兩個性能注意事項(分區和z順序索引),和輔助因素,如壓實和清理,以確保終端用戶查詢的表優化讀取。然後我們看到磚有助於加速的發展這兩個ETL管道通過支持配置驅動的編程,並使用筆記本productionizing這些工作流工作流和工作。
訪問在線三角洲湖中心要了解更多,請下載最新的代碼,並加入三角洲湖社區。