把數據輸入三角洲湖
Databricks提供多種方式幫助您將數據攝取到Delta Lake。
上傳CSV文件
方法可以安全地從CSV文件創建表在Databricks SQL中創建表UI.
合作夥伴集成
Databricks的合作夥伴集成使您能夠輕鬆地將數據加載到Databricks中。這些集成支持從各種來源將低代碼、易於實現和可伸縮的數據攝取到Databricks。看到磚的集成.
複製成
SQL命令
的複製成
SQL命令允許您將數據從文件位置加載到Delta表中。這是一個可重試的冪等運算;源位置中已經加載的文件將被跳過。
使用複製成
SQL命令代替自動加載程序當:
您希望從包含數千或更少文件的文件位置加載數據。
您的數據模式不需要頻繁地演變。
您計劃加載以前上傳的文件的子集。
的簡要概述和演示複製成
SQL命令以及自動加載程序在本文後麵,觀看這個YouTube視頻(2分鍾)。
下麵的示例演示如何創建Delta表,然後使用複製成
裝入樣本數據的SQL命令樣本數據集(databricks-datasets)到桌子上。您可以運行示例Python、R、Scala或SQL代碼筆記本附在數據庫上集群.還可以從查詢關聯到一個SQL倉庫在磚的SQL.
table_name=“default.loan_risks_upload”source_data=/ databricks-datasets / learning-spark-v2 /貸款/ loan-risks.snappy.parquet 'source_format=“鋪”火花.sql("刪除表如果存在"+table_name)火花.sql(“CREATE TABLE”+table_name+”(“\“loan_id BIGINT。”+\“funded_amnt INT。”+\“paid_amnt加倍,”+\“addr_state字符串)”)火花.sql(“複製到”+table_name+\“從”+source_data+“”+\" Fileformat = "+source_format)loan_risks_upload_data=火花.sql(" select * from "+table_name)顯示(loan_risks_upload_data)“‘結果:+---------+-------------+-----------+------------+| loan_id | funded_amnt | paid_amnt | addr_state |+=========+=============+===========+============+| 0 | 1000 | 182.22 | ca |+---------+-------------+-----------+------------+| 1 | 1000 | 361.19 | wa |+---------+-------------+-----------+------------+| 2 | 1000 | 176.26 | tx |+---------+-------------+-----------+------------+...“‘
圖書館(SparkR)sparkR.session()table_name=“default.loan_risks_upload”source_data=“/ databricks-datasets / learning-spark-v2 /貸款/ loan-risks.snappy.parquet”source_format=“鋪”sql(粘貼("刪除表如果存在",table_name,9月=""))sql(粘貼(“CREATE TABLE”,table_name,”(“,“loan_id BIGINT。”,“funded_amnt INT。”,“paid_amnt加倍,”,“addr_state字符串)”,9月=""))sql(粘貼(“複製到”,table_name,“從”,source_data,“”," Fileformat = ",source_format,9月=""))loan_risks_upload_data=tableToDF(table_name)顯示(loan_risks_upload_data)結果:# +---------+-------------+-----------+------------+# | loan_id | funded_amnt | paid_amnt | addr_state |# +=========+=============+===========+============+# | 0 | 1000 | 182.22 | ca |# +---------+-------------+-----------+------------+# | 1 | 1000 | 361.19 | wa |# +---------+-------------+-----------+------------+# | 2 | 1000 | 176.26 | tx |# +---------+-------------+-----------+------------+#……
瓦爾table_name=“default.loan_risks_upload”瓦爾source_data=“/ databricks-datasets / learning-spark-v2 /貸款/ loan-risks.snappy.parquet”瓦爾source_format=“鋪”火花.sql("刪除表如果存在"+table_name)火花.sql(“CREATE TABLE”+table_name+”(“+“loan_id BIGINT。”+“funded_amnt INT。”+“paid_amnt加倍,”+“addr_state字符串)”)火花.sql(“複製到”+table_name+“從”+source_data+“”+" Fileformat = "+source_format)瓦爾loan_risks_upload_data=火花.表格(table_name)顯示(loan_risks_upload_data)/*結果:+---------+-------------+-----------+------------+| loan_id | funded_amnt | paid_amnt | addr_state |+=========+=============+===========+============+| 0 | 1000 | 182.22 | ca |+---------+-------------+-----------+------------+| 1 | 1000 | 361.19 | wa |+---------+-------------+-----------+------------+| 2 | 1000 | 176.26 | tx |+---------+-------------+-----------+------------+...* /
下降表格如果存在默認的.loan_risks_upload;創建表格默認的.loan_risks_upload(loan_id長整型數字,funded_amntINT,paid_amnt雙,addr_state字符串);複製成默認的.loan_risks_upload從/ databricks-datasets / learning-spark-v2 /貸款/ loan-risks.snappy.parquet 'FILEFORMAT=拚花;選擇*從默認的.loan_risks_upload;——結果:-- +---------+-------------+-----------+------------+——| loan_id | funded_amnt | paid_amnt | addr_state |-- +=========+=============+===========+============+——| 0 | 1000 | 182.22 | ca |-- +---------+-------------+-----------+------------+——| 1 | 1000 | 361.19 | wa |-- +---------+-------------+-----------+------------+——| 2 | 1000 | 176.26 | tx |-- +---------+-------------+-----------+------------+——……
要清理,運行以下代碼,刪除表:
火花.sql(“刪除表”+table_name)
sql(粘貼(“刪除表”,table_name,9月=""))
火花.sql(“刪除表”+table_name)
下降表格默認的.loan_risks_upload
有關更多示例和詳細信息,請參見
磚運行時7。x,上圖:複製到
Databricks運行時5.5 LTS和6.x:複製到(Delta Lake on Databricks)
自動加載程序
Auto Loader在新數據文件到達雲存儲時可以增量高效地處理它們,無需任何額外設置。Auto Loader提供了一個新的結構化流源稱為cloudFiles
.給定雲文件存儲上的輸入目錄路徑,則cloudFiles
Source在新文件到達時自動處理它們,還可以選擇處理該目錄中的現有文件。
使用自動加載器代替複製到SQL命令當:
您希望從包含數百萬或更高級別文件的文件位置加載數據。自動加載器可以更有效地發現文件比
複製成
SQL命令,可以將文件處理拆分為多個批。您的數據模式經常變化。Auto Loader為模式推斷和演化提供了更好的支持。看到在自動加載器中配置模式推斷和演化.
您不打算加載以前上傳的文件的子集。使用自動加載器,重新處理文件子集可能會更加困難。但是,您可以使用
複製成
在自動加載程序流同時運行時重新加載文件子集的SQL命令。
關於自動加載器的簡要概述和演示,以及複製到SQL命令在這篇文章的前麵,觀看這個YouTube視頻(2分鍾)。
要了解更多Auto Loader的概述和演示,請觀看這個YouTube視頻(59分鍾)。
下麵的代碼示例演示了Auto Loader如何在到達雲存儲時檢測新數據文件。方法中運行示例代碼筆記本附在數據庫上集群.
創建文件上傳目錄,例如:
user_dir=' <我的名字> @ < my-organization.com > 'upload_path=“/ FileStore / shared-uploads /”+user_dir+“/ population_data_upload”dbutils.fs.mkdir(upload_path)
瓦爾user_dir=“<我的名字> @ < my-organization.com > "瓦爾upload_path=“/ FileStore / shared-uploads /”+user_dir+“/ population_data_upload”dbutils.fs.mkdir(upload_path)
方法創建以下示例CSV文件,然後將它們上傳到文件上傳目錄DBFS文件瀏覽器:
WA.csv
:城市,年,人口西雅圖地鐵,2019,3406000西雅圖地鐵,2020,3433000
OR.csv
:波特蘭地鐵,2019年,2127000波特蘭地鐵,2020年,2151000
運行以下代碼啟動Auto Loader。
checkpoint_path=“/ tmp /δ/ population_data / _checkpoints”write_path=“/ tmp /δ/ population_data”設置流開始讀取傳入的文件# upload_path位置。df=火花.readStream.格式(“cloudFiles”)\.選項(“cloudFiles.format”,“csv”)\.選項(“頭”,“真正的”)\.模式(“城市串,年,人口長”)\.負載(upload_path)#啟動流。使用checkpoint_path位置保存所有文件的記錄#已經被上傳到upload_path位置。#對於上次檢查後已經上傳的文件,#將新上傳文件的數據寫入write_path位置。df.writeStream.格式(“δ”)\.選項(“checkpointLocation”,checkpoint_path)\.開始(write_path)
瓦爾checkpoint_path=“/ tmp /δ/ population_data / _checkpoints”瓦爾write_path=“/ tmp /δ/ population_data”//設置流以開始從/ / upload_path位置。瓦爾df=火花.readStream.格式(“cloudFiles”).選項(“cloudFiles.format”,“csv”).選項(“頭”,“真正的”).模式(“城市串,年,人口長”).負載(upload_path)//啟動流。//使用checkpoint_path位置保存所有文件的記錄//已經上傳到upload_path位置。//對於上次檢查後已經上傳的文件,//將新上傳文件的數據寫入write_path位置。df.writeStream.格式(“δ”).選項(“checkpointLocation”,checkpoint_path).開始(write_path)
在第3步的代碼仍在運行的情況下,運行以下代碼查詢寫入目錄中的數據:
df_population=火花.讀.格式(“δ”).負載(write_path)顯示(df_population)“‘結果:+----------------+------+------------+城市,年,人口+================+======+============+西雅圖地鐵| 2019 | 3406000 |+----------------+------+------------+西雅圖地鐵| 2020 | 3433000 |+----------------+------+------------+波特蘭地鐵| 2019 | 2127000 |+----------------+------+------------+波特蘭地鐵| 2020 | 2151000 |+----------------+------+------------+“‘
瓦爾df_population=火花.讀.格式(“δ”).負載(write_path)顯示(df_population)/ *結果:+----------------+------+------------+城市,年,人口+================+======+============+西雅圖地鐵| 2019 | 3406000 |+----------------+------+------------+西雅圖地鐵| 2020 | 3433000 |+----------------+------+------------+波特蘭地鐵| 2019 | 2127000 |+----------------+------+------------+波特蘭地鐵| 2020 | 2151000 |+----------------+------+------------+* /
在第3步中的代碼仍在運行的情況下,創建以下附加的CSV文件,然後使用DBFS文件瀏覽器:
ID.csv
:城市,人口博伊西,2019年,438000博伊西,2020年、447000年
MT.csv
:城市,人口海倫娜,2019年,81653年海倫娜,2020,82590
Misc.csv
:西雅圖地鐵,2021年,3461000波特蘭地鐵,2021年,2174000博伊西,2021年,455000海倫娜,2021年,81653
在第3步的代碼還在運行的情況下,運行以下代碼來查詢寫目錄中的現有數據,以及Auto Loader在上傳目錄中檢測到的文件的新數據,然後寫入寫目錄:
df_population=火花.讀.格式(“δ”).負載(write_path)顯示(df_population)“‘結果:+----------------+------+------------+城市,年,人口+================+======+============+西雅圖地鐵| 2019 | 3406000 |+----------------+------+------------+西雅圖地鐵| 2020 | 3433000 |+----------------+------+------------+|海倫娜| 2019 | 81653 |+----------------+------+------------+|海倫娜| 2020 | 82590 |+----------------+------+------------+|博伊西| 2019 | 438000 |+----------------+------+------------+博伊西| 2020 | 447000 |+----------------+------+------------+波特蘭地鐵| 2019 | 2127000 |+----------------+------+------------+波特蘭地鐵| 2020 | 2151000 |+----------------+------+------------+西雅圖地鐵| 2021 | 3461000+----------------+------+------------+波特蘭地鐵| 2021 | 2174000 |+----------------+------+------------+博伊西| 2021 | 455000 |+----------------+------+------------+|海倫娜| 2021 | 81653 |+----------------+------+------------+“‘
瓦爾df_population=火花.讀.格式(“δ”).負載(write_path)顯示(df_population)/ *結果+----------------+------+------------+城市,年,人口+================+======+============+西雅圖地鐵| 2019 | 3406000 |+----------------+------+------------+西雅圖地鐵| 2020 | 3433000 |+----------------+------+------------+|海倫娜| 2019 | 81653 |+----------------+------+------------+|海倫娜| 2020 | 82590 |+----------------+------+------------+|博伊西| 2019 | 438000 |+----------------+------+------------+博伊西| 2020 | 447000 |+----------------+------+------------+波特蘭地鐵| 2019 | 2127000 |+----------------+------+------------+波特蘭地鐵| 2020 | 2151000 |+----------------+------+------------+西雅圖地鐵| 2021 | 3461000+----------------+------+------------+波特蘭地鐵| 2021 | 2174000 |+----------------+------+------------+博伊西| 2021 | 455000 |+----------------+------+------------+|海倫娜| 2021 | 81653 |+----------------+------+------------+* /
要進行清理,請取消步驟3中的運行代碼,然後運行以下代碼,刪除上傳、檢查點和寫入目錄:
dbutils.fs.rm(write_path,真正的)dbutils.fs.rm(upload_path,真正的)
dbutils.fs.rm(write_path,真正的)dbutils.fs.rm(upload_path,真正的)
有關更多細節,請參見自動加載程序.