取消
顯示的結果
而不是尋找
你的意思是:

用稍微不同的模式加載CSV文件

MRTN
新的貢獻者三世

我有一個CSV文件由係統生成,模式已經發展多年來的地方。添加了一些列,至少一列已經重命名新文件。有什麼辦法可以優雅的這些文件加載到dataframe嗎?

我曾經嚐試過spark.read.csv()使用不同的選項。我的下一個想法是使用熊貓加載單個文件,使用applyInPandas可能。

任何思想或想法嗎?

4回複4

karthik_p
尊敬的貢獻者

@Morten Stakkeland如果我沒有錯,你想處理模式變化從源到目標。你能請檢查模式演化的磚嗎

ajaypanday6781
尊敬的貢獻者二世

嗨@Morten Stakkeland,

請參考下麵的博客,也許能幫助你,

配置模式推理和進化自動加載器|磚在AWS上

MRTN
新的貢獻者三世

謝謝你的輸入。我以前試過自動加載程序,但停止了,頭在我的csv文件包含空格和非法字符。因此,錯誤

AnalysisException:發現無效的字符(s)在",{}()\ n \ t = "模式的列名。請啟用設置表屬性的delta.columnMapping列映射。模式的“名字”。有關更多細節,請參考https://docs.microsoft.com/azure/databricks/delta/delta-column-mapping或者你可以使用別名來重命名它。

通過一個選項來設置列映射到名稱並沒有解決它

.writeStream .format(“δ”).option (“checkpointLocation checkpoint_location) .option .option (“optimizeWrite”、“True”) (“schemaEvolutionMode”、“addNewColumns”) .option (“delta.columnMapping。模式”、“名稱”).trigger(一旦= True) .toTable (table_name))

創建映射的目標表的名字開始流之前沒有幫助,我有相關的錯誤消息模式不匹配。

我不能想到的任何方式80 +重命名列“動態”。

MRTN
新的貢獻者三世

供參考,任何人都麵臨相同的問題。所有在線的例子使用自動加載程序被編寫為一個塊語句的形式:

(spark.readStream.format .option (“cloudFiles (“cloudFiles”)。形式at", "csv") # The schema location directory keeps track of your data schema over time .option("cloudFiles.schemaLocation", "") .load("") .writeStream .option("checkpointLocation", "") .start("

解決方案就是這個分割成三個,如下所示

df = (spark.readStream.format .option (“cloudFiles (“cloudFiles”)。形式at", "csv") # The schema location directory keeps track of your data schema over time .option("cloudFiles.schemaLocation", "") .load("")) for c in df.columns: df = df.withColumnRenamed(c, c.replace(" ", "_").replace("(","%28").replace(")","%29").replace("/","%2F")) df.writeStream .option("checkpointLocation", "") .start("

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map