謝謝你的輸入。我以前試過自動加載程序,但停止了,頭在我的csv文件包含空格和非法字符。因此,錯誤
AnalysisException:發現無效的字符(s)在",{}()\ n \ t = "模式的列名。請啟用設置表屬性的delta.columnMapping列映射。模式的“名字”。有關更多細節,請參考https://docs.microsoft.com/azure/databricks/delta/delta-column-mapping或者你可以使用別名來重命名它。
通過一個選項來設置列映射到名稱並沒有解決它
.writeStream .format(“δ”).option (“checkpointLocation checkpoint_location) .option .option (“optimizeWrite”、“True”) (“schemaEvolutionMode”、“addNewColumns”) .option (“delta.columnMapping。模式”、“名稱”).trigger(一旦= True) .toTable (table_name))
創建映射的目標表的名字開始流之前沒有幫助,我有相關的錯誤消息模式不匹配。
我不能想到的任何方式80 +重命名列“動態”。
供參考,任何人都麵臨相同的問題。所有在線的例子使用自動加載程序被編寫為一個塊語句的形式:
(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。形式at", "csv") # The schema location directory keeps track of your data schema over time .option("cloudFiles.schemaLocation", "") .load("") .writeStream .option("checkpointLocation", "") .start("
解決方案就是這個分割成三個,如下所示
df = (spark.readStream.format .option (“cloudFiles (“cloudFiles”)。形式at", "csv") # The schema location directory keeps track of your data schema over time .option("cloudFiles.schemaLocation", "") .load("")) for c in df.columns: df = df.withColumnRenamed(c, c.replace(" ", "_").replace("(","%28").replace(")","%29").replace("/","%2F")) df.writeStream .option("checkpointLocation", "") .start("