我有一個火花結構化流的工作從2δ表讀取流,處理數據,然後寫到第三δ表。磚的工作正在運行服務豐富。
有時工作失敗有以下例外。例外是間歇性的,到目前為止,我還沒有能夠重現這個問題的步驟。這個異常的原因可以什麼?
引起的:java。IOException:錯誤讀取流狀態文件dbfs: / checkpoint_loc /州/ 0/183/26。快照HDFSStateStoreProvider [id = (op = 0 = 183), dir = dbfs: / checkpoint_loc /狀態/ 0/183):過早EOF
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.readSnapshotFile (HDFSBackedStateStoreProvider.scala: 642)
在org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider。anonfun loadMap美元3美元(HDFSBackedStateStoreProvider.scala: 437)
scala.Option.orElse (Option.scala: 447)
在org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider。anonfun loadMap美元2美元(HDFSBackedStateStoreProvider.scala: 437)
org.apache.spark.util.Utils .timeTakenMs美元(Utils.scala: 642)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap (HDFSBackedStateStoreProvider.scala: 417)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getLoadedMapForStore (HDFSBackedStateStoreProvider.scala: 245)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore (HDFSBackedStateStoreProvider.scala: 229)
在org.apache.spark.sql.execution.streaming.state.StateStore $ . get (StateStore.scala: 500)
org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute (StateStoreRDD.scala: 125)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 380)
org.apache.spark.rdd.RDD.iterator (RDD.scala: 344)
org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 60)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 380)
org.apache.spark.rdd.RDD.iterator (RDD.scala: 344)
在org.apache.spark.scheduler.ResultTask。anonfun runTask美元3美元(ResultTask.scala: 75)
在美元com.databricks.spark.util.ExecutorFrameProfiler知根知底(ExecutorFrameProfiler.scala: 110)
在org.apache.spark.scheduler.ResultTask。anonfun runTask美元1美元(ResultTask.scala: 75)
在美元com.databricks.spark.util.ExecutorFrameProfiler知根知底(ExecutorFrameProfiler.scala: 110)
org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 55)
org.apache.spark.scheduler.Task.doRunTask (Task.scala: 150)
在org.apache.spark.scheduler.Task。anonfun運行$ 1美元(Task.scala: 119)
在美元com.databricks.spark.util.ExecutorFrameProfiler知根知底(ExecutorFrameProfiler.scala: 110)
org.apache.spark.scheduler.Task.run (Task.scala: 91)
在org.apache.spark.executor.Executor TaskRunner。美元anonfun運行13美元(Executor.scala: 813)
org.apache.spark.util.Utils .tryWithSafeFinally美元(Utils.scala: 1620)
在org.apache.spark.executor.Executor TaskRunner。美元anonfun運行4美元(Executor.scala: 816)
在scala.runtime.java8.JFunction0專門sp.apply美元(美元JFunction0 mcV $ sp.java: 23)
在美元com.databricks.spark.util.ExecutorFrameProfiler知根知底(ExecutorFrameProfiler.scala: 110)
org.apache.spark.executor.Executor TaskRunner.run美元(Executor.scala: 672)
java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1149)
java.util.concurrent.ThreadPoolExecutor Worker.run美元(ThreadPoolExecutor.java: 624)
java.lang.Thread.run (Thread.java: 748)
您正在使用開源HDFS狀態存儲提供商。有一個問題在閱讀你的檢查站。我要強烈推薦使用RocksDB狀態存儲https://docs.www.eheci.com/spark/latest/structured-streaming/production.html configure-rocksdb-stat……
嗨@Jose岡薩雷斯,
謝謝你的回複。
寫流如下
數據集。writeStream .format(“δ”).outputMode(“追加”).option (“checkpointLocation”,“dbfs: / checkpoints_v1 / < table_name >”) .option .table (“mergeSchema”、“true”) (“< table_name >”)
創建數據集獲取記錄從三角洲表後流和應用flatMapGroupsWithState API的記錄。
一些觀察:
1)錯誤發生的概率更大更高的負載。如果輸入率flatMapGroupsWithState API 12000條記錄/秒,然後經常發生錯誤,一般2小時內開始的工作。低負荷的4000條記錄/秒,經常出現的錯誤。
2)的快照文件的大小發生錯誤是0字節。
讓我知道如果你需要任何其他信息。
感謝和問候,
羅翰