你好,
我需要一些幫助這個例子。我們試圖創建一個linearRegression模型,可以為成千上萬的並行化符號/日期。當我們運行這個picklingError
任何建議都是感謝!
K
錯誤:
PicklingError:不能序列化對象:RuntimeError:看來你是試圖從廣播引用SparkContext變量,行動,或轉換。SparkContext隻能用於驅動程序,而不是在代碼上運行工人。有關更多信息,請參見火花- 5063。
代碼:
從pyspark。sql從pyspark.ml進口SparkSession。回歸從pyspark.ml進口LinearRegression。功能導入VectorAssembler #創建SparkSession火花= SparkSession.builder.getOrCreate() #創建一個與你的數據data_rdd = spark.sparkContext抽樣。並行化(((“symbol1”, 1、2、3), (“symbol2”, 4、5、6), (“symbol3”, 7、8、9)]) #將抽樣轉換成DataFrame data_df = data_rdd。toDF([“象征”、“Feature1”,“Feature2”、" Feature3 "]) #定義特征列彙編= VectorAssembler (inputCols = [“Feature1”、“Feature2”,“Feature3”], outputCol =“特性”)#適合模型在每個分區上,收集權重def fit_model(分區):#創建一個新的線性回歸模型模型= LinearRegression (featuresCol =“特性”,labelCol =“象征”)#創建一個空列表來存儲重量重量=[]#分區迭代器轉換為一個列表data_list =(分區)#列表轉換為一個DataFrame data_partition_df =火花。createDataFrame (data_list data_df.columns) #執行向量組裝data_partition_df = assembler.transform (data_partition_df) #適合模型對數據進行分區fitted_model = model.fit (data_partition_df) #得到權重模型權重= [fitted_model。係數(我)我的範圍(len (fitted_model.coefficients))) #產量權重產量重量#適合模型在每個分區上,收集權重partition_weights = data_df.rdd.mapPartitions (fit_model) .collect() #創建一個DataFrame收集權重weights_df =火花。createDataFrame (partition_weights [“Weight1”、“Weight2”,“劑量”])#顯示重量DataFrame weights_df.show ()