我有多個事務資源在Azure磚env (MySQL,該軟件,MySQL datadumps)為客戶公司DataLake。他們基本上都是“資源管理”(使用不同的dbms,接收應用程序轉儲等),但我不相信他們保護模式漂移和我想攝取/存儲所有原始數據(因為我缺乏信心的來源,其中一些依賴外部第三方)在一批ETL過程,但隻過程相關的數據。理想情況下,我不想存儲為每個攝入每個源表的副本。
由於數據的多樣性,我不想捕獲它們都以同樣的方式改變,所以我的解決方案是讓一個日誌源表的當前狀態之間的差異和鏡子前麵的攝取。然後我可以工作在選擇如何捕獲變化銀/金在案例的基礎上,通過傳播從登錄到清潔表的變更。
我有下麵的當前DLT的解決方案,但我懷疑是非常低效的,有一個更好的做事的方式利用火花/磚的功能。我也意識到,我可以很容易的跑到一個伯父或分頁問題因為我加載整個表,不流/ batchign,而且我不完全確定DLT表如何處理改變列的數量。
成本是一個顯著的因素(客戶端主要是在捐助資金)工作。我有一個團結Catalog-enabled ws,但核心DLT正在使用。我也擔心持續的DLT日誌表。我不確定這是一個合理的DLT的用例。數據量實際上是很小的(每個源的GBs),但有一個或兩個怪物表。
歡迎任何建議選擇更有效的解決方案!
這些解決方案/線程我一直看:
數據改變飼料:https://docs.www.eheci.com/delta/delta-change-data-feed.html
https://community.www.eheci.com/t5/data-engineering/what-are-the-best-practices-for-change-data-cap..。
https://community.www.eheci.com/t5/data-engineering/when-should-change-data-feed-be-used/td-p/26000
外部管理:https://community.www.eheci.com/t5/data-engineering/how-do-you-capture-change-logs-from-rdms-source..。
//www.eheci.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dm..。
//www.eheci.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.htm..。
DLT疾病預防控製中心:https://docs.gcp.www.eheci.com/delta-live-tables/cdc.html
類似的討論是ETL事務性的來源:
https://community.www.eheci.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-..。
https://stackoverflow.com/questions/59591536/how-to-compare-two-versions-of-delta-table-to-get-chang..。
這是實際的函數
從輸入導入列表,可選、Dict元組,設置從dataclasses從熊貓進口進口dataclass DataFrame pyspark PandasDf dlt導入。sql進口DataFrame pyspark.sql SparkDf。從pyspark.sql列導入列。功能導入current_timestamp,點燃@dataclass類來源:名稱:str server_url: str用戶:str密碼:str司機:str = " org.mariadb.jdbc。司機“def generate_diff_table(查詢:str table_name: str, mirror_table_location: str, diff_log_location: str,) - >沒有:@dlt。表(name = f“{table_name} _diff_log”,評論= f“更改日誌查詢:查詢{}”,路徑= diff_log_location) def create_diff_table():時間戳:列= current_timestamp()打印(查詢)source_df: SparkDf = spark.read.format (jdbc)。選項(司機=來源。司機,url = f”{來源。server_url}”、查詢=查詢、用戶=來源。用戶、密碼=來源。密碼,useSSL = True, trustServerCertificate = True) .load () mirror_path: str = f”{mirror_table_location}。{table_name} _mirror“#如果表已經攝入,做減法設置當前版本之間的差異和之前的版本和存儲差異作為一個新表(diff_df)如果spark.catalog.tableExists (mirror_path): prev_mirror_df: SparkDf =火花。sql (f“Select * from {mirror_path}”) diff_insert: SparkDf = source_df.subtract (prev_mirror_df)。withColumn (colName =“行動”,坳=點燃(“插入”))。withColumn (colName =“ingest_timestamp坳=點燃(時間戳))diff_del: SparkDf = prev_mirror_df.subtract (source_df)。withColumn (colName =“行動”,坳=點燃(“刪除”))。withColumn (colName =“ingest_timestamp坳=點燃(時間戳))diff_df: SparkDf = diff_insert.union (diff_del)其他:#如果tbale沒有攝入之前,基本上是所有行插入diff_df = source_df。withColumn (colName =“行動”,坳=點燃(“插入”))。withColumn (colName =“ingest_timestamp坳=點燃(時間戳))source_df.write.mode(“覆蓋”).saveAsTable (mirror_path)返回diff_df
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
我的“錯誤”與軟件工程師伴侶主修純粹數學的問題。
他們同意
1)將兩個表到內存中並不是一個好主意,尤其是在那麼diff表也將被舉行在內存中
2)做減法操作集是真的,真的效率低下,特別是看起來pyspark減去是基於SQL除了明顯的不使用索引,所以對基表中的每一行重複開拓整個檢查重複的其他表,然後做兩次。不是可持續的,可伸縮的。
他們的解決方案是一行一行地處理表,並檢查等價。
該解決方案通過表迭代一次,兩類和一個副本,並運行一個查詢每一行,。
我與這個方法主要是擔心我會建立自己的迭代器,而不是利用流媒體平台功能,我不確定我可以保證我的表都有主鍵進行排序。Beplay体育安卓版本
我離開了很多關於自動增量問題和排序的相關性,並與模式漂移等發生了什麼。
第二種方法是使用結構化流perroiw功能,做一個查找源表到目標表的每一行,如果它存在,“上次”添加當前日期列,這樣任何行,刪除過時的“最後檢查”的價值。任何行,不存在與當前日期可以添加一個“添加日期”列。這意味著做一個查找在目標表源表的每一行。潛在目標表索引可以幫助在這裏,我不知道。這也意味著源表和目標表是不同的形狀和我需要排除一點元數據列行操作比較。
任何見解的相對效率的這些函數仍然是受歡迎的。我想試試我的伴侶的方法首先,看看它比較原始的方法。也許如果我剛剛瘋了多餘的時間我會嚐試我的想法和看到的影響是什麼。如果我有任何有趣的結果分享在這裏添加它。
我覺得我做的lto wheel-reinventing,不過,如果有人對這個完全錯誤的”“你認為評論,我洗耳恭聽。
我的“錯誤”與軟件工程師伴侶主修純粹數學的問題。
他們同意
1)將兩個表到內存中並不是一個好主意,尤其是在那麼diff表也將被舉行在內存中
2)做減法操作集是真的,真的效率低下,特別是看起來pyspark減去是基於SQL除了明顯的不使用索引,所以對基表中的每一行重複開拓整個檢查重複的其他表,然後做兩次。不是可持續的,可伸縮的。
他們的解決方案是一行一行地處理表,並檢查等價。
該解決方案通過表迭代一次,兩類和一個副本,並運行一個查詢每一行,。
我與這個方法主要是擔心我會建立自己的迭代器,而不是利用流媒體平台功能,我不確定我可以保證我的表都有主鍵進行排序。Beplay体育安卓版本
我離開了很多關於自動增量問題和排序的相關性,並與模式漂移等發生了什麼。
第二種方法是使用結構化流perroiw功能,做一個查找源表到目標表的每一行,如果它存在,“上次”添加當前日期列,這樣任何行,刪除過時的“最後檢查”的價值。任何行,不存在與當前日期可以添加一個“添加日期”列。這意味著做一個查找在目標表源表的每一行。潛在目標表索引可以幫助在這裏,我不知道。這也意味著源表和目標表是不同的形狀和我需要排除一點元數據列行操作比較。
任何見解的相對效率的這些函數仍然是受歡迎的。我想試試我的伴侶的方法首先,看看它比較原始的方法。也許如果我剛剛瘋了多餘的時間我會嚐試我的想法和看到的影響是什麼。如果我有任何有趣的結果分享在這裏添加它。
我覺得我做的lto wheel-reinventing,不過,如果有人對這個完全錯誤的”“你認為評論,我洗耳恭聽。
這是原始的implementaiton pos,利益(排除在原文減少長度)
#一些黑名單和模式重命名在必要時(由於衝突和/或非法字符)SourceName = str SchemaName = str表= str ColName = str SourceSchemaName = str TargetSchemaName = str excl_schemas: Dict [SourceName、列表[SchemaName]] = {" my_first_source ": [“information_schema”、“sys”、“performance_schema”、“mysql”,“搬運工”),“my_snd_source”: […]}excl_tables: Dict [SourceName Dict [SchemaName,設置[表]]]= {my_first_source: {},“my_snd_source”: {},“data_dump”: {" datadump ": {“table_logstore_huge_log_file}}} # todo:寫入表的記錄已經排除rename_schemas: Dict [SourceName, Dict [SourceSchemaName, TargetSchemaName]] = {" my_first_source ":{“我模式”:“my_schema”、“主要”:“mys_source_main”},}
#創建一個日誌和更新源模式信息的鏡子def python_to_sql_list (py_list:列表(str)) - > str:““python列表轉換成sql-parsable名單”“返回”(“+”,' " . join (py_list) + " ')”schemas_to_exclude:列表(str) = excl_schemas [current_source] excl_schema_str: str = python_to_sql_list (schemas_to_exclude) info_schema_query: str = f”選擇TABLE_SCHEMA、TABLE_NAME COLUMN_NAME, COLUMN_KEY information_schema。列TABLE_SCHEMA不是{excl_schema_str}”generate_diff_table(查詢= info_schema_query table_namem = f”info_sch_ {current_source}”, mirror_table_location = f“pipeline_utils.info_schema_change_logs diff_log_location = f pipeline_utils.info_schema_change_logs) #得到表的一個列表,攝取的鏡子info_schema tables_to_ingest:列表(元組[str、列表(str)]] =火花。sql (f”選擇不同的TABLE_SCHEMA TABLE_NAME pipeline_utils.info_schemas_mirror。{current_source} TABLE_SCHEMA不是{ex_schema_str}”) .rdd.groupByKey () .mapValues(列表).collect ()
#實際攝入n_total: int len (tables_to_ingest) =我:int = 1 #遍曆每個表更新schema_name diff日誌和當地的鏡子,table_list tables_to_ingest: #處理重命名要求schema_name = schema_name如果source_schema_name rename_schemas [current_source]: target_schema_name = rename_schemas [current_source] [source_schema_name]其他:target_schema_nm = schema_nm #得到任何排除表列出,否則返回空集tables_to_exclude:設置(str) = excl_tables [current_source]。get (source_schema_name,()) #處理每個表在table_list table_name:打印(f“{我}/ {n_total}”) + = 1 #跟蹤進展如果table_name不在tables_to_exclude: generate_diff_table(查詢select * from {source_schema_name} = f”。{table_name}”, table_name = f”{schema_name} _ {table_name}”, mirror_table_location = f”銅牌。{target_schema_name}”, diff_log_loc = f”銅牌。{target_schema_name}”)