我試圖應用一個函數來pyspark DataFrame並保存API響應一個新列,然後使用“json_normalize”解析。熊貓是很好的,但是,我遇到一個異常與“pyspark”。
進口pyspark。熊貓作為ps大熊貓作為pd導入導入請求def get_vals(行):#讓api調用返回行[A] *行[B] #創建一個熊貓DataFrame pdf = pd。DataFrame ({A: [1, 2, 3] B:(4、5、6)}) #應用api函數,得到反應pdf [' api_response '] = pdf。應用(λ行:get_vals(行),軸= 1)pdf.sample(5) #解壓JSON API反應嚐試:dff = pd.json_normalize (pdf (' api_response '] .str['位置'])除了TypeError e:打印(f“錯誤:{e}”)打印(f”有問題的數據:{數據(“數據”)}”)#,pySpark DataFrame psdf = ps.DataFrame (df) psdf.head (5)
預期的輸出是一個json DataFrame正常化。當我試圖應用函數在pyspark DataFrame,它拋出一個異常:
psdf [' api_response '] = psdf。應用(λ行:get_vals(行),軸= 1)- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - TypeError回溯(最近稱去年)文件<命令- 4372401754138893 >:2 1 - - - - - > 2 psdf [' api_response '] = psdf。應用(λ行:get_vals(行),軸= 1)TypeError:嵌套StructType不支持轉換從箭:struct <數據:struct <地理位置:
@Keval沙:
錯誤消息表明,問題是模式的psdf DataFrame,專門的嵌套的結構體類型api_response列。
不幸的是,PySpark不支持嵌套的結構體類型轉換從箭頭格式時,內部使用的PySpark DataFrames。因此,你不能直接應用一個函數來PySpark DataFrame返回一個嵌套的結構體類型。
一種解決方法是使用PySpark pandas_udf功能,它允許您使用熊貓UDF(用戶定義函數)PySpark DataFrame。您可以定義一個熊貓UDF,熊貓DataFrame作為輸入,該函數適用於DataFrame,並返回一個新的熊貓DataFrame規範化JSON數據。這裏有一個例子,您可以修改您的代碼如何使用pandas_udf:
進口pyspark.sql。函數從pyspark.sql F。類型進口StructType、StructField倍增式#定義的模式輸出DataFrame output_schema = StructType ([StructField(“位置。緯度”,倍增式()),StructField(“位置。經度”,倍增式()))#定義API調用的函數,應用和規範JSON響應的def api_call_udf (pdf): #應用API調用的輸入熊貓DataFrame pdf [' api_response '] = pdf。應用(λ行:get_vals(行),軸= 1)#規範化使用熊貓的JSON數據。json_normalize normalized_df = pd.json_normalize (pdf (' api_response '] .str['位置'])#返回規範化熊貓DataFrame如表返回normalized_df.to_arrow箭()#轉換PySpark DataFrame熊貓DataFrame pdf = psdf.to_pandas() #應用函數使用pandas_udf normalized_df = psdf。withColumn(“輸出”,F。pandas_udf (api_call_udf output_schema) (F.struct ([F.col (x)對x pdf.columns)))) #轉換輸出PySpark DataFrame回PySpark熊貓DataFrame psdf_output = ps.DataFrame (normalized_df)
在這段代碼中,我們定義了一個模式輸出DataFrame和定義一個函數,應用API調用的輸入大熊貓DataFrame,規範化使用熊貓的JSON數據。json_normalize並返回規範化熊貓DataFrame箭頭表。然後我們把PySpark DataFrame熊貓DataFrame,使用pandas_udf應用功能,轉換輸出PySpark DataFrame回PySpark熊貓DataFrame。