I've read that Delta Live Tables keeps table history for 7 days. However, I am creating a streaming table and using the `dlt.apply_changes` function, with this code:
```python
import dlt
from pyspark.sql.functions import col


def run_pipeline(table_name, keys, sequence_by):
    lower_table_name = table_name.lower()

    # view used only to infer the schema from a small sample
    @dlt.view(name=f"{lower_table_name}_schema", comment="Test")
    def create_raw_schema():
        return (
            spark.read.format("parquet")
            .option("inferSchema", True)
            .load(f"s3://mybucket/test/dbo/{table_name}/")
            .limit(10)
        )

    # creating hist table
    @dlt.table(name=f"{lower_table_name}_hist", comment="Test")
    def create_hist_table():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .schema(dlt.read(f"{lower_table_name}_schema").schema)
            .load(f"s3://mybucket/test/dbo/{table_name}/")
        )

    # creating current table
    dlt.create_streaming_live_table(
        name=f"{lower_table_name}",
        path=f"s3://mybucket/test/cdc/{table_name}__ct/",
    )

    dlt.apply_changes(
        target=f"{lower_table_name}",
        source=f"{lower_table_name}_hist",
        keys=keys,
        sequence_by=col(sequence_by),
    )
```
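To make my intent clear: my understanding is that `apply_changes` upserts rows by key, keeping the latest record per key according to `sequence_by`. A minimal pure-Python sketch of that semantics, with hypothetical data (this is only an illustration of what I expect, not the DLT implementation):

```python
def apply_changes_sketch(rows, keys, sequence_by):
    """Keep, per key, the row with the highest sequence value.

    rows: list of dicts; keys: list of key column names;
    sequence_by: name of the ordering column. Illustrative only.
    """
    latest = {}
    for row in rows:
        k = tuple(row[c] for c in keys)
        # a later change (higher sequence value) replaces the earlier one
        if k not in latest or row[sequence_by] > latest[k][sequence_by]:
            latest[k] = row
    return list(latest.values())


# hypothetical change feed: two versions of id=1, one of id=2
changes = [
    {"id": 1, "name": "a", "seq": 1},
    {"id": 1, "name": "b", "seq": 2},  # later change wins
    {"id": 2, "name": "c", "seq": 1},
]
current = apply_changes_sketch(changes, keys=["id"], sequence_by="seq")
# current holds one row per id, with id=1 resolved to name "b"
```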
When I try to access any version of the history with

```sql
SELECT * FROM dlt.my_table TIMESTAMP AS OF '2022-10-10'
```
I get this error message: "Cannot time travel views."
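In case it helps anyone answering: I suspect (this is an assumption I have not verified) that the target of `apply_changes` is published as a view over an internal backing Delta table, which would explain the error. This is how I was planning to check; the `__apply_changes_storage_` prefix is a guess at an internal naming convention and may differ between DLT releases:

```sql
-- Check what kind of object the target actually is
-- (Type should show VIEW if my guess is right)
DESCRIBE EXTENDED dlt.my_table;

-- Hypothetical: time travel against the internal backing table instead
SELECT * FROM dlt.__apply_changes_storage_my_table TIMESTAMP AS OF '2022-10-10';
```

Is querying an internal table like this supported, or is there a proper way to time travel the output of `apply_changes`?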