df_source_records.filter (F.col (“_change_type”)。isin(“刪除”,“插入”,“update_postimage”)) .withColumn (“ROW_NUMBER F.row_number () .over(窗口)).filter .withColumn (“ROW_NUMBER = 1”) (“change_indicator F.col (“_change_type”)) .drop (“_commit_version”、“_commit_timestamp”,“ROW_NUMBER”) df_source_records。createOrReplaceGlobalTempView (temporary_view_name)
當從temporary_view_name現在選擇,無論是_change_type列還是change_indicator列可供選擇。為什麼會出現這樣的情況?似乎是一個錯誤嗎?或有解決方案嗎?
謝謝!
現在似乎工作。不知道改變,正如我多次嚐試以這種方式完全did.not.work。
從pyspark.sql。從pyspark.sql進口expr函數。跑龍套進口AnalysisException pyspark.sql進口。函數作為f數據=[(“約翰”,25),(“愛麗絲”,30),(“Bob”, 35)] df =火花。createDataFrame(數據、[“名稱”,“年齡”])試題:ENTER_YOUR_DATASTORE_HERE = =”“路徑”abfss: / / bronze@ + ENTER_YOUR_DATASTORE_HERE + '.dfs.core.windows.net/TEST/test_table' df.write.format .mode(“δ”)(“覆蓋”).save(路徑)除了AnalysisException e:打印(“錯誤在寫dataframe三角洲湖:“,e)試題:火花。sql(“創建數據庫如果不存在測試”)火花。sql(“使用測試”)火花。sql (f”“使用三角洲位置創建表如果不存在test_table{路徑}”" ")除了AnalysisException e:打印(“錯誤在創建目錄條目:e)試題:火花。sql (" " " ALTER TABLE test_table TBLPROPERTIES(δ。enableChangeDataCapture”=“真正的”)”“”)除了AnalysisException e:打印(錯誤而使疾病預防控製中心在桌子上:“,e) def process_batch (df):顯示(df) # CDC列可用。df.createOrReplaceGlobalTempView (test_view) #他們現在似乎出現df_new =火花。sql(“從global_temp.test_view選擇_change_type”)顯示(df_new) streaming_query =火花。readStream \ .format(“δ”)\ .option (“readChangeFeed”,“真正的”)\ .load \ (streaming_query(路徑)。writeStream .foreachBatch(λbatch_df, batch_id: process_batch (df = batch_df) .outputMode(“追加”).trigger(一旦= True) .start () .awaitTermination ())