從pyspark進口SparkContext
從pyspark進口SparkConf
從pyspark.sql。導入類型*
從pyspark.sql。功能導入*
從pyspark。sql進口*
從pyspark.sql。類型進口StringType
從pyspark.sql。功能導入udf
df1 = spark.read.format (csv) .option .load(“標題”、“true”)(“文件:/ / / home / cloudera /數據/ a.csv”)
坳def func_udf (df):
列=列表(df.columns)
如果在列(col):
返回df.col
其他:
返回NULL
func_udf spark.udf.register (“columncheck”)
resultdf = df1。withColumn (“ref_date”, expr(“當國旗= 3那麼其他‘06 may2022 columncheck (ref_date df1)結束”))
resultdf.show ()
這是代碼我試圖檢查是否一個列中存在dataframe如果不是,那麼我必須給零如果是的,那麼我需要給列本身通過使用UDF和UDF的投擲錯誤不知道我做錯了什麼。請幫助如何解決錯誤resultdf dataframe如下拋出錯誤
回溯(最近的電話):
文件“< stdin >”, 1號線,在<模塊>
文件“/ usr /地方/ / python / pyspark / sql / dataframe火花。在withColumn py”, 1849行
返回DataFrame (self._jdf。withColumn (colName col._jc)、self.sql_ctx)
文件“/ usr /本地/火花/ python / lib / py4j-0.10.7-src.zip / py4j / java_gateway。在__call__ py”, 1257行
文件“/ usr /地方/火花/ python / pyspark / sql /跑龍套。py”, 69行,在裝飾
提高AnalysisException (s。分割(':',1)[1],加亮)
pyspark.sql.utils。AnalysisException:“不能解決“df1”給定的輸入列:[ref_date、Salary_qtr4 Salary_qtr3,國旗,名字,Salary_qtr1,薪水,性病,Salary_qtr2, Id,意味著);1號線pos 53; \ n 'Project [Id # 10 # 11,工資# 12,當(cast(國旗# 20 int) = 3)然後其他06 may2022 columncheck (“df1 ref_date # 13)結束ref_date # 35, Salary_qtr1 # 14, Salary_qtr2 # 15日Salary_qtr3 # 16日Salary_qtr4 # 17日# 18,性病# 19,國旗# 20]\ n + - AnalysisBarrier \ n +關係
請查收附加的文件
AttributeError:“DataFrame”對象沒有屬性“ColumnChecker”
坳def func_udf (df):
列=列表(df.columns)
如果卡紮菲在列:
返回df.col
其他:
df。withColumn(“上校”,點燃(null))
func_udf spark.udf.register (“ColumnChecker”)
ref_date dfg = a.ColumnChecker(一個)
dfg.show ()
這是我的代碼運行
是的,你的理解是正確的,我需要一個列可能會動態地創建或既存的dataframe如果列存在的函數應該返回列作為輸出如果新創建它應該返回null值