我有一個過程,從json青銅表加載數據。然後添加一個列並創建一個銀色的表。但銀表NULL值,有青銅表中的值。過程如下:
def load_to_silver(來源、表):
file_format = ' json '
context_path = f“/ mnt / company-data-landing /{來源}/{表}/ '
#模式演化將救援-新模式打破列將存儲在獲救,而不是失敗的管道運行。
# backfillInterval確保任何遺漏文件最終加工(每周)。
cloudfile = {
“cloudFiles。schemaEvolutionMode”:“救援”,
“cloudFiles。schemaLocation”: f / mnt / ecologi-data-landing /{來源}/{表}/模式/ ',
“checkpointLocation”: f ' / mnt / ecologi-data-landing /{來源}/{表}/檢查站”,
“schemaInference.sampleSize。numFiles”:“200”
“cloudFiles。形式at': file_format,
“columnNameOfCorruptRecord”:“_corrupt_record”,
“triggerOnce”:“真正的”,
“cloudFiles。backfillInterval”:“1周”
}
#創建一個實現分段表返回的記錄。
@dlt.table (name = f 'bronze_{來源}_{表}”,
評論= f '{來源}_{表}從著陸加工,銅流表。附加的數據,沒有變化。”
)
def bronze_billing_object ():
df = spark.readStream.format \ (“cloudFiles”)
.options (* * cloudfile) \
.load context_path \
.withColumn (“load_date”,點燃(datetime.datetime.now ())) \
.withColumn (“source_file input_file_name ())
返回df
dlt。create_streaming_live_table (name = f 'silver_staging_{來源}_{表}”,
評論= f '{來源}_{表}從青銅流表處理,銀流表/視圖。\
數據合並,而不是附加。
)
如果f“{表}”= =“AddonsOnSubcription”:
dlt.apply_changes (
目標= f 'silver_staging_{來源}_{表}”,
源= f 'bronze_{來源}_{表}”,
鍵= [' subscriptionId '),
sequence_by =坳(“createdAt”),
stored_as_scd_type = 1
)
elif f“{表}”= =“更新”:
dlt.apply_changes (
目標= f 'silver_staging_{來源}_{表}”,
源= f 'bronze_{來源}_{表}”,
鍵= [' invoiceId '),
sequence_by =坳(“createdAt”),
stored_as_scd_type = 1
)
其他:
dlt.apply_changes (
目標= f 'silver_staging_{來源}_{表}”,
源= f 'bronze_{來源}_{表}”,
鍵= (“id”),
sequence_by =坳(“createdAt”),
stored_as_scd_type = 1
)
@dlt.table (name = f 'silver_{來源}_{表}”,
評論= f '{來源}_{表}從銀分段處理流表/視圖,銀不分段,批住表。\
Dtypes應用。數據複製從上一頁。
)
def silver_billing_object ():
df1 = dlt.read (f 'silver_staging_{來源}_{表}”)
df2 = df1.withColumn (“createdAt坳(createdAt) .cast(“時間戳”))
返回(df2)
在table_list表:
load_to_silver (notebook_context表)