教程:連續攝入數據到三角洲湖與自動加載器

持續的增量數據攝取是一種常見的需求。例如,從手機遊戲到電子商務網站再到物聯網傳感器的應用程序都會產生連續的數據流。分析師希望獲得最新的數據,但由於以下幾個原因,實現起來可能具有挑戰性:

  • 您可能需要在數據到達時進行轉換和攝取,同時隻處理一次文件。

  • 您可能希望在向表寫入之前強製執行模式。這種邏輯編寫和維護起來可能很複雜。

  • 處理模式隨時間變化的數據具有挑戰性。例如,您必須決定如何處理有數據質量問題的輸入行,以及在解決了原始數據的問題後如何重新處理這些行。

  • 可伸縮的解決方案——每分鍾處理數千或數百萬個文件——需要集成諸如事件通知、消息隊列和觸發器等雲服務,這增加了開發的複雜性和長期維護。

構建一個持續的、具有成本效益的、可維護的和可擴展的數據轉換和攝入係統並非易事。磚提供自動加載程序作為一個內置的、優化的解決方案,它解決了上述問題,並為數據團隊提供了一種從雲對象存儲中以更低的成本和延遲加載原始數據的方法。Auto Loader自動配置和監聽新文件的通知服務,可以擴展到每秒數百萬個文件。它還關注常見問題,如模式推斷和模式演化。要了解更多,請參見自動加載程序

在本教程中,您將使用Auto Loader將數據增量地攝取(加載)到Delta表中。

需求

  1. 一個Databricks帳戶,以及帳戶內的一個Databricks工作空間。要創建這些,請參見注冊免費試用

  2. 工作區中的通用集群。要創建一個,請參見創建一個集群

  3. 熟悉Databricks工作空間用戶界麵。看到在工作區中

步驟1。創建示例數據

在此步驟中,您將創建一個筆記本在你的工作空間。在這個筆記本中,您每30秒在工作空間中運行生成一個以逗號分隔的隨機文件的代碼。每個文件都包含一組隨機數據。

請注意

自動加載器還工作與以下格式的數據:Avro,二進製,CSV, JSON, ORC, Parquet,和文本。

  1. 在您的工作區中,在側邊欄中單擊創建>筆記本

  2. 創建筆記本例如,在對話框中為筆記本輸入一個名稱假的數據發電機

  3. 默認的語言中,選擇Python

  4. 集群,選擇您在Requirements部分創建的集群,或者選擇您想要使用的另一個可用的集群。

  5. 點擊創建

  6. 在筆記本的菜單欄中,如果集群名稱旁邊的圓圈不包含綠色複選標記,請單擊集群名稱旁邊的下拉箭頭,然後單擊啟動集群.點擊確認,然後等待,直到圓圈包含綠色複選標記。

  7. 在筆記本的第一個單元格中,粘貼以下代碼:

    進口csv進口uuid進口隨機進口時間pathlib進口路徑0路徑“/ tmp / generated_raw_csv_data”路徑路徑mkdir父母真正的exist_ok真正的真正的row_list“id”“x_axis”“y_axis”),uuiduuid4(),隨機用於產生基質的均勻分布的隨機整數-One hundred.One hundred.),隨機用於產生基質的均勻分布的隨機整數-One hundred.One hundred.)),uuiduuid4(),隨機用於產生基質的均勻分布的隨機整數-One hundred.One hundred.),隨機用於產生基質的均勻分布的隨機整數-One hundred.One hundred.)),uuiduuid4(),隨機用於產生基質的均勻分布的隨機整數-One hundred.One hundred.),隨機用於產生基質的均勻分布的隨機整數-One hundred.One hundred.)]file_locationf路徑/ file_. csv”開放file_location' w '換行符作為文件作家csv作家文件作家writerowsrow_list文件關閉()+ =1dbutilsfsmvf的文件:file_locationf“dbfs:file_location時間睡眠30.打印f'在dbfs中創建新的CSV文件:file_location.內容:“開放f' / dbfsfile_location“r”作為文件讀者csv讀者文件分隔符' '讀者打印”、“加入))文件關閉()

    上述代碼執行以下操作:

    1. 創建目錄為/ tmp / generated_raw_csv_data在您的工作區中,如果目錄還不存在。

      提示

      如果此路徑已經存在於您的工作空間中,因為有人運行了本教程,那麼您可能希望首先清除此路徑中的所有現有文件。

    2. 創建一組隨機數據,例如:

      Id,x_axis,y_axis d033faf3-b6bd- 4bc -83a4-43a37ce7e994,88,-13 fde2bdb6-b0a1-41c2-9650-35af717549ca,-96,19 297a2dfe-99de-4c52-8310-b24bc2f83874,-23,43
    3. 30秒後,創建一個名為file_ <數字> . csv,將隨機數據集寫入文件,將文件存儲在dbfs: / tmp / generated_raw_csv_data,並報告文件的路徑及其內容。<數>起價0並在每次創建文件時加1(例如,file_0.csvfile_1.csv,等等)。

  8. 在筆記本的菜單欄中,單擊運行所有.讓這個筆記本繼續運行。

    請注意

    要查看生成的文件列表,請在側邊欄中單擊數據.點擊DBFS,根據提示選擇集群,單擊tmp > generated_raw_csv_data

步驟2:運行Auto Loader

在這一步中,您使用Auto Loader從您的工作區中的一個位置連續讀取原始數據,然後將數據轉移到同一個工作區中另一個位置的Delta表中。

  1. 在側邊欄中,單擊創建>筆記本

  2. 創建筆記本例如,在對話框中為筆記本輸入一個名稱汽車加載程序演示

  3. 默認的語言中,選擇Python

  4. 集群,選擇您在Requirements部分創建的集群,或者選擇您想要使用的另一個可用的集群。

  5. 點擊創建

  6. 在筆記本的菜單欄中,如果集群名稱旁邊的圓圈不包含綠色複選標記,請單擊集群名稱旁邊的下拉箭頭,然後單擊啟動集群.點擊確認,然後等待,直到圓圈包含綠色複選標記。

  7. 在筆記本的第一個單元格中,粘貼以下代碼:

    raw_data_location“dbfs: / tmp / generated_raw_csv_data”target_delta_table_location“dbfs: / tmp /表/坐標”schema_location“dbfs: / tmp / auto_loader /模式”checkpoint_location“dbfs: / tmp / auto_loader /檢查站”

    這段代碼在您的工作空間中定義了到原始數據和目標Delta表的路徑、到表模式的路徑以及到Auto Loader寫入位置的路徑檢查點文件Delta Lake交易日誌中的信息。檢查點使Auto Loader隻處理新的傳入數據,並跳過已經處理過的任何現有數據。

    提示

    如果這些路徑中的任何一個已經存在於您的工作空間中,因為有人運行了本教程,那麼您可能需要首先清除這些路徑中的所有現有文件。

  8. 在光標仍然位於第一個單元格的情況下,運行該單元格。(要運行單元格,請按Shift+Enter。)Databricks將指定的路徑讀入內存。

  9. 在第一個單元格下麵添加一個單元格,如果它還沒有。(若要添加單元格,請將鼠標指針放置在單元格的底部邊緣,然後單擊+圖標。)在第二個單元格中,粘貼以下代碼(注意cloudFiles代表了自動加載程序):

    火花readStream格式“cloudFiles”選項“cloudFiles.format”“csv”選項“頭”“真正的”選項“cloudFiles.schemaLocation”schema_location負載raw_data_location
  10. 運行這個細胞。

  11. 在筆記本的第三單元格中,粘貼以下代碼:

    顯示
  12. 運行這個細胞。中的自動加載程序開始處理已有的CSV文件raw_data_location以及到達該位置的任何傳入CSV文件。Auto Loader處理每個CSV文件時,使用文件中的第一行作為字段名,其餘行作為字段數據。Databricks在Auto Loader處理數據時顯示數據。

  13. 在筆記本的第四個單元格中,粘貼以下代碼:

    writeStream選項“checkpointLocation”checkpoint_location開始target_delta_table_location
  14. 運行這個細胞。自動加載程序將數據寫入Delta表target_data_table_location.自動加載器也寫入檢查點文件信息checkpoint_location

步驟3:發展和加強數據模式

如果數據的模式隨時間變化會發生什麼?例如,如果您想改進字段數據類型,以便將來能夠更好地執行數據質量問題,並使對數據的計算更容易,該怎麼辦呢?在這一步中,你進化您的數據允許的數據類型,然後您執行輸入數據上的這個模式。

記住要保持第1步開始的記事本運行狀態,以使用新生成的示例文件維護數據流。

  1. 從第二步開始停止使用筆記本。(要停止筆記本,單擊停止執行在筆記本的菜單欄。)

  2. 在第2步的筆記本中,替換第4單元格的內容(以stream.writeStream)及以下程式碼:

    printSchema()
  3. 運行筆記本上所有的單元格。(要運行所有單元格,請單擊運行所有在筆記本的菜單欄。)Databricks打印數據的模式,該模式將所有字段顯示為字符串。讓我們發展x_axis而且y_axis整數字段。

  4. 停止筆記本。

  5. 替換第二個單元格的內容(以spark.readStream)及以下程式碼:

    火花readStream格式“cloudFiles”選項“cloudFiles.format”“csv”選項“頭”“真正的”選項“cloudFiles.schemaLocation”schema_location選項“cloudFiles.schemaHints”"x_axis integer, y_axis integer"""負載raw_data_location
  6. 運行筆記本上所有的單元格。Databricks打印數據的新模式,該模式顯示x_axis而且y_axis列整數。現在讓我們通過使用這個新模式來加強數據質量。

  7. 停止筆記本。

  8. 用以下代碼替換第二個單元格的內容:

    pyspark.sql.types進口StructTypeStructFieldStringTypeIntegerType模式StructType([StructField“id”StringType(),真正的),StructField“x_axis”IntegerType(),真正的),StructField“y_axis”IntegerType(),真正的])火花readStream格式“cloudFiles”選項“cloudFiles.format”“csv”選項“頭”“真正的”選項“cloudFiles.schemaLocation”schema_location模式模式負載raw_data_location
  9. 運行筆記本上所有的單元格。自動加載器現在使用它模式推理和進化邏輯確定如何處理與新模式不匹配的傳入數據。

第四步:清理

在完成本教程之後,如果不再希望保留Databricks資源,則可以清理工作空間中的相關Databricks資源。

刪除的數據

  1. 停止這兩個筆記本。(要打開筆記本,請在側邊欄中單擊工作空間>用戶>你的用戶名,然後點擊筆記本。)

  2. 在第1步的筆記本中,在第一個單元格之後添加一個單元格,並將以下代碼粘貼到第二個單元格中。

    dbutilsfsrm“dbfs: / tmp / generated_raw_csv_data”真正的dbutilsfsrm“dbfs: / tmp /表”真正的dbutilsfsrm“dbfs: / tmp / auto_loader”真正的

    警告

    如果您在這些位置有任何其他信息,這些信息也將被刪除!

  3. 運行單元。Databricks刪除包含原始數據、Delta表、表的模式和Auto Loader檢查點信息的目錄。

刪除筆記本

  1. 在側邊欄中,單擊工作空間>用戶>你的用戶名

  2. 單擊第一個筆記本旁邊的下拉箭頭,然後單擊移動到垃圾

  3. 點擊確認並移動到垃圾處理

  4. 在第二個筆記本上重複步驟1 - 3。

停止集群

如果您沒有將該集群用於任何其他任務,您應該停止它,以避免額外的成本。

  1. 在側邊欄中,單擊計算

  2. 單擊集群的名稱。

  3. 點擊終止

  4. 點擊確認