取消
顯示的結果
而不是尋找
你的意思是:

小貼士正確使用大型廣播變量?

nthomas
新的因素

我用廣播變量規模大約100 MB泡菜,我近似:

> > > data =列表(範圍(int (10 * 1 e6))) > > >進口cPickle泡菜> > > len (pickle.dumps(數據))98888896

一個集群上運行3 c3.2xlarge執行人,m3。大的驅動程序,使用下麵的命令啟動交互式會話:

IPYTHON = 1 pyspark——executor-memory 10 g driver-memory 5 g - conf spark.driver.maxResultSize = 5 g

在一次抽樣,如果我堅持這個廣播變量引用,內存使用爆炸。100年一個100 MB的變量的引用,即使複製100次,我希望不超過10 GB的數據使用總(更不用說30 GB / 3節點)。然而,我看到的內存錯誤當我運行下麵的測試:

data =列表(範圍(int (10 * 1 e6))) = sc.broadcast元數據(數據)id = sc.parallelize (zip(範圍(100),(100)))joined_rdd = id。mapValues(λ_:metadata.value) joined_rdd.persist()打印(“數:{}”.format (joined_rdd.count ()))

堆棧跟蹤:

TaskSetManager:在舞台上失去了任務17.3 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python。PythonException:回溯(最近調用最後):文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /工人。py”, 111行,在主進程()文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /工人。py”, 106行,過程中序列化器。dump_stream (func (split_index,迭代器),輸出文件)文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 2355行,在pipeline_func返回func(分裂,prev_func(分裂,迭代器))文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 2355行,在pipeline_func返回func(分裂,prev_func(分裂,迭代器))文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 317行,在函數返回f (iterator)文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 1006行,在<λ>回歸自我。mapPartitions(λ我:[總和(在我1 _))).sum()文件“/ usr / lib /火花/ python / pyspark /抽樣。py”, 1006行,在< genexpr >回歸自我。mapPartitions(λ我:[總和(在我1 _))).sum()文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /序列化器。py”, 139行,在load_stream收益率self._read_with_length(流)文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /序列化器。py”, 164行,在_read_with_length返回self.loads (obj)文件“/ usr / lib /火花/ python / lib / pyspark.zip / pyspark /序列化器。py”, 422行,在負載返回pickle.loads (obj) MemoryError org.apache.spark.api.python.PythonRDD立刻1.美元讀(PythonRDD.scala: 138) org.apache.spark.api.python.PythonRDD立刻1美元。< init > (PythonRDD.scala: 179) org.apache.spark.api.python.PythonRDD.compute (PythonRDD.scala: 97) org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 297) org.apache.spark.rdd.RDD.iterator (RDD.scala: 264) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) org.apache.spark.scheduler.Task.run (Task.scala: 88)美元org.apache.spark.executor.Executor TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1145) org.apache.spark.scheduler.Task.run (Task.scala: 88)美元org.apache.spark.executor.Executor TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1145)美元java.util.concurrent.ThreadPoolExecutor Worker.run (ThreadPoolExecutor.java: 615) java.lang.Thread.run (Thread.java: 745) 16/05/25 23:57:15錯誤TaskSetManager:任務17階段0.0失敗了4次;流產的工作- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Py4JJavaError回溯(最近調用最後)< ipython-input-1-7a262fdfa561 > <模塊> ()7 joined_rdd.persist(8)打印(“堅持叫”)——> 9打印(“數:{}”.format (joined_rdd.count ())) /usr/lib/spark/python/pyspark/rdd.py 1005年計數(自我)1004 3”“”- > 1006回歸自我。mapPartitions(λ我:[總和(在我1 _))).sum () 1007 1008 def統計(自我):/usr/lib/spark/python/pyspark/rdd.py總之(自我)995 6.0 996年”“”——> 997年回歸自我。mapPartitions(λx: [sum (x)])。fold(0, operator.add) 998 999 def count(self): /usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 869 # zeroValue provided to each partition is unique from the one provided 870 # to the final reduce call --> 871 vals = self.mapPartitions(func).collect() 872 return reduce(op, vals, zeroValue) 873 /usr/lib/spark/python/pyspark/rdd.py in collect(self) 771 """ 772 with SCCallSiteSync(self.context) as css: --> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 774 return list(_load_from_socket(port, self._jrdd_deserializer)) 775 /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)

我看過以前的線程的內存使用泡菜反序列化是一個問題。然而,我希望一個廣播變量隻能反序列化(和加載到內存中一個執行人)一次,和隨後的引用

value

引用內存中的地址。似乎並沒有這種情況,但是。我遺漏了什麼東西?廣播的例子我見過變量字典,使用一次將一組數據(即取代機場與機場縮寫名稱)。堅持他們的動機是創建對象的知識廣播變量以及如何與它交互,堅持這些對象,並使用它們執行多個計算(火花照顧著它們在內存中)。

有哪些建議使用大(100 MB +)廣播變量?堅持一個廣播變量錯誤?這是一個問題,可能是特定於PySpark嗎?

謝謝你!我是感激你的幫助。

注意,我也出現了這個問題stackoverflow

5回複5

_throwaway
新的貢獻者二世

nbajason
新的貢獻者二世

請幫助.....

我們絕對需要一個最佳實踐對於這個場景,我們需要從巨大的負載重量和偏見numpy文件(500或5 g),並分發給每個工人的推理。

僅供參考:

1。加載大文件在工人:每個工人可以加載這些文件但是會有很多例外,在測試期間,將迭代將有20%這樣的例外。

2。負載在司機:巨大的文件引發不能播出”_io。FileIO的變量

= = = = = = = = 1。代碼= = = = = = = = pcml.models。multiTrialClassifier進口multiTrialClassifier嚐試:模型= multiTrialClassifier () model.setup_in_tf()除了:進口tensorflow tf tf.reset_default_graph()模型= multiTrialClassifier () model.setup_in_tf ()

tdeNeuralClassifierModelBV = sc.broadcast(模型)

= = = = = = = = 2。錯誤= = = = = = = = = configApiStack: qa加載{“分”:“違約”,“規模”:0.8756789083034756,“tokenlist_ref”:“attrs_labels”}加載{“分”:“違約”,“規模”:2.618619150801559,“tokenlist_ref”:“sch_labels”}回溯(最近的電話最後):文件“/磚/ / python / pyspark /火花broadcast.py”,83行轉儲泡菜。轉儲(f值,,2)_io TypeError:不能序列化”。FileIO的對象

PicklingError:不能序列化廣播:TypeError:不能序列化io。FileIO '對象= = = = = = = 3。錯誤詳細信息= = = = = = PicklingError回溯(最近調用最後)<命令- 1376727837833372 > <模塊> 39 model.setup_in_tf () () 40 - - - > 41 tdeneuralclassifiermodelbv = sc.broadcast(模型)/磚/ / python / pyspark /火花context.py797年廣播(自我價值)被發送到每個集群隻有一次。798”“”- - > 799返回廣播(自我價值,self.pickled_broadcast_vars) 800 801 def蓄電池(自我價值,accum_param = None): /磚/ / python / pyspark /火花broadcast.py在初始化(自我、sc、價值、pickle_registry路徑)72如果sc isnotNone: 73 f = NamedTemporaryFile(刪除= False, dir = sc._temp_dir) - - - > 74的自我。_path =自我。轉儲(價值,f) 75年的自我。_jbroadcast = sc。jvm.PythonRDD.readBroadcastFromFile (sc。jsc self.path) 76年的自我。pickle_registry = pickle_registry /磚/ / python / pyspark /火花broadcast.py在轉儲(自我價值,f) 88% (e.class.name_exception_message (e)) 89年print_exec (sys.stderr) - - > 90年籌集pickle.PicklingError(味精)91 f.close(92)返回f.name

dewyyydewttt
新的貢獻者二世

好工作我的團隊男孩:DDD

dewhhhdewggg
新的貢獻者二世

好喜歡เกมสล็อต我的朋友,DDD

Baidu
map