教程:連續攝入數據到三角洲湖與自動加載器
持續的增量數據攝取是一種常見的需求。例如,從手機遊戲到電子商務網站再到物聯網傳感器的應用程序都會產生連續的數據流。分析師希望獲得最新的數據,但由於以下幾個原因,實現起來可能具有挑戰性:
您可能需要在數據到達時進行轉換和攝取,同時隻處理一次文件。
您可能希望在向表寫入之前強製執行模式。這種邏輯編寫和維護起來可能很複雜。
處理模式隨時間變化的數據具有挑戰性。例如,您必須決定如何處理有數據質量問題的輸入行,以及在解決了原始數據的問題後如何重新處理這些行。
可伸縮的解決方案——每分鍾處理數千或數百萬個文件——需要集成諸如事件通知、消息隊列和觸發器等雲服務,這增加了開發的複雜性和長期維護。
構建一個持續的、具有成本效益的、可維護的和可擴展的數據轉換和攝入係統並非易事。磚提供自動加載程序作為一個內置的、優化的解決方案,它解決了上述問題,並為數據團隊提供了一種從雲對象存儲中以更低的成本和延遲加載原始數據的方法。Auto Loader自動配置和監聽新文件的通知服務,可以擴展到每秒數百萬個文件。它還關注常見問題,如模式推斷和模式演化。要了解更多,請參見自動加載程序.
在本教程中,您將使用Auto Loader將數據增量地攝取(加載)到Delta表中。
步驟1。創建示例數據
在此步驟中,您將創建一個筆記本在你的工作空間。在這個筆記本中,您每30秒在工作空間中運行生成一個以逗號分隔的隨機文件的代碼。每個文件都包含一組隨機數據。
請注意
自動加載器還工作與以下格式的數據:Avro,二進製,CSV, JSON, ORC, Parquet,和文本。
在您的工作區中,在側邊欄中單擊創建>筆記本.
在創建筆記本例如,在對話框中為筆記本輸入一個名稱
假的數據發電機
.為默認的語言中,選擇Python.
為集群,選擇您在Requirements部分創建的集群,或者選擇您想要使用的另一個可用的集群。
點擊創建.
在筆記本的菜單欄中,如果集群名稱旁邊的圓圈不包含綠色複選標記,請單擊集群名稱旁邊的下拉箭頭,然後單擊啟動集群.點擊確認,然後等待,直到圓圈包含綠色複選標記。
在筆記本的第一個單元格中,粘貼以下代碼:
進口csv進口uuid進口隨機進口時間從pathlib進口路徑數=0路徑=“/ tmp / generated_raw_csv_data”路徑(路徑).mkdir(父母=真正的,exist_ok=真正的)而真正的:row_list=[[“id”,“x_axis”,“y_axis”),[uuid.uuid4(),隨機.用於產生基質的均勻分布的隨機整數(-One hundred.,One hundred.),隨機.用於產生基質的均勻分布的隨機整數(-One hundred.,One hundred.)),[uuid.uuid4(),隨機.用於產生基質的均勻分布的隨機整數(-One hundred.,One hundred.),隨機.用於產生基質的均勻分布的隨機整數(-One hundred.,One hundred.)),[uuid.uuid4(),隨機.用於產生基質的均勻分布的隨機整數(-One hundred.,One hundred.),隨機.用於產生基質的均勻分布的隨機整數(-One hundred.,One hundred.)]]file_location=f'{路徑}/ file_{數}. csv”與開放(file_location,' w ',換行符=”)作為文件:作家=csv.作家(文件)作家.writerows(row_list)文件.關閉()數+ =1dbutils.fs.mv(f的文件:{file_location}',f“dbfs:{file_location}')時間.睡眠(30.)打印(f'在dbfs中創建新的CSV文件:{file_location}.內容:“)與開放(f' / dbfs{file_location}',“r”)作為文件:讀者=csv.讀者(文件,分隔符=' ')為行在讀者:打印(”、“.加入(行))文件.關閉()
上述代碼執行以下操作:
創建目錄為
/ tmp / generated_raw_csv_data
在您的工作區中,如果目錄還不存在。提示
如果此路徑已經存在於您的工作空間中,因為有人運行了本教程,那麼您可能希望首先清除此路徑中的所有現有文件。
創建一組隨機數據,例如:
Id,x_axis,y_axis d033faf3-b6bd- 4bc -83a4-43a37ce7e994,88,-13 fde2bdb6-b0a1-41c2-9650-35af717549ca,-96,19 297a2dfe-99de-4c52-8310-b24bc2f83874,-23,43
30秒後,創建一個名為
file_ <數字> . csv
,將隨機數據集寫入文件,將文件存儲在dbfs: / tmp / generated_raw_csv_data
,並報告文件的路徑及其內容。<數>
起價0
並在每次創建文件時加1(例如,file_0.csv
,file_1.csv
,等等)。
在筆記本的菜單欄中,單擊運行所有.讓這個筆記本繼續運行。
請注意
要查看生成的文件列表,請在側邊欄中單擊數據.點擊DBFS,根據提示選擇集群,單擊tmp > generated_raw_csv_data.
步驟2:運行Auto Loader
在這一步中,您使用Auto Loader從您的工作區中的一個位置連續讀取原始數據,然後流將數據轉移到同一個工作區中另一個位置的Delta表中。
在側邊欄中,單擊創建>筆記本.
在創建筆記本例如,在對話框中為筆記本輸入一個名稱
汽車加載程序演示
.為默認的語言中,選擇Python.
為集群,選擇您在Requirements部分創建的集群,或者選擇您想要使用的另一個可用的集群。
點擊創建.
在筆記本的菜單欄中,如果集群名稱旁邊的圓圈不包含綠色複選標記,請單擊集群名稱旁邊的下拉箭頭,然後單擊啟動集群.點擊確認,然後等待,直到圓圈包含綠色複選標記。
在筆記本的第一個單元格中,粘貼以下代碼:
raw_data_location=“dbfs: / tmp / generated_raw_csv_data”target_delta_table_location=“dbfs: / tmp /表/坐標”schema_location=“dbfs: / tmp / auto_loader /模式”checkpoint_location=“dbfs: / tmp / auto_loader /檢查站”
這段代碼在您的工作空間中定義了到原始數據和目標Delta表的路徑、到表模式的路徑以及到Auto Loader寫入位置的路徑檢查點文件Delta Lake交易日誌中的信息。檢查點使Auto Loader隻處理新的傳入數據,並跳過已經處理過的任何現有數據。
提示
如果這些路徑中的任何一個已經存在於您的工作空間中,因為有人運行了本教程,那麼您可能需要首先清除這些路徑中的所有現有文件。
在光標仍然位於第一個單元格的情況下,運行該單元格。(要運行單元格,請按Shift+Enter。)Databricks將指定的路徑讀入內存。
在第一個單元格下麵添加一個單元格,如果它還沒有。(若要添加單元格,請將鼠標指針放置在單元格的底部邊緣,然後單擊+圖標。)在第二個單元格中,粘貼以下代碼(注意
cloudFiles
代表了自動加載程序):流=火花.readStream\.格式(“cloudFiles”)\.選項(“cloudFiles.format”,“csv”)\.選項(“頭”,“真正的”)\.選項(“cloudFiles.schemaLocation”,schema_location)\.負載(raw_data_location)
運行這個細胞。
在筆記本的第三單元格中,粘貼以下代碼:
顯示(流)
運行這個細胞。中的自動加載程序開始處理已有的CSV文件
raw_data_location
以及到達該位置的任何傳入CSV文件。Auto Loader處理每個CSV文件時,使用文件中的第一行作為字段名,其餘行作為字段數據。Databricks在Auto Loader處理數據時顯示數據。在筆記本的第四個單元格中,粘貼以下代碼:
流.writeStream\.選項(“checkpointLocation”,checkpoint_location)\.開始(target_delta_table_location)
運行這個細胞。自動加載程序將數據寫入Delta表
target_data_table_location
.自動加載器也寫入檢查點文件信息checkpoint_location
.
步驟3:發展和加強數據模式
如果數據的模式隨時間變化會發生什麼?例如,如果您想改進字段數據類型,以便將來能夠更好地執行數據質量問題,並使對數據的計算更容易,該怎麼辦呢?在這一步中,你進化您的數據允許的數據類型,然後您執行輸入數據上的這個模式。
記住要保持第1步開始的記事本運行狀態,以使用新生成的示例文件維護數據流。
從第二步開始停止使用筆記本。(要停止筆記本,單擊停止執行在筆記本的菜單欄。)
在第2步的筆記本中,替換第4單元格的內容(以
stream.writeStream
)及以下程式碼:流.printSchema()
運行筆記本上所有的單元格。(要運行所有單元格,請單擊運行所有在筆記本的菜單欄。)Databricks打印數據的模式,該模式將所有字段顯示為字符串。讓我們發展
x_axis
而且y_axis
整數字段。停止筆記本。
替換第二個單元格的內容(以
流=spark.readStream
)及以下程式碼:流=火花.readStream\.格式(“cloudFiles”)\.選項(“cloudFiles.format”,“csv”)\.選項(“頭”,“真正的”)\.選項(“cloudFiles.schemaLocation”,schema_location)\.選項(“cloudFiles.schemaHints”,"x_axis integer, y_axis integer""")\.負載(raw_data_location)
運行筆記本上所有的單元格。Databricks打印數據的新模式,該模式顯示
x_axis
而且y_axis
列整數。現在讓我們通過使用這個新模式來加強數據質量。停止筆記本。
用以下代碼替換第二個單元格的內容:
從pyspark.sql.types進口StructType,StructField,StringType,IntegerType模式=StructType([StructField(“id”,StringType(),真正的),StructField(“x_axis”,IntegerType(),真正的),StructField(“y_axis”,IntegerType(),真正的)])流=火花.readStream\.格式(“cloudFiles”)\.選項(“cloudFiles.format”,“csv”)\.選項(“頭”,“真正的”)\.選項(“cloudFiles.schemaLocation”,schema_location)\.模式(模式)\.負載(raw_data_location)
運行筆記本上所有的單元格。自動加載器現在使用它模式推理和進化邏輯確定如何處理與新模式不匹配的傳入數據。
第四步:清理
在完成本教程之後,如果不再希望保留Databricks資源,則可以清理工作空間中的相關Databricks資源。
刪除的數據
停止這兩個筆記本。(要打開筆記本,請在側邊欄中單擊工作空間>用戶>你的用戶名,然後點擊筆記本。)
在第1步的筆記本中,在第一個單元格之後添加一個單元格,並將以下代碼粘貼到第二個單元格中。
dbutils.fs.rm(“dbfs: / tmp / generated_raw_csv_data”,真正的)dbutils.fs.rm(“dbfs: / tmp /表”,真正的)dbutils.fs.rm(“dbfs: / tmp / auto_loader”,真正的)
警告
如果您在這些位置有任何其他信息,這些信息也將被刪除!
運行單元。Databricks刪除包含原始數據、Delta表、表的模式和Auto Loader檢查點信息的目錄。
額外的資源
自動加載程序技術文檔
利用文件通知對於更大的數據量
一站式服務的數據攝入按需網絡研討會係列