你好,我是一個新用戶,我使用Azure磚過程~ 1000直布羅陀JSON嵌套文件包含保險數據。我上傳Azure數據的JSON文件湖Gen2 dataframe存儲和讀取JSON文件。
df = spark.read.option(“多行”,“真正的”). json (mnt / mount /國歌/ 2022-10-11 / IndexFile / 2022 - 10 - 01 _anthem_index.json.gz”)
JSON文件的閱讀~ 25分鍾33鑲條文件。
我的集群配置概要:
JSON文件的模式是這樣的
根|——reporting_entity_name:字符串(nullable = true) |——reporting_entity_type:字符串(nullable = true) |——reporting_structure:數組(nullable = true) | |——元素:結構(containsNull = true) | | |——allowed_amount_files:結構(可空= true) | | | | -描述:字符串(nullable = true) | | | |,地點:字符串(nullable = true) | | |——in_network_files:數組(nullable = true) | | | |——元素:結構(containsNull = true) | | | | | -描述:字符串(nullable = true) | | | | |,地點:字符串(nullable = true) | | |——reporting_plans:數組(nullable = true) | | | |——元素:結構(containsNull = true) | | | | |——plan_id:字符串(nullable = true) | | | | |——plan_id_type:字符串(nullable = true) | | | | |——plan_market_type:字符串(nullable = true) | | | | |——plan_name:字符串(nullable = true)
因為數據是一個嵌套的JSON文件,我用下麵的函數代碼來平數據。
從pyspark.sql。從pyspark.sql進口*類型。功能導入* def平(df): #計算複雜的字段(列表和結構)模式complex_fields = dict ([df.schema (field.name field.dataType)領域。如果字段類型(field.dataType) = = ArrayType或類型(field.dataType) = = StructType]),而len (complex_fields) != 0:col_name =列表(complex_fields.keys())[0]打印(“處理:”+ col_name +“類型:”+ str(類型(complex_fields [col_name]))) #如果StructType那麼所有子元素轉換為列。#即扁平結構如果(類型(complex_fields [col_name]) = = StructType):擴大=[坳(col_name +‘。’+ k) .alias (col_name +“_”+ k)對k (n.name n complex_fields [col_name]]] df = df。選擇(“*”,*擴展).drop (col_name) #如果ArrayType然後添加使用爆炸函數數組元素的行#即爆炸數組elif(類型(complex_fields [col_name]) = = ArrayType): df = df.withColumn (col_name, explode_outer (col_name)) #驗算剩下複雜的領域模式complex_fields = dict ([df.schema (field.name field.dataType)領域。如果字段類型(field.dataType) = = ArrayType或類型(field.dataType) = = StructType])返回df
調用平函數
df_flatten =平(df)
這給了如下所示的輸出
處理:reporting_structure類型:< pyspark.sql.types類”。ArrayType >處理:reporting_structure類型:< pyspark.sql.types類”。StructType >處理:reporting_structure_allowed_amount_files類型:< pyspark.sql.types類”。StructType >處理:reporting_structure_in_network_files類型:< pyspark.sql.types類”。ArrayType >處理:reporting_structure_in_network_files類型:< pyspark.sql.types類”。StructType >處理:reporting_structure_reporting_plans類型:< pyspark.sql.types類”。ArrayType >處理:reporting_structure_reporting_plans類型:<類“pyspark.sql.types.StructType”>
然後我試圖顯示夷為平地dataframe df_flatten
df_flatten.display ()
這給了下麵的錯誤~ 50分鍾後執行
FileReadException:讀取文件時錯誤mnt / monut / mnt / mount /國歌/ 2022-10-11 / IndexFile / 2022 - 10 - 01 - _anthem_index.json。廣州所致:OutOfMemoryError: GC開銷限製超過
如果我試著寫數據到ADLS創2位置δ火花工作運行和一些~ 60分鍾後就失敗了。
df.write.mode(覆蓋).format(δ).save (mnt / monut / mnt / mount /國歌/ 2022-10-11 /加工/”)