取消
顯示的結果
而不是尋找
你的意思是:

PySpark工作加入&寫拚花與FetchFailedException操作失敗

arz”id=
新的因素

我在做一個任務,我將一個數據集,重新保存一個S3 bucket。這涉及到加入了其他兩個數據集,將字段從初始數據集重疊字段從其他兩個散列與pyspark.sql.functions某些字段。sha2(坳,256),和寫作結果S3。還有第三個加入,但這是< 1 mb的數據集,應該很容易通過廣播加入處理。我的一個簡化版本代碼如下:

gcol = <列在分組數據生成過程> initial_names = df.schema.names #加載數據集df_600MB = spark.read.parquet (…)。withColumnRenamed (id_col id_col + _600MB) df_400MB = spark.read.parquet (…) df_tiny = pd.read_csv (…) # 3 kb df_tiny.loc [df_tiny [gcol] .isna (), gcol] =沒有df_tiny = spark.createDataFrame (df_3)。withColumnRenamed (gcol gcol + _tmp) #找到哪些字段重疊放df colset_1 = (c c df_600MB.schema.names如果(c, c = gcol和! = id_col)] colset_2 = [c c df_400MB.schema.names如果c ! = gcol] df = df。下降(* (colset_1 + colset_2)) df_400MB = df_400MB。withColumnRenamed (gcol gcol + _400MB) #分配行數字隨機創建多對一加入df和df_400MB win_df = Window.partitionBy (gcol) .orderBy (F.rand ()) win_df_400MB = Window.partitionBy (gcol + _400MB) .orderBy (F.rand ()) df_400MB = df_400MB。withColumn (rn, F.row_number () .over (win_df_400MB) - 1) df = df.join (F.broadcast (df_tiny),在= df (gcol) .eqNullSafe (df_tiny [gcol + ' _tmp '])) \ .drop (gcol + _tmp) #打印(df.count()) # 26億#帽的rn df,所以總是< =“rn”df_400MB #最大價值的rn df_400MB df_tiny包含“數量”字段,因此上述加入df = df。withColumn (rn, F.row_number () .over (win_df_2) % F.col(“計數”))df = df。加入(df_400MB =“內心”,如何在= ((df (gcol) .eqNullSafe (df_400MB [gcol + ' _400MB '])) & (df (rn”) = = df_400MB [' rn ']))) #打印(df.count()) # 26億# id_col +‘_600MB df_600MB df = df是一個獨特的關鍵。加入(df_600MB = df [id_col] = = df_600MB [id_col + ' _600MB '],如何= '左')df = df.select保持相同的模式(initial_names) # #打印(df.count()) 26億如果save_df:打印(“散列”)df = hash_di (df fields_to_hash)打印df.write(“儲蓄”)。拚花(S3 bucket名稱)

一些數字:最初的數據集是694 gb 26億行和100字段。另外兩個是400 mb和600 mb,分別約為200萬和1億行,大約6在每個字段。加入多對一,即更大的數據集的每一行隻能匹配一行在每一個較小的數據集。我包括調用df.count()來驗證dataframe仍同樣大小的加入後,也在1%。(奇怪的是,df.count()連續三次去順利。)

我的集群配置24執行人8核和61 gb的內存——r4.2xlarge,對於那些使用磚。這是內存和192年1.4結核病核心。這些火花配置變量我設置:

spark.dynamicAllocation。真正的spark.executor啟用。40 g spark.shuffle.file記憶。緩衝1024 k spark.sql.shuffle。分區720 spark.network.timeout 360年代火花。maxRemoteBlockSizeFetchToMem 2147483135 spark.sql.adaptive。真正的spark.sql.execution.arrow.pyspark啟用。使真正的spark.default.parallelism 720

就像我說的,這個集群能夠執行df.count()很好。然而,當我叫df.write。拚花,事情進展得很糟糕。的工作,泄漏達到多個字節,好幾次我的數據的大小;洗牌的讀和寫是幾十gb;運行數小時後,工作失敗有以下錯誤:

引起的:org.apache.spark。SparkException:工作階段失敗而終止:ShuffleMapStage 19(拚花NativeMethodAccessorImpl.java: 0)未能最大允許的次數:4。最近的失敗原因:

org.apache.spark.shuffle.FetchFailedException

引起的:java。10.41.61.123:4048 IOException:連接失敗

我還應該提到階段這是失敗,因為一些原因,隻有82任務,並行性遠比我想象的火花會使用。

我猜一個執行人死。誰有建議我可以得到這個工作如何運行?我應該配置集群不同?有一個火花變量我可以設置會有所不同嗎?任何幫助將不勝感激。

0回答0
Baidu
map