取消
顯示的結果
而不是尋找
你的意思是:

試圖檢查是否一個列中存在dataframe如果不是,那麼我必須給零如果是的,那麼我需要給列本身使用UDF

cuteabhi32
新的貢獻者三世

從pyspark進口SparkContext

從pyspark進口SparkConf

從pyspark.sql。導入類型*

從pyspark.sql。功能導入*

從pyspark。sql進口*

從pyspark.sql。類型進口StringType

從pyspark.sql。功能導入udf

df1 = spark.read.format (csv) .option .load(“標題”、“true”)(“文件:/ / / home / cloudera /數據/ a.csv”)

坳def func_udf (df):

列=列表(df.columns)

如果在列(col):

返回df.col

其他:

返回NULL

func_udf spark.udf.register (“columncheck”)

resultdf = df1。withColumn (“ref_date”, expr(“當國旗= 3那麼其他‘06 may2022 columncheck (ref_date df1)結束”))

resultdf.show ()

這是代碼我試圖檢查是否一個列中存在dataframe如果不是,那麼我必須給零如果是的,那麼我需要給列本身通過使用UDF和UDF的投擲錯誤不知道我做錯了什麼。請幫助如何解決錯誤resultdf dataframe如下拋出錯誤

回溯(最近的電話):

文件“< stdin >”, 1號線,在<模塊>

文件“/ usr /地方/ / python / pyspark / sql / dataframe火花。在withColumn py”, 1849行

返回DataFrame (self._jdf。withColumn (colName col._jc)、self.sql_ctx)

文件“/ usr /本地/火花/ python / lib / py4j-0.10.7-src.zip / py4j / java_gateway。在__call__ py”, 1257行

文件“/ usr /地方/火花/ python / pyspark / sql /跑龍套。py”, 69行,在裝飾

提高AnalysisException (s。分割(':',1)[1],加亮)

pyspark.sql.utils。AnalysisException:“不能解決“df1”給定的輸入列:[ref_date、Salary_qtr4 Salary_qtr3,國旗,名字,Salary_qtr1,薪水,性病,Salary_qtr2, Id,意味著);1號線pos 53; \ n 'Project [Id # 10 # 11,工資# 12,當(cast(國旗# 20 int) = 3)然後其他06 may2022 columncheck (“df1 ref_date # 13)結束ref_date # 35, Salary_qtr1 # 14, Salary_qtr2 # 15日Salary_qtr3 # 16日Salary_qtr4 # 17日# 18,性病# 19,國旗# 20]\ n + - AnalysisBarrier \ n +關係

1接受解決方案

接受的解決方案

werners1
尊敬的貢獻者三世

所以讓我們保持它的簡單性:

df1 = spark.read.format (csv) .option .load(“標題”、“true”)(“文件:/ / / home / cloudera /數據/ a.csv”)如果df1“上校”。列:df2 =其他df1: df2 = df1。withColumn(“上校”,點燃(沒有).cast (TypeYouWant)

接下來你可以使用你的案子df2等等。

在原帖子查看解決方案

11日回複11

werners1
尊敬的貢獻者三世

試著返回一列內容= null代替純null。

所以:

如果在列(col):

返回df

其他:

df。withColumn(“上校”,點燃(null))

cuteabhi32
新的貢獻者三世

嗨werner感謝輸入但仍拋出錯誤

AnalysisException:列“a”是不存在的。你的意思的嗎?(一個。id,一個。性病,。部門,。國旗,a。意思是,一個。工資,。neg_std,。new_var,。ref_date,。mean20perc a.neg_mean20perc); line 1 pos 57;

這是我更新的代碼我使用case語句.Kindly幫助

坳def func_udf (df):

列=列表(df.columns)

上校的列:

返回坳

其他:

點燃(空)

func_udf spark.udf.register (“ColumnChecker”)

一個=。withColumn (ref_date, expr (f”情況下當國旗= 3 ' {sf_start_dt} '其他ColumnChecker (a, ref_date)結束”))

werners1
尊敬的貢獻者三世

所以你有這個列列表(df.columns)。

如果你那麼做:

如果“columnName”列:df返回

df其他回報。withColumn(“上校”,點燃(null))

你不需要循環。

你在循環返回坳,df。你想和一個可選的額外返回完整的dataframe列,

cuteabhi32
新的貢獻者三世

從pyspark.sql。功能導入*

從pyspark.sql。功能導入坳

進口pyspark.sql。函數作為F

從datetime進口日期,日期時間

導入的時間

從dateutil。relativedelta進口relativedelta

從解析器dateutil進口

從pyspark.sql。窗口導入窗口

從pyspark.sql。導入類型*

進口地區

# - - - - - - - - - -合並語句

df1 = spark.read.format (csv)。選項(“頭”,“真正的”).load (“dbfs: / FileStore / shared_uploads / a.csv”)

df1.show ()

df2 = spark.read.format (csv)。選項(“頭”,“真正的”).load (“dbfs: / FileStore / shared_uploads / b.csv”)

df2.show ()

# - - - - - - - - - -合並語句

sf_start_dt = ' 02 may2022 '

一個= df1

= a.filter(坳(ref_date) = = f“{sf_start_dt}”)

一個= a.drop(名字)

= a.select (“id”、“工資”,“ref_date”、“性病”,“的意思是”)

b = df2

a.createOrReplaceTempView (“”)

b.createOrReplaceTempView (“b”)

= a.join (b,“id”、“外”)

df1 = spark.sql(“選擇tbl1.id(選擇1),從tbl1 tempCol1內連接b tbl2tbl1.id=tbl2.id”)

df2 = spark.sql(“選擇tbl1.id(選擇2)從tbl1 tempCol2離開加入b tbl2tbl1.id=tbl2.id在哪裏tbl2.id為空”)

df3 = spark.sql(“選擇tbl1.id(選擇3)作為tempCol3從b tbl1離開加入tbl2tbl1.id=tbl2.id在哪裏tbl2.id為空”)

= a.join (df1“id”、“外”). join (df2、“id”、“外”). join (df3、“id”、“外”)

一個= a.na.fill (0, ' tempCol1 ')

一個= a.na.fill (0, ' tempCol2 ')

一個= a.na.fill (0, ' tempCol3 ')

一個=。withColumn(“國旗”,合並(坳(tempCol1) +坳(tempCol2) +坳(tempCol3)))

一個= a.drop (“tempCol1”)

一個= a.drop (“tempCol2”)

一個= a.drop (“tempCol3”)

一個= \

.withColumn (“neg_std F.expr (f (std * (1))))

一個= \

.withColumn (“mean20perc F.expr (f(0.20 *意思)))

一個= \

.withColumn (“neg_mean20perc F.expr (f (mean20perc * (1))))

一個= \

.withColumn (“new_var F.expr (f”{sf_start_dt}”))

columnsToDrop = []

selectClause = "

a.createOrReplaceTempView (“”)

一個=火花。sql (“select *”)

從pyspark.sql。類型進口StringType

從pyspark.sql。功能導入udf

列=列表(a.columns)

打印(列)

坳def func_udf (df):

列=列表(df.columns)

如果卡紮菲在列:

返回df

其他:

df。withColumn(“上校”,點燃(null))

func_udf spark.udf.register (“ColumnChecker”)

一個=。withColumn (ref_date, expr (f”情況下當國旗= 3 ' {sf_start_dt} '其他ColumnChecker (a, ref_date)結束”))

一個=。withColumn(‘平衡’,expr (f”情況下當國旗= 3 0結束”))

a.show ()

一個=。withColumn (new_col, expr (f”情況下當國旗= 3 {sf_start_dt}結束”))

a.show ()

work_ppcin_bal2_2019_1 =一個

work_ppcin_bal2_2019_1.show ()

# merge語句結束- - - - - - - - - -

這是完整的長羽毛的代碼

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map