我們的用例是“清理”列名(刪除空格等)攝入使用三角洲住CSV數據表的功能。我們希望使用模式推理能力在攝入模式規範(預先)將不會發生。
探索使用.withColumnRenamed冒犯列但迅速成為棘手的列數超過100。
一些研究後,我們選定了定義一個UDF的用例將.replace字符我們想要刪除列名稱。UDF使用python列表理解;然而,對df的調用。列不返回電梯的列名。
文件“/磚/火花/ python / pyspark / sql /列。在_to_seq py”, 87行
關口=[轉換器(c)在關口c]
文件“/磚/火花/ python / pyspark / sql /列。py”, 87行,在< listcomp >
關口=[轉換器(c)在關口c]
文件“/磚/火花/ python / pyspark / sql /列。在_to_java_column py”, 66行
提高TypeError (
TypeError:無效的論點,而不是一個字符串或列:DataFrame[運行#:字符串,坦克#:字符串,…
這是實現:
- - -
進口dlt
從pyspark.sql。功能導入my_col坳
從pyspark.sql。功能導入my_udf udf
@my_udf
def clean_columns (df):
返回df.select ([my_col (x) .alias (x
.replace (“”,“_”)
.replace (" / ", " ")
.replace (“%”、“pct”)
.replace (“(”、“”)
.replace (")”、“”)
)x df.columns])
@dlt.table (
評論= "測試"
)
def test_rawzone_data ():
回報(
spark.readStream.format (“cloudFiles”)
.option (“cloudFiles。格式”、“csv”)
.option (“pathGlobfilter”、“* . csv”)
.option(“標題”,真的)
.load (s3: / / obfuscated-of-course / etc / etc /管道/ /”)
.transform (clean_columns)
)
變換函數調用“上下文”,並將DataFrame但不是解決類型。名稱空間衝突?有人建議改進UDF或追求一種不同的方法?
謝謝你的任何指針。