取消
顯示的結果
而不是尋找
你的意思是:

如何解決集群由於GC當訓練pyspark分解。毫升隨機森林

llvu
新的貢獻者二世

我想火車和優化一個隨機森林。起初集群處理垃圾收集好,但幾小時後,集群分解垃圾收集顯著上升。

train_df有6365018條記錄的大小與31列數。在分裂data_df訓練和測試之前dataframes(都是引發dataframes)我寫一個檢查點位置,優化分區的數量與火花。sql(”“優化delta.位置”" ")和閱讀dataframe再次。

有人能幫我與進一步優化垃圾收集的方法嗎?我能想到的減少max_depth參數,但我努力完全理解這個問題。

#從pyspark.ml特性改變。從pyspark.ml進口MinMaxScaler特性。進口StringIndexer特性,從pyspark.ml VectorAssembler #模型和管道。分類從pyspark.ml進口RandomForestClassifier。評估從pyspark進口MulticlassClassificationEvaluator。毫升從pyspark.ml進口管道。優化進口CrossValidator ParamGridBuilder num_partitions = data_df.rdd.getNumPartitions()並行性= np.floor ((n_cores-16) / num_partitions) .astype (int) train_df test_df = data_df.randomSplit ((。8。2)種子= 42)index_output_cols = [x +“指數”在categorical_cols string_indexer = StringIndexer (inputCols = categorical_cols outputCols = index_output_cols handleInvalid =“跳過”)numeric_cols =[字段(字段數據類型)train_df。dtypes如果(((數據類型= =“雙”)| (“int”數據類型))&(字段! = "標簽")))assembler_inputs = index_output_cols + numeric_cols vec_assembler = VectorAssembler (inputCols = assembler_inputs outputCol =“特性”)標量= MinMaxScaler (inputCol =“特性”,outputCol =“ScaledFeatures”)階段= (string_indexer vec_assembler,標量)管道管道=階段(階段)pipeline_data = = pipeline.fit (train_df) scaled_train_data = pipeline_data.transform (train_df) .cache () scaled_test_data = pipeline_data.transform (test_df) .cache () max_depth_choices = [5、10、20] n_estimator_choices = [80、100、120] subset_choices = (“0.1”、“0.3”、“0.5”] impurtiy_choices =(“基尼”,“熵”)評估者= MulticlassClassificationEvaluator (labelCol =“標簽”,predictionCol =“預測”,metricName = f1)分類器= RandomForestClassifier (featuresCol =“ScaledFeatures labelCol =“標簽”,種子= 42,maxBins = max_cat_size) #創建網格,適用於火花隨機森林param_grid = (ParamGridBuilder () .addGrid(分類器。maxDepth max_depth_choices) .addGrid(分類器。numTrees n_estimator_choices) .addGrid(分類器。featureSubsetStrategy subset_choices) .addGrid(分類器。雜質,impurtiy_choices) .build mlflow ())。start_run(嵌套= True, run_name = CrossValidator): mlflow。autolog(log_models=True, log_model_signatures=False) cv = CrossValidator(estimator=classifier, evaluator=evaluator, estimatorParamMaps=param_grid, numFolds=3, seed=42, parallelism=parallelism) cv_model = cv.fit(scaled_train_data) mlflow.log_param("best_train", np.mean(cv_model.avgMetrics))

4回複4

Hubert_Dudek1
尊敬的貢獻者三世

緩存昂貴,想保存數據到內存和磁盤(id沒有更多的空間留在內存)。我知道,在理論上,它應該改善,但它可以使事情變得更糟。我隻會把

scaled_train_data = pipeline_data.transform (train_df)

scaled_test_data = pipeline_data.transform (test_df)

然後我將分析分區的數量和規模scaled_train_data scaled_test_data然後重新分區,所以每個分區將在100年和200 MB。分區的數量我將乘法器的核心員工(例如,16核心的工人,我們有64個分區150 MB)。

分析使用:

df.rdd.getNumPartitions

df.rdd.partitions.length

df.rdd.partitions.size

重新分區,使用:

scaled_train_data = pipeline_data.transform (train_df) .repartition (optimal_number)

scaled_test_data = pipeline_data.transform (test_df) .repartition (optimal_number)

llvu
新的貢獻者二世

謝謝你的快速回複,讓我試試這個!

我的確是在印象中緩存dataframe會提高性能,而不是讓它變得更糟。

你會知道為什麼優化保存表不給最優分區數量嗎?還是給最優分區數量的存儲,而不是計算目的?

Hubert_Dudek1
尊敬的貢獻者三世

是的,你提到的優化存儲有關(所以它加速加載從存儲隻有在進行任何轉換之前你需要操作的部分創建集群變換()後)

Kaniz
社區經理
社區經理

嗨@Liselotte van Unen(客戶),我們沒有收到你自從上次反應@Hubert杜德克,我檢查,看看他的建議幫助你。

否則,如果你有任何解決方案,請與社區分享,因為它可以幫助別人。

同時,請別忘了點擊“選擇最佳”按鈕時提供的信息幫助解決你的問題。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map