開始
加載和管理數據
處理數據
政府
引用和資源
更新6月29日,2023年
給我們反饋
本文向您展示了如何創建和部署一個端到端的數據處理管道,包括如何攝取原始數據,轉換數據和運行分析處理數據。
請注意
盡管本文演示如何創建一個完整的數據管道使用磚<一個class="reference internal" href="//www.eheci.com/docs/notebooks/index.html">筆記本電腦和一個磚<一個class="reference internal" href="//www.eheci.com/docs/workflows/index.html">工作編排工作流,磚推薦使用<一個class="reference internal" href="//www.eheci.com/docs/delta-live-tables/index.html">三角洲生活表聲明接口為構建可靠、可維護、可測試的數據處理管道。
一個數據管道實現所需的步驟,將數據從源係統,基於需求的數據轉換,將數據存儲在一個目標係統。一個數據管道包括所有必要的流程將原始數據轉化為準備數據,用戶可以使用。例如,一個數據管道可能準備數據,數據分析師和數據科學家可以從數據中提取價值分析和報告。
提取、轉換和加載(ETL)流程是一種常見的數據管道。在ETL處理中,數據從源係統攝取和書麵暫存區域,改變了基於需求(確保數據質量,徹底刪除記錄,等等),然後寫入到目標係統數據倉庫或數據等。
來幫助你開始構建數據管道磚,本文中的例子包括走過創建一個數據處理流程:
使用磚特性探索原始數據集。
創建一個磚筆記本攝取原始源數據和原始數據寫入目標表。
創建一個磚筆記本將原始的源數據,轉換後的數據寫入目標表。
創建一個磚筆記本查詢轉換後的數據。
自動化的數據管道磚的工作。
你登錄到磚和數據科學與工程的工作區。
你有<一個class="reference internal" href="//www.eheci.com/docs/clusters/clusters-manage.html">允許創建一個集群或<一個class="reference internal" href="//www.eheci.com/docs/clusters/clusters-manage.html">訪問一個集群。
(可選)發布表統一目錄,你必須創建一個<一個class="reference internal" href="//www.eheci.com/docs/data-governance/unity-catalog/create-catalogs.html">目錄和<一個class="reference internal" href="//www.eheci.com/docs/data-governance/unity-catalog/create-schemas.html">模式在統一目錄。
在這個例子中使用的數據集的一個子集<一個class="reference external" href="http://labrosa.ee.columbia.edu/millionsong/">百萬歌曲數據集,當代音樂特征和元數據的集合。這個數據是可用的<一個class="reference internal" href="//www.eheci.com/docs/dbfs/databricks-datasets.html">樣本數據集包括在你的磚工作區。
執行數據處理和分析在這個示例中,創建一個集群來提供所需的計算資源運行命令。
因為這個例子使用一個示例數據集存儲在DBFS和建議堅持表<一個class="reference internal" href="//www.eheci.com/docs/data-governance/unity-catalog/index.html">統一目錄,您將創建一個集群配置單用戶訪問模式。單個用戶集群提供了完全訪問DBFS同時啟用統一目錄的訪問。看到<一個class="reference internal" href="//www.eheci.com/docs/dbfs/unity-catalog.html">最佳實踐DBFS和統一目錄。
點擊計算在側邊欄。
在計算頁麵,點擊創建集群。
在新的集群頁麵,輸入惟一名稱的集群。
在訪問模式中,選擇分配。
在單用戶或服務主體訪問,請選擇您的用戶名。
保留剩餘的值默認狀態,然後單擊創建集群。
更多地了解磚集群,明白了<一個class="reference internal" href="//www.eheci.com/docs/clusters/index.html">集群。
學習如何使用磚接口來探索原始源數據,看看<一個class="reference internal" href="//www.eheci.com/docs/getting-started/data-pipeline-explore-data.html">探索數據管道的源數據。如果你想去直接攝取和準備數據,繼續<一個class="reference internal" href="//www.eheci.com/docs/getting-started/#ingest-prepare-data">步驟3:攝取原始數據。
在這個步驟中,您的原始數據加載到一個表,使其可用於進一步的處理。等磚平台來管理數據資產表、磚建議Beplay体育安卓版本<一個class="reference internal" href="//www.eheci.com/docs/data-governance/unity-catalog/index.html">統一目錄。然而,如果你沒有權限創建所需的目錄和模式發布表統一目錄,你仍然可以完成以下步驟通過發布蜂巢metastore表。
攝取數據,數據磚推薦使用<一個class="reference internal" href="//www.eheci.com/docs/ingestion/auto-loader/index.html">自動加載程序。自動加載程序自動檢測和過程到達雲對象存儲新文件。
您可以配置自動加載程序加載數據的自動檢測模式,允許您初始化表沒有顯式地聲明數據模式和發展介紹了新列的表模式。這消除了需要手動跟蹤和應用模式會隨著時間而改變。磚建議模式推理在使用自動加載程序。然而,當看到在數據探索一步,歌曲的數據不包含標題信息。因為頭沒有存儲的數據,你需要明確定義的模式,在下一個示例中所示。
在側邊欄中,單擊新並選擇筆記本從菜單中。的創建筆記本對話框出現了。
例如,輸入一個名稱的筆記本攝取歌曲數據。默認情況下:
攝取歌曲數據
Python是所選的語言。
筆記本連接到最後您使用集群。在這種情況下,集群中創建<一個class="reference internal" href="//www.eheci.com/docs/getting-started/#create-a-cluster">步驟1:創建一個集群。
點擊創建。
輸入以下的第一個細胞筆記本:
從pyspark.sql.types進口倍增式,IntegerType,StringType,StructType,StructField#定義變量中使用以下代碼file_path=“/ databricks-datasets /歌曲/數據- 001 /”table_name=“<表名稱>”checkpoint_path=“/ tmp / pipeline_get_started / _checkpoint / song_data”模式=StructType((StructField(“artist_id”,StringType(),真正的),StructField(“artist_lat”,倍增式(),真正的),StructField(“artist_long”,倍增式(),真正的),StructField(“artist_location”,StringType(),真正的),StructField(“artist_name”,StringType(),真正的),StructField(“持續時間”,倍增式(),真正的),StructField(“end_of_fade_in”,倍增式(),真正的),StructField(“關鍵”,IntegerType(),真正的),StructField(“key_confidence”,倍增式(),真正的),StructField(“響度”,倍增式(),真正的),StructField(“發布”,StringType(),真正的),StructField(“song_hotnes”,倍增式(),真正的),StructField(“song_id”,StringType(),真正的),StructField(“start_of_fade_out”,倍增式(),真正的),StructField(“節奏”,倍增式(),真正的),StructField(“time_signature”,倍增式(),真正的),StructField(“time_signature_confidence”,倍增式(),真正的),StructField(“標題”,StringType(),真正的),StructField(“年”,IntegerType(),真正的),StructField(“partial_sequence”,IntegerType(),真正的)])(火花。readStream。格式(“cloudFiles”)。模式(模式)。選項(“cloudFiles.format”,“csv”)。選項(“9”,”\ t”)。負載(file_path)。writeStream。選項(“checkpointLocation”,checkpoint_path)。觸發(availableNow=真正的)。toTable(table_name))
如果您使用的是統一目錄,替換<表名稱>與目錄、模式和表名控製攝入記錄(例如,data_pipelines.songs_data.raw_song_data)。否則,取代<表名稱>與一個表的名稱包含攝取記錄,例如,raw_song_data。
<表名稱>
data_pipelines.songs_data.raw_song_data
raw_song_data
取代< checkpoint-path >的路徑的目錄DBFS維持檢查點文件,例如,/ tmp / pipeline_get_started / _checkpoint / song_data。
< checkpoint-path >
/ tmp / pipeline_get_started / _checkpoint / song_data
點擊,並選擇運行單元。這個例子定義了使用信息從數據模式自述、攝食的歌曲中包含的所有文件的數據file_path,並將數據寫入指定的表table_name。
自述
file_path
table_name
準備原始數據進行分析,以下步驟,將原始的歌曲數據過濾掉不需要的列和添加一個新的字段,其中包含一個時間戳的創建新記錄。
輸入一個名稱的筆記本。例如,準備歌曲數據。改變默認的語言SQL。
準備歌曲數據
第一個單元格輸入以下的筆記本:
創建或取代表<表- - - - - -的名字>(artist_id字符串,artist_name字符串,持續時間雙,釋放字符串,節奏雙,time_signature雙,標題字符串,一年雙,processed_time時間戳);插入成<表- - - - - -的名字>選擇artist_id,artist_name,持續時間,釋放,節奏,time_signature,標題,一年,current_timestamp()從<生- - - - - -歌曲- - - - - -表- - - - - -的名字>
如果您使用的是統一目錄,替換<表名稱>與目錄、模式和表名包含過濾和轉換記錄(例如,data_pipelines.songs_data.prepared_song_data)。否則,取代<表名稱>與一個表的名稱包含過濾和轉換記錄(例如,prepared_song_data)。
data_pipelines.songs_data.prepared_song_data
prepared_song_data
取代< raw-songs-table-name >包含原始的歌曲的名字表記錄前麵步驟中攝取。
< raw-songs-table-name >
點擊,並選擇運行單元。
在這個步驟中,您將通過添加查詢分析歌曲數據處理管道。這些查詢使用前麵步驟中創建的準備記錄。
輸入一個名稱的筆記本。例如,分析歌曲數據。改變默認的語言SQL。
分析歌曲數據
——藝術家每年發布的大部分歌曲嗎?選擇artist_name,數(artist_name)作為num_songs,一年從<準備- - - - - -歌曲- - - - - -表- - - - - -的名字>在哪裏一年>0集團通過artist_name,一年訂單通過num_songsDESC,一年DESC
取代< prepared-songs-table-name >包含準備數據的表的名稱。例如,data_pipelines.songs_data.prepared_song_data。
< prepared-songs-table-name >
點擊在細胞操作菜單,選擇添加下麵的細胞並輸入以下新細胞:
——為你的DJ列表找到歌曲選擇artist_name,標題,節奏從<準備- - - - - -歌曲- - - - - -表- - - - - -的名字>在哪裏time_signature=4和節奏之間的One hundred.和140年;
取代< prepared-songs-table-name >準備表的名稱在上一步中創建的。例如,data_pipelines.songs_data.prepared_song_data。
運行查詢和查看輸出,點擊運行所有。
您可以創建一個工作流自動化運行數據攝入,處理和分析步驟使用磚的工作。
在你的數據科學與工程工作區,做以下之一:
點擊工作流在側邊欄,然後單擊。
在側邊欄中,單擊新並選擇工作。
在任務對話框上任務選項卡中,取代添加一個名稱為你的工作…對你的工作名稱。例如,“歌工作流”。
在任務名稱第一個任務,輸入一個名稱,例如,Ingest_songs_data。
Ingest_songs_data
在類型,選擇筆記本任務類型。
在源中,選擇工作空間。
使用文件瀏覽器找到數據攝入筆記本,點擊筆記本名稱,點擊確認。
在集群中,選擇Shared_job_cluster或集群中創建創建一個集群的一步。
創建一個集群
點擊您剛才創建的任務並選擇以下筆記本。
在任務名稱任務,輸入一個名稱,例如,Prepare_songs_data。
Prepare_songs_data
使用文件瀏覽器找到數據準備筆記本,點擊筆記本名稱,點擊確認。
在任務名稱任務,輸入一個名稱,例如,Analyze_songs_data。
Analyze_songs_data
使用文件瀏覽器找到數據分析筆記本,點擊筆記本名稱,點擊確認。
工作流運行,點擊。查看<一個class="reference internal" href="//www.eheci.com/docs/workflows/jobs/monitor-job-runs.html">細節的運行,單擊鏈接開始時間列的運行<一個class="reference internal" href="//www.eheci.com/docs/workflows/jobs/monitor-job-runs.html">工作運行視圖。單擊每個任務視圖細節任務運行。
查看結果當工作流程完成後,點擊最後的數據分析任務。的輸出頁麵出現並顯示查詢結果。
演示使用磚的工作安排計劃的工作流程,這個開始的例子將攝入,準備,和分析步驟為獨立的筆記本,然後每個筆記本是用來創建一個任務的工作。如果所有的處理都包含在一個筆記本電腦,您可以輕鬆地安排直接從磚筆記本筆記本UI。看到<一個class="reference internal" href="//www.eheci.com/docs/notebooks/schedule-notebook-jobs.html">創建和管理計劃的筆記本工作。
一個常見需求是運行一個數據管道計劃的基礎上。定義一個時間表運行管道的工作:
點擊工作流在側邊欄。
在的名字列,單擊工作名稱。側板顯示工作細節。
點擊添加觸發器在工作細節麵板並選擇計劃在觸發類型。
指定時間、起始時間和時區。可選的選擇顯示Cron語法複選框來顯示和編輯的時間表<一個class="reference external" href="http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html">石英Cron語法。
點擊保存。
更多地了解磚筆記本,看到的<一個class="reference internal" href="//www.eheci.com/docs/notebooks/index.html">介紹磚筆記本。
了解更多關於磚工作,明白了<一個class="reference internal" href="//www.eheci.com/docs/workflows/index.html">磚的工作是什麼?。
了解更多關於三角洲湖,看到<一個class="reference internal" href="//www.eheci.com/docs/delta/index.html">三角洲湖是什麼?。
學習更多關於數據處理管道與達美住表,看看<一個class="reference internal" href="//www.eheci.com/docs/delta-live-tables/index.html">δ生活是什麼表?。