我目前想使用此功能的“觸發工作新文件到達時”我的一個項目。我有一個s3 bucket中,隨機文件到達天。所以我創建了一個工作,並設置觸發“文件到來”類型。在s3的筆記本我試圖讀取位置如下:
df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / < bucket_name > / <子文件夾> / "))
工作時觸發一個新文件的到來。但是當新文件到它讀取以前的文件。我隻是想讀取新文件,並將它附加到任何現有的表。
有什麼辦法文件名,這樣我可以使用下麵的代碼隻讀取新文件:
file_name = dbutils.widgets.get (“file_name”) df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / / < bucket_name > / < folder_name > / < file_1.csv >”))
或者有其他方法來解決它。嗎?
@Nikhil Kumawat:
是的,你可以讓新來的文件的名稱使用filePaths DataFrame()方法,傳遞給筆記本。這個方法返回一個對應的路徑列表文件,添加了自上次觸發器。
下麵是一個示例代碼片段展示了如何得到新文件的名稱:
#從DataFrame獲取文件路徑列表file_paths = df.input_file_name() #獲得新文件的名稱new_file_path = file_paths [1] new_file_name = new_file_path.split(“/”)[1] #新文件加載到DataFrame df_new = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option (“9”, ", ") .load (new_file_path))
在這個代碼片段,df是DataFrame傳遞到筆記本的觸發器。input_file_name()方法返回一個包含文件路徑DataFrame列的每一行。通過調用file_paths[1],你可以得到最後的路徑(新來的)文件。分割(“/”)[1]調用提取文件名的路徑。最後,您可以使用該文件名加載DataFrame新文件。
請注意,如果你想要新文件附加到一個現有的表,您可以簡單地使用
模式(“追加”)選項當加載文件:
df_new.write.format(“δ”).mode(“追加”).saveAsTable (“my_table”)
這將對現有my_table表添加新數據。
@Nikhil Kumawat:
是的,你可以讓新來的文件的名稱使用filePaths DataFrame()方法,傳遞給筆記本。這個方法返回一個對應的路徑列表文件,添加了自上次觸發器。
下麵是一個示例代碼片段展示了如何得到新文件的名稱:
#從DataFrame獲取文件路徑列表file_paths = df.input_file_name() #獲得新文件的名稱new_file_path = file_paths [1] new_file_name = new_file_path.split(“/”)[1] #新文件加載到DataFrame df_new = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option (“9”, ", ") .load (new_file_path))
在這個代碼片段,df是DataFrame傳遞到筆記本的觸發器。input_file_name()方法返回一個包含文件路徑DataFrame列的每一行。通過調用file_paths[1],你可以得到最後的路徑(新來的)文件。分割(“/”)[1]調用提取文件名的路徑。最後,您可以使用該文件名加載DataFrame新文件。
請注意,如果你想要新文件附加到一個現有的表,您可以簡單地使用
模式(“追加”)選項當加載文件:
df_new.write.format(“δ”).mode(“追加”).saveAsTable (“my_table”)
這將對現有my_table表添加新數據。
由於@Suteja卡努裏人的回複。
一個問題,我怎麼能把這個dataframe觸發的工作?還是我丟失的東西。
我嚐試以下方法:
df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / < bucket_name > / <子文件夾> / "))
然後我試著使用input_file_name()方法在這個dataframe像下麵但它給了我錯誤說“沒有這樣的方法”:
讓我知道如果我遺漏了什麼東西?
我把筆記本供參考,引發的工作:
@Nikhil Kumawat:
似乎你想使用input_file_name DataFrame()函數,這是不可能的。input_file_name()函數是一個輸入文件元數據函數,可以使用隻有一個結構化流DataFrame正在處理的文件的名稱。
如果你想通過DataFrame觸發工作,你可以考慮寫DataFrame文件並把文件路徑作為參數傳遞給觸發的工作。這裏有一個例子如何編寫DataFrame CSV文件和通過文件路徑作為參數:
df = (spark.read.format (csv) .option (“inferSchema”,真的).option(“標題”,真的).option .load (“9”, ", ") (" s3: / < bucket_name > / <子文件夾> / "))#寫DataFrame csv文件output_path = " s3: / < bucket_name > / <子文件夾> /輸出。csv”df.write.format (csv)。選項(“標題”,真的).save (output_path) #文件路徑作為參數傳遞給工作引發trigger_args = {“output_path”: output_path} #觸發響應=客戶的工作參數。start_trigger (Name = < trigger_name >,參數= trigger_args)
在上麵的例子中,我們在寫DataFrame CSV文件使用DataFrame寫()方法,傳遞output_path輸出文件路徑。然後我們創建一個字典trigger_args參數名稱和值通過觸發器的工作。最後,我們與start_trigger觸發工作()方法的AWS膠客戶,通過觸發器名稱和參數。