教程:使用PySpark DataFrames磚
本文向您展示如何使用Apache火花Python加載和轉換數據(PySpark) DataFrame API在磚。
什麼是DataFrame ?
DataFrame是一個二維標簽數據結構可能不同類型的列。你能想到的DataFrame像電子表格,SQL表或詞典係列對象。Apache火花DataFrames提供一組豐富的功能(選擇列、過濾、連接、聚集),允許您來有效地解決常見的數據分析問題。
Apache火花DataFrames抽象之上的彈性分布數據集(抽樣)。火花DataFrames和火花SQL使用一個統一的規劃和優化引擎,允許您獲得幾乎相同的性能在所有受支持的語言在磚(Python、SQL、Scala和R)。
創建一個DataFrame Python
大多數Apache返回DataFrame火花查詢。這包括閱讀從表,從文件加載數據,操作轉換數據。
您還可以創建一個火花從列表或熊貓DataFrame DataFrame,比如下麵的例子:
進口熊貓作為pd數據=[[1,“伊利亞”),(2,“Teo”),(3,“方”]]pdf=pd。DataFrame(數據,列=(“id”,“名稱”])df1=火花。createDataFrame(pdf)df2=火花。createDataFrame(數據,模式=“id,名稱字符串”)
數據加載到DataFrame從文件
你可以從許多支持加載數據文件格式。下麵的例子使用了一個可用的數據集/ databricks-datasets
目錄,可以從大部分工作區。看到樣本數據集。
df=(火花。讀。格式(“csv”)。選項(“頭”,“真正的”)。選項(“inferSchema”,“真正的”)。負載(“/ databricks-datasets /樣本/ population-vs-price / data_geo.csv”))
結合DataFrames和加入工會
DataFrames使用標準的SQL連接操作語義。加入返回合並後的結果兩個DataFrames基於提供的匹配條件和連接類型。下麵的例子是一個內連接,也就是默認值:
joined_df=df1。加入(df2,如何=“內心”,在=“id”)
您可以添加的行DataFrame到另一個使用union操作,如以下示例:
unioned_df=df1。聯盟(df2)
過濾器DataFrame行
您可以過濾DataFrame使用行.filter ()
或其中()
。沒有區別在性能或語法,見下麵的例子:
filtered_df=df。過濾器(“id > 1”)filtered_df=df。在哪裏(“id > 1”)
使用過濾選擇一個子集的行返回DataFrame或修改。
從一個DataFrame選擇列
您可以選擇通過一個或多個列列名稱.select ()
,比如下麵的例子:
select_df=df。選擇(“id”,“名稱”)
您可以組合選擇和過濾查詢來限製返回的行和列。
subset_df=df。過濾器(“id > 1”)。選擇(“名稱”)
打印數據模式
火花使用術語模式引用列的名稱和數據類型DataFrame。
請注意
磚也使用術語“模式”來描述表登記目錄的集合。
你可以打印使用的模式.printSchema ()
方法,如以下示例:
df。printSchema()
寫一個DataFrame文件的集合
大多數火花應用程序被設計成工作在大型數據集和工作在一個分布式的方式,並引發寫出一個目錄的文件,而不是單個文件。許多數據係統配置為讀取這些文件的目錄。磚推薦使用的表在filepaths對於大多數應用程序。
下麵的示例JSON文件的保存目錄:
df。寫。格式(“json”)。保存(“/ tmp / json_data”)
在PySpark運行SQL查詢
火花DataFrames提供一個選項來將SQL與Python。
的selectExpr ()
方法允許您指定每一列作為一個SQL查詢,比如下麵的例子:
顯示(df。selectExpr(“id”,“上(名稱)作為big_name”))
您可以導入expr ()
函數pyspark.sql.functions
使用SQL語法一列將被指定的任何地方,如以下示例:
從pyspark.sql.functions進口expr顯示(df。選擇(“id”,expr(“低(名稱)作為little_name”)))
您還可以使用spark.sql ()
在Python內核運行任意SQL查詢,如以下示例:
query_df=火花。sql(“SELECT * FROM <表名稱>”)
因為邏輯是在Python執行內核和所有SQL查詢是作為string傳遞的,您可以使用Python格式化參數化SQL查詢,如以下示例:
table_name=“my_table”query_df=火花。sql(f“SELECT *{table_name}”)