我的道歉,我讀它錯誤的最初。
我將使用你的用例複製到這隻會加載的文件還沒有處理。您可以使用結構化流這樣做或磚自動裝卸機但這些會更複雜。
對於結構化流可以使用“.trigger(一旦= True)”使用流API作為一個批處理過程。你會使用檢查點位置寫跟蹤哪些文件已經處理。
與自動裝卸機可以使用“文件清單”選項來確定哪些文件已經被使用。你仍然想要使用.trigger(一次= True)參數。
下麵是例子如何使用複製命令:
#複製成δ通過提供一個文件位置複製到delta. abfss: / /(電子郵件保護)/ deltaTables /目標”(選擇_c0::長整型數字鍵,_c1:: int指數,_c2 textData從“abfss: / /(電子郵件保護)/基地/路徑”)FILEFORMAT = CSV模式= ' folder1 / file_ [g]。csv ' #複製到三角洲通過提供一個表,但必須現有三角洲表你創建它首先創建表目標(_c0長,_c1整數,_c2字符串)使用三角洲複製到target_table從“abfss: / /(電子郵件保護)/基地/路徑' FILEFORMAT = CSV模式= ' folder1 / file_ [g] . CSV”
什麼類型的文件?文件存儲在一個存儲賬戶嗎?
通常,你會讀和寫數據與類似下麵的代碼:
#讀取鋪文件df = spark.read.format(“鋪”).load(“/道路/ /文件”)#作為一個文件寫數據df.write.format(“δ”).save(路徑“/ / /δ/表”)#寫作為管理的數據表df.write.format .saveAsTable(“δ”)(“table_name”)
請參考這個文檔對於一些更多的信息。
謝謝你們的反饋@Ryan Chynoweth
例如,假設情況:
time1——我有一些CSV文件降落在我的hdfs目錄(/ file1著陸。csv、著陸/ file2.csv)
time2——我批PySpark讀hdfs著陸目錄和寫在hdfs青銅目錄(青銅/);
曆史問題——新的CSV文件到達hdfs著陸目錄(/ file3著陸。csv、著陸/ file4.csv)
time4——在這一點上批PySpark隻需要讀取新文件(/ file3著陸。csv、著陸/ file4.csv)附加到和尚hdfs目錄(青銅/)
在na流(WriteStream)“checkpointLocation”選項,但在na批嗎?我需要開發一個python控製這種情況嗎?
你能理解嗎?
嘖嘖
我的道歉,我讀它錯誤的最初。
我將使用你的用例複製到這隻會加載的文件還沒有處理。您可以使用結構化流這樣做或磚自動裝卸機但這些會更複雜。
對於結構化流可以使用“.trigger(一旦= True)”使用流API作為一個批處理過程。你會使用檢查點位置寫跟蹤哪些文件已經處理。
與自動裝卸機可以使用“文件清單”選項來確定哪些文件已經被使用。你仍然想要使用.trigger(一次= True)參數。
下麵是例子如何使用複製命令:
#複製成δ通過提供一個文件位置複製到delta. abfss: / /(電子郵件保護)/ deltaTables /目標”(選擇_c0::長整型數字鍵,_c1:: int指數,_c2 textData從“abfss: / /(電子郵件保護)/基地/路徑”)FILEFORMAT = CSV模式= ' folder1 / file_ [g]。csv ' #複製到三角洲通過提供一個表,但必須現有三角洲表你創建它首先創建表目標(_c0長,_c1整數,_c2字符串)使用三角洲複製到target_table從“abfss: / /(電子郵件保護)/基地/路徑' FILEFORMAT = CSV模式= ' folder1 / file_ [g] . CSV”
wowwwww沒錯@Ryan Chynoweth,我可以使用“一旦= True”流API
非常感謝你的人