我覺得我要瘋了。我已經測試了一個數據管道標準計算集群。我加載新文件批處理從穀歌雲存儲桶。自動裝卸機完全按預期工作從我的筆記本在我的計算集群。然後,我隻是用這個筆記本作為工作流中的第一個任務使用一個集群的新工作。為了測試這個管道作為工作流我第一次刪除了所有檢查點文件和目錄在開始運行之前使用這個命令。
dbutils.fs。rm (checkpoint_path,真的)
出於某種原因,代碼完全當測試工作,但在工作流,我得到了“流停止”,從自動裝卸機沒有數據。這是我的自動裝卸機配置:
file_path = " gs: / / raw_zone_twitter”
table_name = f“twitter_data_autoloader”
checkpoint_path = f“/ tmp / _checkpoint / twitter_checkpoint”
火花。sql (f“DROP TABLE如果存在{table_name}”)
查詢= (spark.readStream
.format (“cloudFiles”)
.option (“cloudFiles。格式”、“文本”)
.option (“cloudFiles。schemaLocation”, checkpoint_path)
.load (file_path)
.withColumn (“filePath input_file_name ())
.writeStream
checkpoint_path .option (“checkpointLocation”)
.trigger(一旦= True)
.toTable (table_name))
這個工作流運行時我看到檢查點創建目錄,但是裏麵沒有數據。
我計算集群上的代碼之間的測試,我工作流的任務是完全一樣的(筆記本),所以我真的不知道為什麼自動裝卸機不工作在我工作流程……
仍然沒有任何進展。我想確認我的集群配置是相同的在我的筆記本上運行我的通用計算集群和集群的工作。我也使用相同的GCP服務帳戶。在我的計算集群自動裝卸機完全按預期工作。這是代碼被用於自動裝卸機(這適用於計算集群)。
然而,當我運行相同的代碼(來自同一筆記本)作為工作自動裝卸機停止流(似乎.writeStream),我隻是看到“流停止”沒有真正知道為什麼,如下見過。
如果我去雲存儲我看到檢查點位置創建,但提交文件夾是空的,這意味著自動裝卸機無法寫流。
如果我運行工作流的筆記本外我看到提交文件夾被填充,如果我刪除dbutils.fs。rm (checkpoint_path,真的)command autoloader correctly does not write new files until new files are available in the source bucket.