我有問題想攝取自動裝卸機作為一個批處理成dataframe。它主要用於直接寫一個表或流。我認為最好的方法是自動裝載到青銅然後做一個火花。讀到dataframe變換,然後寫/插入與spark.sql表
你可以添加一個列,給它一個價值的天日期競選新添加的數據與selectExpr在自動裝卸機()函數。Itd這個樣子……
從pyspark.sql。功能導入current_timestamp spark.readStream.format (cloudFiles) \ .option (“cloudFiles。形式at", "json") \ # The schema location directory keeps track of your data schema over time .option("cloudFiles.schemaLocation", "") \ .load("") \ .selectExpr( "*", "current_timestamp() as `Date_Pulled`", )
自動裝卸機跟蹤文件所以它隻讀取一次,防止重複。如果你做一個計數之前和之後自動裝卸機每次你隻看到它添加新數據。現在你有@timestamp列?我不確定你的邏輯是什麼樣子的管道,但如果你有一個時間戳或date_pulled列可以過濾管道塔爾查詢的數據不存在在下表中管道通過檢查它在過去的時間戳/ date_pilled數據。但是如果你隻是抓住所有的數據到dataframe你可以做一個插入新表,更新現有記錄(如果你想)和插入新的。我隻能猜測你的邏輯是什麼樣子雖然沒有更多的信息