你好!
我用databricks-connector推出引發工作使用python。
我已經驗證的python版本(3.8.10)和運行時支持的版本(8.1)安裝databricks-connect (8.1.10)。
每次創建一個mapPartitions / foreachPartition行動結果兩個火花工作執行一個接一個,複製/步驟,每一個階段都發生過。
一個示例代碼如下所示:
# !從pyspark /usr/bin/env python。sql從pyspark.sql進口SparkSession。類型進口StructType、StructField StringType LongType模式= StructType ([StructField(“關鍵”,LongType(),真的),StructField(“價值”,StringType(),真的)])火花= SparkSession.builder.appName(測試).getOrCreate () data = spark.read.schema(模式)\ .option(“頭”,“真正的”)\ . csv (s3: / /道路/ to.csv) def有趣(行):打印(f”有一個分區len(列表(行)){}行”)#這些隻觸發一個工作# data.collect () # data.count() #這個觸發兩個!data.foreachPartition(有趣)
這兩份工作執行(在這個例子中而不是現實世界代碼!):
第一份工作,這是我不確定它產生原因:
org.apache.spark.rdd.RDD.foreach (RDD.scala: 1015) com.databricks.service.RemoteServiceExec.doExecute (RemoteServiceExec.scala: 244) org.apache.spark.sql.execution.SparkPlan。anonfun執行美元1美元(SparkPlan.scala: 196) org.apache.spark.sql.execution.SparkPlan。anonfun executeQuery美元1美元(SparkPlan.scala: 240) org.apache.spark.rdd.RDDOperationScope .withScope美元(RDDOperationScope.scala: 165) org.apache.spark.sql.execution.SparkPlan.executeQuery (SparkPlan.scala: 236) org.apache.spark.sql.execution.SparkPlan.execute (SparkPlan.scala: 192)美元org.apache.spark.sql.execution.QueryExecution.toRdd lzycompute (QueryExecution.scala: 163) org.apache.spark.sql.execution.QueryExecution.toRdd (QueryExecution.scala: 162) org.apache.spark.sql.Dataset.javaToPython sun.reflect.NativeMethodAccessorImpl (Dataset.scala: 3569)。invoke0(本地方法)sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java: 62) sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java: 43) java.lang.reflect.Method.invoke (Method.java: 498) py4j.reflection.MethodInvoker.invoke (MethodInvoker.java: 244) py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java: 380) py4j.Gateway.invoke (Gateway.java: 295) py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java: 132) py4j.commands.CallCommand.execute (CallCommand.java: 79) py4j.GatewayConnection.run (GatewayConnection.java: 251)
然後是實際工作:
org.apache.spark.rdd.RDD.collect (RDD.scala: 1034) org.apache.spark.api.python.PythonRDD .collectAndServe美元(PythonRDD.scala: 260) org.apache.spark.api.python.PythonRDD.collectAndServe sun.reflect.NativeMethodAccessorImpl (PythonRDD.scala)。invoke0(本地方法)sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java: 62) sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java: 43) java.lang.reflect.Method.invoke (Method.java: 498) py4j.reflection.MethodInvoker.invoke (MethodInvoker.java: 244) py4j.reflection.ReflectionEngine.invoke (ReflectionEngine.java: 380) py4j.Gateway.invoke (Gateway.java: 295) py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java: 132) py4j.commands.CallCommand.execute (CallCommand.java: 79) py4j.GatewayConnection.run (GatewayConnection.java: 251) java.lang.Thread.run (Thread.java: 748)
知道為什麼發生這種情況,我隻能防止第一份工作運行和運行實際的代碼?
我已經證實,在第一遍,foreachPartitions運行的代碼。
不推薦使用.cache()對於真實世界場景,因為數據集很大甚至會持續的時間要比再次執行工作(可能失敗的磁盤上的可用性)。
這說明的一件事是,它看起來與磚的RemoteServiceExec代碼。也許它不知不覺地物化導致數據集/抽樣嗎?
任何人都可以幫忙嗎?
謝謝