穀歌與Delta Lake的數據流集成,用於更改數據捕獲

這是Badal,穀歌beplay娱乐ios和Databricks數據團隊之間的合作帖子。我們感謝Badal的合夥人Eugene Miretsky和高級數據工程師Steven deutcher - kobayashi,以及穀歌的產品經理Etai Margolin的貢獻。

操作數據庫捕獲對理解業務的當前狀態至關重要的業務事務。對業務執行情況的實時洞察使數據團隊能夠根據市場情況快速做出業務決策。

Databricks提供了一個托管雲平台,用於實時分析從源係統(包括運營Beplay体育安卓版本數據庫)收集的數據。與Databricks湖屋平台Beplay体育安卓版本,您可以將所有數據存儲在一個安全、開放的湖屋架構中,該架構結合了最好的數據倉庫和數據湖,以統一您的所有分析和AI工作負載。今天,我們很高興和大家分享我們的搭檔Badal.io的發布穀歌數據流Delta Lake連接器,它為MySQL和Oracle關係數據庫啟用了更改數據捕獲(CDC)。CDC是一個基於軟件的過程,用於識別和跟蹤源數據管理係統(如關係數據庫(RDBMS))中數據的更改。CDC可以在新的數據庫事件發生時連續處理數據,從而提供數據的實時活動。

為什麼基於疾病預防控製中心

基於日誌的CDC是傳統批處理數據攝取的替代方法。它讀取數據庫的本機事務日誌(有時稱為重做或二進製日誌),並在事件發生時通過不斷地將更改流傳輸到目標,提供實時或近實時的數據複製。

CDC提供了以下好處:

  • 簡化的攝入:批處理攝取通常需要對源數據模型有深入的了解,才能處理增量上傳和刪除;數據工程師需要與領域專家一起為每個表配置攝取。CDC減少了吸收新數據集的時間和成本。
  • 實時數據:CDC流以幾秒或幾分鍾的延遲變化,支持各種實時用例,如接近實時的儀表板、數據庫複製和實時分析。
  • 對生產工作負載的幹擾最小:常規的批處理攝取利用數據庫資源來查詢數據,CDC從數據庫的重做或歸檔日誌中讀取更改,從而使資源消耗最小化。
  • 基於事件的架構:微服務可以以事件的形式訂閱數據庫中的更改。微服務可以在保持數據一致性的同時構建自己的視圖、緩存和索引。

為什麼Datastream數據

穀歌Cloud Datastream是一個易於使用的CDC和複製服務,允許您在異構數據庫、存儲係統和應用程序之間可靠地同步數據,並以最小的延遲。

Datastream的好處包括:

  • Serverless,因此不需要提供或管理資源,服務會根據需要自動擴展或縮小。
  • 易於使用的設置和監視經驗實現了超快的時間到價值
  • 安全,具有您期望從穀歌Cloud獲得的私有連接選項和安全性,對源數據庫沒有影響。
  • 準確和可靠具有透明的狀態報告和麵對數據和模式更改時健壯的處理靈活性。
  • 寫入目標的數據被規範化為統一類型的模式.這意味著下遊消費者幾乎完全是來源不明的,這使得它成為一個簡單的解決方案,易於伸縮,以支持廣泛的不同來源。

Datastream是一種無服務器且易於使用的更改數據捕獲(CDC)和複製服務

連接器設計

Badal.io和Databricks合作編寫了一個Datastream三角洲湖接頭

體係結構

Datastream以avro或JSON格式將更改日誌記錄寫入穀歌雲存儲(GCS)文件中的文件。datastream-delta連接器使用Spark結構化流讀取到達的文件,並將它們流到Delta Lake表。

Delta Lake CDC架構,Datastream以avro或JSON格式將更改日誌記錄寫入穀歌雲存儲(GCS)文件中的文件。

連接器為每個源表創建兩個Delta Lake表:

  1. Staging表:該表包含自複製開始以來在源數據庫中所做的每一個更改。每一行表示一個數據流DML語句(插入、更新、刪除)。它可以被重放以重建數據庫在過去任何給定時刻的狀態。下麵是一個staging表的示例。
read_timestamp source_timestamp 對象 source_metadata 有效載荷
2021-05-16
T00:40:05.000
+ 0000
2021-05-16
T00:40:05.000
+ 0000
demo_inventory。
選民
{“表”:“inventory.voters”,“數據庫”:“演示”,
“primary_keys”:“id”,“log_file”:“mysql-bin.000002”,
“log_position”:27105167,“change_type”
:“插入”、“is_deleted”:假}

{" id ":“743621506”、“名稱”:“先生。約書亞·傑克遜","地址":"傑西卡·普萊恩斯公寓106\nWhitestad 567號,HI 51614″,"性別":" t "}
2021-05-16
T00:40:06.000
+ 0000
2021-05-16
T00:40:06.000
+ 0000
demo_inventory。
選民
{“表”:“inventory.voters”,“數據庫”:“演示”,
“primary_keys”:“id”,“log_file”:“mysql-bin.000002”,
“log_position”:27105800,“change_type”:
“更新”、“is_deleted”:假}
{" id ": " 299594688″," name ": " Ronald Stokes ", " address ": " 940 Jennifer Burg Suite 133\nRyanfurt, AR 92355″," gender ": " m "}
2021-05-16
T00:40:07.000
+ 0000
2021-05-16
T00:40:07.000
+ 0000
demo_inventory。
選民
{“表”:“inventory.voters”,“數據庫”:“演示”,
“primary_keys”:“id”,“log_file”:“mysql-bin.000002”,
“log_position”:27106451,“change_type”:
“刪除”、“is_deleted”:假}
{" id ": " 830510405″," name ": " Thomas Olson ", " address ": " 2545 Cruz Branch Suite 552\nWest Edgarton, KY 91433″," gender ": " n "}
  1. 目標表:包含源表的最新快照。
id 的名字 地址 性別 datastream_metadata
_source_timestamp
datastream_metadata
_source_metadata_log
帶有_file
datastream_metadata
_source_metadata_log
_position
207846446 邁克爾•湯普森 508波特山 2021-05-16
T00:21:02.000
+ 0000
mysql-bin.000002 26319210
289483866 勞倫·詹寧斯 03347年布朗群島 t 2021-05-16
T02:55:40.000
+ 0000
mysql-bin.000002 31366461
308466169 帕特裏夏·萊利 991年弗雷德裏克大壩 t 2021-05-16
T00:59:59.000
+ 0000
mysql-bin.000002 27931699
348656975 萊利博士喜怒無常 89422年德溫脊 t 2021-05-16
T00:08:32.000
+ 0000
mysql-bin.000002 25820266
385058605 伊麗莎白·吉爾 728多蘿西鎖 f 2021-05-16
T00:18:47.000
+ 0000
mysql-bin.000002 26226299

連接器將數據攝取分解為一個多步驟過程:

  1. 掃描GCS以發現所有活動表。Datastream將每個表存儲在單獨的子目錄中。
  2. 如果需要,解析表元數據以創建新的Delta Lake數據庫和表。
  3. 為每個表初始化兩個流:
    • 來自GCS源的結構化流
    • 結構化流使用增量表作為
  4. 如果staging和目標表的模式與當前微批處理的模式不同,則修改它。使用Delta Lake自動遷移Staging表模式模式遷移特性,它有一個目標表模式,在執行MERGE語句之前以編程方式修改該模式。
  5. 將更改(針對每個表)流到一個staging表中。staging表是一個僅可追加的表,它存儲更改日誌的行,其中每一行表示一個DML語句(插入、更新、刪除)。
  6. 流來自staging表的更改,並使用Delta Lake將它們合並到最終表中MERGE語句

表元數據發現

Datastream發送每個事件時都帶有操作它所需的所有元數據:表模式、主鍵、排序鍵、數據庫、表信息等。

因此,用戶不需要為他們想要攝取的每個表提供額外的配置。相反,表是自動發現的,並且從每個批處理的事件中提取所有相關信息。這包括:

  1. 表和數據庫名稱
  2. 表模式
  3. 在merge語句中使用的主鍵和排序鍵。

合並邏輯

本節將描述MERGE操作在高級別的工作方式。此代碼由庫執行,而不是由用戶實現。MERGE到目標表的設計需要謹慎,以確保所有記錄都被正確更新,特別是:

  1. 使用主鍵可以正確標識代表相同實體的記錄。
  2. 如果一個微批對同一記錄有多個條目,則隻使用最新的條目。
  3. 通過比較目標表中記錄的時間戳與批處理中的記錄,並使用最新版本,可以正確處理亂序記錄。
  4. 刪除記錄處理正常。

首先,對於每個微批,我們執行如下操作:

SELECT * RANK() OVER (PARTITION BY pkey1, pkey2 ORDER BY source_timestamp, source_metadata.log_file, source_metadata.log_position) AS row_number FROM T_STAGING A.* WHERE row_number = 1

然後執行一個類似於下麵SQL語句的合並操作:

使用staging_table as s ON t. pkey1 = s. pkey1 AND t. pkey2 = s. pkey2 WHEN MATCHED AND t.datastream_metadata_source_timestamp <= s.source_timestamp AND s.source_metadata合並為target_table。is_deleted THEN DELETE WHEN MATCHED AND t.datastream_metadata_source_timestamp <= s.source_timestamp THEN update set t.colA = s.colA, WHEN NOT MATCHED BY TARGET AND {stagingAlias}.{deleteColumn}!=True然後插入(colA)值(colA) *為了可讀性,我們忽略了更新用於排序的元數據列

壓實和清理

流工作負載可能導致寫入的拚花文件的大小低於最佳值。通常,如果數據量不夠大,就需要在寫入更小的文件和增加流延遲之間做出權衡,以便積累更多的數據來寫入。小文件可能會導致讀取和合並性能下降,因為作業需要掃描大量文件。

此外,當更新記錄的新條目覆蓋舊條目時,MERGE查詢往往會導致大量未使用的數據。未使用的記錄不會影響查詢正確性,但會隨著時間的推移降低CDC和用戶查詢性能。

為了減輕這個問題,我們鼓勵用戶采取以下其中一項措施:

  1. 如果使用Databricks管理的集群,最好的選擇是使用自動優化和壓縮優化文件大小
  2. 調度一個查詢,定期調用OPTIMIZE和VACUUM
  3. 在寫入目標表之前,通過設置DELTA_MICROBATCH_PARTITIONS選項,使用連接器的內置特性來合並分區。這是一個使用Databrick自動優化的簡化版本(效率較低)。

為什麼三角洲湖要建湖屋

三角洲湖是一個開源項目,建立可靠的數據湖,您可以輕鬆地管理和擴展到數十億個文件。Delta Lake采用開源Apache Parquet作為雲對象存儲數據的柱狀文件格式,包括穀歌cloud storage (GCS)、Azure Blob storage、Azure data Lake storage (ADLS)、AWS S3 (Simple storage Service)和Hadoop Distributed file System (HDFS)。成千上萬的組織使用Delta Lake作為其企業數據和分析平台的基礎。Beplay体育安卓版本通過Delta Lake的以下特性來實現數據湖的可靠性、可擴展性和治理:

  • 用於Apache Spark工作負載的ACID事務:可序列化的隔離級別確保多個並發讀取器和寫入器可以並行操作,並且永遠不會看到不一致的數據。支持合並、更新和刪除操作,以支持複雜的用例,如更改數據捕獲、緩慢更改維度(SCD)操作和流上serts。
  • 可擴展的元數據處理:可以輕鬆處理由數十億個分區和文件組成的大型表。
  • 模式實施:讀時的模式對於某些用例是有用的,但這可能導致較差的數據質量和報告異常。Delta Lake提供了指定模式並實施它的能力。
  • 審計曆史:事務日誌記錄對數據的所有更改,提供對執行的操作、由誰執行、何時執行等的完整審計跟蹤。
  • 時間旅行:數據版本支持回滾,以便在某個時間點恢複來恢複數據。

Delta Lake與Apache Spark api完全兼容,因此您可以將其與現有的數據管道一起使用,隻需進行最小的更改。Databricks提供了一個托管雲服務來構建您的數據湖,並運行您的分析工作負載,為Delta lake提供了幾個額外的性能特性:

  • 光子執行引擎:新的執行引擎,提供非常快的性能,並與Apache Spark api兼容。
  • 數據跳過索引:創建文件級統計信息,以避免掃描不包含相關數據的文件。想象一下,有數百萬個包含銷售數據的文件,但隻有十幾個文件包含您需要的實際信息。使用數據跳過索引,優化器將確切地知道要讀取哪些文件並跳過其餘文件,從而避免對數百萬個文件進行全麵掃描。
  • 文件壓縮(bin-packing):通過將小文件合並為大文件來提高讀查詢的速度。數據湖可能會積累大量小文件,特別是在數據流和增量更新時。小文件會導致讀操作變慢。通過壓縮將小文件合並為較少的大文件是實現快速讀訪問的關鍵數據湖維護技術。
  • z - ordered:對同一組文件中的相關字段進行排序,以減少需要讀取的數據量。
  • Bloom Filter Indexes:快速搜索數十億行,測試集合中某個元素的成員關係。

Delta Lake是一個開源項目,旨在構建可靠的數據湖,您可以輕鬆管理並擴展到數十億個文件。

要開始,請訪問穀歌數據流Delta Lake連接器GitHub的項目。如果您還沒有Databricks帳戶,請嚐試Databricks免費的

免費嚐試Databricks 開始

報名

Baidu
map