問題
寫信給一個S3 bucket使用抽樣失敗。司機節點可以寫,但工人(執行者)節點返回一個拒絕訪問錯誤。寫作與DataFrame API,但是效果很好。
例如,假設你運行以下代碼:
% scala . io .進口java文件導入。可序列化的import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration import java.net.URI import scala.collection.mutable import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.InputDStream val ssc = new StreamingContext(sc, Seconds(10)) val rdd1 = sc.parallelize(Seq(1,2)) val rdd2 = sc.parallelize(Seq(3,4)) val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2)) val result = inputStream.map(x => x*x) val count = result.foreachRDD { rdd => val config = new Configuration(sc.hadoopConfiguration) with Serializable rdd.mapPartitions { _.map { entry => val fs = FileSystem.get(URI.create("s3://dx.lz.company.fldr.dev/part_0000000-123"), config) val path = new Path("s3://dx.lz.company.fldr.dev/part_0000000-123") val file = fs.create(path) file.write("foobar".getBytes) file.close() } }.count() } println(s"Count is $count") ssc.start()
返回下列錯誤:
org.apache.spark。SparkException:工作階段失敗而終止:任務3階段0.0失敗了4次,最近的失敗:在舞台上失去了任務3.3 0.0 (TID 7 10.205.244.228執行人0):java.rmi。RemoteException異常:com.amazonaws.services.s3.model。AmazonS3Exception:拒絕訪問;請求ID: F81ADFACBCDFE626,擴展請求ID: 1 dncbuhsmuffi9a1lz0ygt4dnrjdy5v3c + J / DiEeg8Z4tMOLphZwW2U + sdxmr8fluQZ1R / 3 bcep,
導致
當您編寫使用抽樣工作節點,我政策拒絕訪問如果你使用可序列化的,如val配置=新配置(sc.hadoopConfiguration)可序列化的。
解決方案
有兩種方法可以解決這個問題:
選項1:使用DataFrames
% scala dbutils.fs.put (“s3a: / / dx.lz.company.fldr.dev / test-gopi / test0.txt”、“foobar”) val df = spark.read.text (“s3a: / / dx.lz.company.fldr.dev / test-gopi / test0.txt”) df.write.text (“s3a: / / dx.lz.company.fldr.dev / test-gopi / text1.txt”) val df1 = spark.read.text (“s3a: / / dx.lz.company.fldr.dev / test-gopi / text1.txt”)
選項2:使用SerializableConfiguration
如果你想使用抽樣,使用:
% scala val配置= sc.broadcast(新SerializableConfiguration (sc.hadoopConfiguration))
例如:
% scala . io .進口java文件導入。可序列化的import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration import java.net.URI import scala.collection.mutable import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.util.SerializableConfiguration val ssc = new StreamingContext(sc, Seconds(10)) val rdd1 = sc.parallelize(Seq(1,2)) val rdd2 = sc.parallelize(Seq(3,4)) val inputStream = ssc.queueStream[Int](mutable.Queue(rdd1,rdd2)) val result = inputStream.map(x => x*x) val count = result.foreachRDD { rdd => //val config = new Configuration(sc.hadoopConfiguration) with Serializable val config = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration)) rdd.mapPartitions { _.map { entry => val fs = FileSystem.get(URI.create("s3://pathpart_0000000-123"), config.value.value) val path = new Path("s3:///path/part_0000000-123") val file = fs.create(path) file.write("foobar".getBytes) file.close() } }.count() } println(s"Count is $count") ssc.start()