從pyspark進口SparkContext
從pyspark進口SparkConf
從pyspark.sql。導入類型*
從pyspark.sql。功能導入*
從pyspark。sql進口*
從pyspark.sql。類型進口StringType
從pyspark.sql。功能導入udf
df1 = spark.read.format (csv) .option .load(“標題”、“true”)(“文件:/ / / home / cloudera /數據/ a.csv”)
坳def func_udf (df):
列=列表(df.columns)
如果在列(col):
返回df.col
其他:
返回NULL
func_udf spark.udf.register (“columncheck”)
resultdf = df1。withColumn (“ref_date”, expr(“當國旗= 3那麼其他‘06 may2022 columncheck (ref_date df1)結束”))
resultdf.show ()
這是代碼我試圖檢查是否一個列中存在dataframe如果不是,那麼我必須給零如果是的,那麼我需要給列本身通過使用UDF和UDF的投擲錯誤不知道我做錯了什麼。請幫助如何解決錯誤resultdf dataframe如下拋出錯誤
回溯(最近的電話):
文件“< stdin >”, 1號線,在<模塊>
文件“/ usr /地方/ / python / pyspark / sql / dataframe火花。在withColumn py”, 1849行
返回DataFrame (self._jdf。withColumn (colName col._jc)、self.sql_ctx)
文件“/ usr /本地/火花/ python / lib / py4j-0.10.7-src.zip / py4j / java_gateway。在__call__ py”, 1257行
文件“/ usr /地方/火花/ python / pyspark / sql /跑龍套。py”, 69行,在裝飾
提高AnalysisException (s。分割(':',1)[1],加亮)
pyspark.sql.utils。AnalysisException:“不能解決“df1”給定的輸入列:[ref_date、Salary_qtr4 Salary_qtr3,國旗,名字,Salary_qtr1,薪水,性病,Salary_qtr2, Id,意味著);1號線pos 53; \ n 'Project [Id # 10 # 11,工資# 12,當(cast(國旗# 20 int) = 3)然後其他06 may2022 columncheck (“df1 ref_date # 13)結束ref_date # 35, Salary_qtr1 # 14, Salary_qtr2 # 15日Salary_qtr3 # 16日Salary_qtr4 # 17日# 18,性病# 19,國旗# 20]\ n + - AnalysisBarrier \ n +關係
所以讓我們保持它的簡單性:
df1 = spark.read.format (csv) .option .load(“標題”、“true”)(“文件:/ / / home / cloudera /數據/ a.csv”)如果df1“上校”。列:df2 =其他df1: df2 = df1。withColumn(“上校”,點燃(沒有).cast (TypeYouWant)
接下來你可以使用你的案子df2等等。
嗨werner感謝輸入但仍拋出錯誤
AnalysisException:列“a”是不存在的。你的意思的嗎?(一個。id,一個。性病,。部門,。國旗,a。意思是,一個。工資,。neg_std,。new_var,。ref_date,。mean20perc a.neg_mean20perc); line 1 pos 57;
這是我更新的代碼我使用case語句.Kindly幫助
坳def func_udf (df):
列=列表(df.columns)
上校的列:
返回坳
其他:
點燃(空)
func_udf spark.udf.register (“ColumnChecker”)
一個=。withColumn (ref_date, expr (f”情況下當國旗= 3 ' {sf_start_dt} '其他ColumnChecker (a, ref_date)結束”))
從pyspark.sql。功能導入*
從pyspark.sql。功能導入坳
進口pyspark.sql。函數作為F
從datetime進口日期,日期時間
導入的時間
從dateutil。relativedelta進口relativedelta
從解析器dateutil進口
從pyspark.sql。窗口導入窗口
從pyspark.sql。導入類型*
進口地區
# - - - - - - - - - -合並語句
df1 = spark.read.format (csv)。選項(“頭”,“真正的”).load (“dbfs: / FileStore / shared_uploads / a.csv”)
df1.show ()
df2 = spark.read.format (csv)。選項(“頭”,“真正的”).load (“dbfs: / FileStore / shared_uploads / b.csv”)
df2.show ()
# - - - - - - - - - -合並語句
sf_start_dt = ' 02 may2022 '
一個= df1
= a.filter(坳(ref_date) = = f“{sf_start_dt}”)
一個= a.drop(名字)
= a.select (“id”、“工資”,“ref_date”、“性病”,“的意思是”)
b = df2
a.createOrReplaceTempView (“”)
b.createOrReplaceTempView (“b”)
= a.join (b,“id”、“外”)
df1 = spark.sql(“選擇tbl1.id(選擇1),從tbl1 tempCol1內連接b tbl2tbl1.id=tbl2.id”)
df2 = spark.sql(“選擇tbl1.id(選擇2)從tbl1 tempCol2離開加入b tbl2tbl1.id=tbl2.id在哪裏tbl2.id為空”)
df3 = spark.sql(“選擇tbl1.id(選擇3)作為tempCol3從b tbl1離開加入tbl2tbl1.id=tbl2.id在哪裏tbl2.id為空”)
= a.join (df1“id”、“外”). join (df2、“id”、“外”). join (df3、“id”、“外”)
一個= a.na.fill (0, ' tempCol1 ')
一個= a.na.fill (0, ' tempCol2 ')
一個= a.na.fill (0, ' tempCol3 ')
一個=。withColumn(“國旗”,合並(坳(tempCol1) +坳(tempCol2) +坳(tempCol3)))
一個= a.drop (“tempCol1”)
一個= a.drop (“tempCol2”)
一個= a.drop (“tempCol3”)
一個= \
.withColumn (“neg_std F.expr (f (std * (1))))
一個= \
.withColumn (“mean20perc F.expr (f(0.20 *意思)))
一個= \
.withColumn (“neg_mean20perc F.expr (f (mean20perc * (1))))
一個= \
.withColumn (“new_var F.expr (f”{sf_start_dt}”))
columnsToDrop = []
selectClause = "
a.createOrReplaceTempView (“”)
一個=火花。sql (“select *”)
從pyspark.sql。類型進口StringType
從pyspark.sql。功能導入udf
列=列表(a.columns)
打印(列)
坳def func_udf (df):
列=列表(df.columns)
如果卡紮菲在列:
返回df
其他:
df。withColumn(“上校”,點燃(null))
func_udf spark.udf.register (“ColumnChecker”)
一個=。withColumn (ref_date, expr (f”情況下當國旗= 3 ' {sf_start_dt} '其他ColumnChecker (a, ref_date)結束”))
一個=。withColumn(‘平衡’,expr (f”情況下當國旗= 3 0結束”))
a.show ()
一個=。withColumn (new_col, expr (f”情況下當國旗= 3 {sf_start_dt}結束”))
a.show ()
work_ppcin_bal2_2019_1 =一個
work_ppcin_bal2_2019_1.show ()
# merge語句結束- - - - - - - - - -
這是完整的長羽毛的代碼