我用廣播變量規模大約100 MB泡菜,我近似:
> > > data =列表(範圍(int (10 * 1 e6))) > > >進口cPickle泡菜> > > len (pickle.dumps(數據))98888896
一個集群上運行3 c3.2xlarge執行人,m3。大的驅動程序,使用下麵的命令啟動交互式會話:
IPYTHON = 1 pyspark——executor-memory 10 g driver-memory 5 g - conf spark.driver.maxResultSize = 5 g
在一次抽樣,如果我堅持這個廣播變量引用,內存使用爆炸。100年一個100 MB的變量的引用,即使複製100次,我希望不超過10 GB的數據使用總(更不用說30 GB / 3節點)。然而,我看到的內存錯誤當我運行下麵的測試:
data =列表(範圍(int (10 * 1 e6))) = sc.broadcast元數據(數據)id = sc.parallelize (zip(範圍(100),(100)))joined_rdd = id。mapValues(λ_:metadata.value) joined_rdd.persist()打印(“數:{}”.format (joined_rdd.count ()))
堆棧跟蹤:
TaskSetManager:在舞台上失去了任務17.3 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python。PythonException:回溯(最近調用最後):文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /工人。py”, 111行,在主進程()文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /工人。py”, 106行,過程中序列化器。dump_stream (func (split_index,迭代器),輸出文件)文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 2355行,在pipeline_func返回func(分裂,prev_func(分裂,迭代器))文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 2355行,在pipeline_func返回func(分裂,prev_func(分裂,迭代器))文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 317行,在函數返回f (iterator)文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 1006行,在<λ>回歸自我。mapPartitions(λ我:[總和(在我1 _))).sum()文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 1006行,在< genexpr >回歸自我。mapPartitions(λ我:[總和(在我1 _))).sum()文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /序列化器。py”, 139行,在load_stream收益率self._read_with_length(流)文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /序列化器。py”, 164行,在_read_with_length返回self.loads (obj)文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /序列化器。py”, 422行,在負載返回pickle.loads (obj) MemoryError org.apache.spark.api.python.PythonRDD立刻1.美元讀(PythonRDD.scala: 138) org.apache.spark.api.python.PythonRDD立刻1美元。< init > (PythonRDD.scala: 179) org.apache.spark.api.python.PythonRDD.compute (PythonRDD.scala: 97) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 297) org.apache.spark.rdd.RDD.iterator (RDD.scala: 264) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) org.apache.spark.scheduler.Task.run (Task.scala: 88)美元org.apache.spark.executor.Executor TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1145) org.apache.spark.scheduler.Task.run (Task.scala: 88)美元org.apache.spark.executor.Executor TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1145)美元java.util.concurrent.ThreadPoolExecutor Worker.run (ThreadPoolExecutor.java: 615) java.lang.Thread.run (Thread.java: 745) 16/05/25 23:57:15錯誤TaskSetManager:任務17階段0.0失敗了4次;流產的工作- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Py4JJavaError回溯(最近調用最後)< ipython-input-1-7a262fdfa561 > <模塊> ()7 joined_rdd.persist(8)打印(“堅持叫”)——> 9打印(“數:{}”.format (joined_rdd.count ())) /usr/lib/spark/python/pyspark/rdd.py 1005年計數(自我)1004 3”“”- > 1006回歸自我。mapPartitions(λ我:[總和(在我1 _))).sum () 1007 1008 def統計(自我):/usr/lib/spark/python/pyspark/rdd.py總之(自我)995 6.0 996年”“”——> 997年回歸自我。mapPartitions(λx: [sum (x)])。fold(0, operator.add) 998 999 def count(self): /usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 869 # zeroValue provided to each partition is unique from the one provided 870 # to the final reduce call --> 871 vals = self.mapPartitions(func).collect() 872 return reduce(op, vals, zeroValue) 873 /usr/lib/spark/python/pyspark/rdd.py in collect(self) 771 """ 772 with SCCallSiteSync(self.context) as css: --> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 774 return list(_load_from_socket(port, self._jrdd_deserializer)) 775 /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
我看過以前的線程的內存使用泡菜反序列化是一個問題。然而,我希望一個廣播變量隻能反序列化(和加載到內存中一個執行人)一次,和隨後的引用
value
引用內存中的地址。似乎並沒有這種情況,但是。我遺漏了什麼東西?廣播的例子我見過變量字典,使用一次將一組數據(即取代機場與機場縮寫名稱)。堅持他們的動機是創建對象的知識廣播變量以及如何與它交互,堅持這些對象,並使用它們執行多個計算(火花照顧著它們在內存中)。
有哪些建議使用大(100 MB +)廣播變量?堅持一個廣播變量錯誤?這是一個問題,可能是特定於PySpark嗎?
謝謝你!我是感激你的幫助。
注意,我也出現了這個問題stackoverflow。