取消
顯示的結果
而不是尋找
你的意思是:

注冊一個dataframe來自CDC數據流從生成的臨時視圖中刪除CDC列,即使明確dataframe添加一份列。

Louis_Databrick
新的貢獻者二世
df_source_records.filter (F.col (“_change_type”)。isin(“刪除”,“插入”,“update_postimage”)) .withColumn (“ROW_NUMBER F.row_number () .over(窗口)).filter .withColumn (“ROW_NUMBER = 1”) (“change_indicator F.col (“_change_type”)) .drop (“_commit_version”、“_commit_timestamp”,“ROW_NUMBER”) df_source_records。createOrReplaceGlobalTempView (temporary_view_name)

當從temporary_view_name現在選擇,無論是_change_type列還是change_indicator列可供選擇。為什麼會出現這樣的情況?似乎是一個錯誤嗎?或有解決方案嗎?

謝謝!

2回答2

ajaypanday6781
尊敬的貢獻者二世

嗨@Louis De地表古積

這似乎是一個錯誤。

你能發送你錯誤的片段得到嗎?

# DAIS2023

Louis_Databrick
新的貢獻者二世

現在似乎工作。不知道改變,正如我多次嚐試以這種方式完全did.not.work。

從pyspark.sql。從pyspark.sql進口expr函數。跑龍套進口AnalysisException pyspark.sql進口。函數作為f數據=[(“約翰”,25),(“愛麗絲”,30),(“Bob”, 35)] df =火花。createDataFrame(數據、[“名稱”,“年齡”])試題:ENTER_YOUR_DATASTORE_HERE = =”“路徑”abfss: / / bronze@ + ENTER_YOUR_DATASTORE_HERE + '.dfs.core.windows.net/TEST/test_table' df.write.format .mode(“δ”)(“覆蓋”).save(路徑)除了AnalysisException e:打印(“錯誤在寫dataframe三角洲湖:“,e)試題:火花。sql(“創建數據庫如果不存在測試”)火花。sql(“使用測試”)火花。sql (f”“使用三角洲位置創建表如果不存在test_table{路徑}”" ")除了AnalysisException e:打印(“錯誤在創建目錄條目:e)試題:火花。sql (" " " ALTER TABLE test_table TBLPROPERTIES(δ。enableChangeDataCapture”=“真正的”)”“”)除了AnalysisException e:打印(錯誤而使疾病預防控製中心在桌子上:“,e) def process_batch (df):顯示(df) # CDC列可用。df.createOrReplaceGlobalTempView (test_view) #他們現在似乎出現df_new =火花。sql(“從global_temp.test_view選擇_change_type”)顯示(df_new) streaming_query =火花。readStream \ .format(“δ”)\ .option (“readChangeFeed”,“真正的”)\ .load \ (streaming_query(路徑)。writeStream .foreachBatch(λbatch_df, batch_id: process_batch (df = batch_df) .outputMode(“追加”).trigger(一旦= True) .start () .awaitTermination ())

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map