我想設置三角洲生活表管道攝取銅和銀表數據。銅和銀是分開的模式。
這將引發的的日常工作。看起來好當設置為連續運行,但失敗時觸發。
表:
datasource.bronze.customer
datasource.silver.customerview
datasource.silver.customer
目前統一目錄集成與達美住表需要有獨立的管道進行銅銀模式和模式。因此我開始通過複製數據從青銅(datasource.bronze.customer)銀(datasource.silver.customerview)作為管道不能直接引用一個表創建以外的管道。從表(datasource.silver.customerview)我將更改應用到銀表(datasource.silver.customer)
我的設置是在銀管道與以下錯誤:
“流動客戶沒有致命。發生一個錯誤,因為我們檢測到一個更新或刪除一個或多個源表中的行。流表可能隻使用擴展流源。如果你希望刪除或更新行源表將來,請轉換表客戶現場表,而不是現場直播表。為了解決這個問題,執行全表customer刷新。全麵刷新將試圖清除所有數據從表customer,然後加載所有數據流的來源。
non-append變化可以發現版本16。
操作:寫
用戶名(未指定):
源表名稱:customerview”
任何建議這個錯誤或正確地設置這個dlt管道的方法嗎?
或者隻是一個例子/模板/演示如何設置這個統一目錄管道和three-level-namespace會感謝。
代碼:
#為加州大學解決方案,直到三級名稱空間創建臨時銀表@dlt可用。表(name = viewNameSilver) def create_silver_temp_view():返回spark.table (f 'datasource.bronze。{tableNameBronze}”) #創建目標表定義# create_streaming_live_table()——棄用# create_target_table()——棄用dlt。create_streaming_table (name = tableNameSilver評論= f“幹淨、合並{tableNameSilver}”, table_properties ={“質量”:“銀”、“pipelines.autoOptimize。管理”:“真正的”})#表dlt scd2適用於銀。apply_changes(目標= tableNameSilver源= viewNameSilver鍵= primaryKeyCol sequence_by =坳(“__EffectiveStartDate”), except_column_list = [' __EffectiveStartDate '], stored_as_scd_type = sCDType)
謝謝。因為我在觸發模式下運行這個感覺再處理現有的文件。我想知道如果我遺漏了什麼東西在我的青銅表的定義?不應該隻處理新文件(隻要我不做一個完整的刷新)?現有的文件不會被改變,它應該在每天收到的新文件。
這是我的青銅的代碼:
@dlt。表(name = tableNameBronze評論=“原始數據從青銅攝取”,table_properties = {" myCompanyPipeline。”:“青銅”、“pipelines.autoOptimize質量。管理”:“真正的”})def create_bronze():返回(火花。readStream .format (cloudFiles) .option .option (“cloudFiles.inferColumnTypes”,“假”)(“cloudFiles。schemaHints”, schemaHintsBronze) .option (“cloudFiles。形式at", sourceFormat) .option("cloudFiles.schemaEvolutionMode","rescue") .option("cloudFiles.rescuedDataColumn","__RescuedData") .option("pathGlobfilter", fileNamePrefix) .load(dataPathBronze) .select( "*" ,col("_metadata.file_name").alias("__SourceFile") ,current_timestamp().alias("__IngestionDate") ,to_date(substring(col("_metadata.file_name"), -21, 8),'yyyyMMdd').alias("__EffectiveStartDate") ) )