DataFrames介紹- Scala

本文演示了一些使用Scala的常見Spark DataFrame函數。

創建DataFrames

//為域創建case類情況下部門id:字符串,的名字:字符串情況下員工firstName:字符串,:字符串,電子郵件:字符串,工資:Int情況下DepartmentWithEmployees部門:部門,員工:Seq員工])//創建部門瓦爾department1部門“123456”,“計算機科學”瓦爾department2部門“789012”,“機械工程”瓦爾department3部門“345678”,“戲劇和戲劇”瓦爾department4部門“901234”,“室內娛樂”//創建員工瓦爾employee1員工“邁克爾”,“時常要”,“no-reply@berkeley.edu”,100000瓦爾employee2員工“xiangrui”,“孟”,“no-reply@stanford.edu”,120000瓦爾employee3員工“馬泰”,,“no-reply@waterloo.edu”,140000瓦爾employee4員工,“溫德爾”,“no-reply@princeton.edu”,160000瓦爾employee5員工“邁克爾”,“傑克遜”,“no-reply@neverla.nd”,80000//從Departments和Employees創建departmentwiemployees實例瓦爾departmentWithEmployees1DepartmentWithEmployeesdepartment1,Seqemployee1,employee2))瓦爾departmentWithEmployees2DepartmentWithEmployeesdepartment2,Seqemployee3,employee4))瓦爾departmentWithEmployees3DepartmentWithEmployeesdepartment3,Seqemployee5,employee4))瓦爾departmentWithEmployees4DepartmentWithEmployeesdepartment4,Seqemployee2,employee3))

從案例類的列表中創建DataFrames

瓦爾departmentsWithEmployeesSeq1SeqdepartmentWithEmployees1,departmentWithEmployees2瓦爾df1departmentsWithEmployeesSeq1toDF()顯示df1瓦爾departmentsWithEmployeesSeq2SeqdepartmentWithEmployees3,departmentWithEmployees4瓦爾df2departmentsWithEmployeesSeq2toDF()顯示df2

使用DataFrames

聯盟兩個DataFrames

瓦爾unionDFdf1聯盟df2顯示unionDF

將聯合的DataFrame寫入Parquet文件

//如果文件存在,刪除它dbutilsfsrm“/ tmp / databricks-df-example.parquet”,真正的unionDF格式“鋪”).保存“/ tmp / databricks-df-example.parquet”

從Parquet文件中讀取DataFrame

瓦爾parquetDF火花格式“鋪”).負載“/ tmp / databricks-df-example.parquet”

爆炸員工列

進口orgapache火花sql功能_瓦爾explodeDFparquetDF選擇爆炸“員工”))顯示explodeDF

將employee類的字段平鋪成列

瓦爾flattenDFexplodeDF選擇“上校*”flattenDF顯示()
+---------+--------+--------------------+------+|firstName||電子郵件|工資|+---------+--------+--------------------+------+|馬泰||沒有-回複@waterloo…|140000|||溫德爾|沒有-回複@princeto…|160000||邁克爾|時常要|沒有-回複@berkeley…|100000||xiangrui||沒有-回複@stanford…|120000||邁克爾|傑克遜|沒有-回複@neverland|80000|||溫德爾|沒有-回複@princeto…|160000||xiangrui||沒有-回複@stanford…|120000||馬泰||沒有-回複@waterloo…|140000|+---------+--------+--------------------+------+

使用filter ()返回與謂詞匹配的行

瓦爾filterDFflattenDF過濾器“firstName”= = =“xiangrui”||“firstName”= = =“邁克爾”排序“姓”asc顯示filterDF

在()從句相當於filter ()

瓦爾whereDFflattenDF在哪裏“firstName”= = =“xiangrui”||“firstName”= = =“邁克爾”排序“姓”asc顯示whereDF

取代值和--使用DataFrame Na函數

瓦爾nonNullDFflattenDFna填滿”——“顯示nonNullDF

檢索缺少firstName或lastName的行

瓦爾filterNonNullDFnonNullDF過濾器“firstName”= = =”——“||“姓”= = =”——“).排序“電子郵件”asc顯示filterNonNullDF

示例聚合使用gg ()而且countDistinct ()

//查找每個名字的不同姓氏瓦爾countDistinctDFnonNullDF選擇“firstName”,“姓”groupBy“firstName”ggcountDistinct“姓”作為“distinct_last_names”顯示countDistinctDF

比較DataFrame和SQL查詢物理計劃

提示

它們應該是一樣的。

countDistinctDF解釋()
//將DataFrame注冊為臨時視圖,這樣我們就可以使用SQL查詢它nonNullDFcreateOrReplaceTempView“databricks_df_example”火花sql"""SELECT firstName, count(distinct lastName)為distinct_last_names從databricks_df_example集團的名字""").解釋

把所有的工資加起來

瓦爾salarySumDFnonNullDFgg“工資”->“和”顯示salarySumDF

清理:刪除Parquet文件

dbutilsfsrm“/ tmp / databricks-df-example.parquet”,真正的

常見問題(FAQ)

本FAQ介紹了使用可用api的常見用例和示例用法。有關更詳細的API描述,請參見DataFrameReader而且DataFrameWriter文檔。

如何使用DataFrame udf獲得更好的性能?

如果該功能存在於可用的內置函數中,那麼使用這些函數將會執行得更好。

我們使用內置函數和withColumn ()添加新列的API。我們也可以用withColumnRenamed ()在轉換後替換現有的列。

進口orgapache火花sql功能_進口orgapache火花sql類型_進口orgapache火花sql_進口orgapachehadoopioLongWritable進口orgapachehadoopio文本進口orgapachehadoop相依配置進口orgapachehadoopmapreduce自由輸入TextInputFormat//構建一個示例DataFrame數據集。dbutilsfsrm“/ tmp / dataframe_sample.csv”,真正的dbutilsfs“/ tmp / dataframe_sample.csv”,"""id | end_date | start_date |位置1 | 2015-10-14就是| 2015-09-14就是| CA-SF2 | 2015-10-15 01:00:20 | 2015-08-14就是| CA-SD3 | 2015-10-16 02:30:00 | 2015-01-14就是| NY-NY4 | 2015-10-17 03:00:20 | 2015-02-14就是| NY-NY5 | 2015-10-18 04:30:00 | 2014-04-14就是|可以""",真正的瓦爾相依配置相依“textinputformat.record.delimiter”,“\ n”瓦爾抽樣scnewAPIHadoopFile“/ tmp / dataframe_sample.csv”,classOfTextInputFormat],classOfLongWritable],classOf文本],相依).地圖__2toString).過濾器_非空的瓦爾抽樣第一個()//解析標題行瓦爾rdd_noheader抽樣過濾器x= >!x包含“id”))//將RDD[String]轉換為RDD[Rows]。使用分隔符創建一個數組,並使用Row.fromSeq()瓦爾row_rddrdd_noheader地圖x= >x分裂“|”))。地圖x= >fromSeqx))瓦爾df_schemaStructType分裂“|”).地圖字段名= >StructField字段名,StringType,真正的)))vardf火花createDataFramerow_rdd,df_schemadfprintSchema
//調用內置函數對列執行操作,而不是注冊UDF。//這將提供性能改進,因為內置程序在平台的JVM中編譯和運行。Beplay体育安卓版本//轉換為Date類型瓦爾timestamp2datetype:= >x= >to_datex}dfdfwithColumn“日期”,timestamp2datetype上校“end_date”)))//隻解析日期瓦爾timestamp2date:= >x= >regexp_replacex,”(\ \ d +) [:] (\ \ d +) [:] (\ \ d +)。* $”,""}dfdfwithColumn“date_only”,timestamp2date上校“end_date”)))//拆分字符串,索引字段瓦爾parse_city:= >x= >分裂x,“-”) (1}dfdfwithColumn“城市”,parse_city上校“位置”)))//執行日期差函數瓦爾dateDiff:,= >x,y= >datediffto_datey),to_datex))}dfdfwithColumn“date_diff”,dateDiff上校“start_date”),上校“end_date”)))
dfcreateOrReplaceTempView“sample_df”顯示sql"select * from sample_df"))

我想把DataFrame轉換回JSON字符串發送回Kafka。

有一個toJSON ()函數,該函數使用列名和模式來生成JSON記錄,返回JSON字符串的RDD。

瓦爾rdd_jsondftoJSONrdd_json2).foreachprintln

我的UDF接受一個參數,包括要操作的列。我如何傳遞這個參數?

有一個函數被調用點燃()這會創建一個靜態列。

瓦爾add_nudf((x:整數,y:整數= >x+y//注冊一個UDF,它向DataFrame添加一個列,並將id列轉換為Integer類型。dfdfwithColumn“id_offset”,add_n點燃1000),上校“id”).“int”)))顯示df
瓦爾last_n_daysudf((x:整數,y:整數= >如果x<y真正的其他的})//last_n_days = udf(lambda x, y:如果x < y則為真,否則為假,boolean ())瓦爾df_filtereddf過濾器last_n_days上校“date_diff”),點燃90)))顯示df_filtered

我有一個表在Hive metastore和我想訪問表作為一個DataFrame。定義它最好的方法是什麼?

有多種方法可以從注冊表中定義DataFrame。調用表(表)或使用SQL查詢選擇和過濾特定的列:

//都返回DataFrame類型瓦爾df_1表格“sample_df”瓦爾df_2火花sql"select * from sample_df"

我想清除當前集群上的所有緩存表。

有一個API可以在全局或每個表級別執行此操作。

火花目錄clearCache()火花目錄cacheTable“sample_df”火花目錄uncacheTable“sample_df”

我想計算列的總和。做這件事最好的方法是什麼?

有一個API叫gg (* exprs)它接受要計算的聚合類型的列名稱和表達式的列表。您可以利用上麵提到的內置函數作為每個列表達式的一部分。

//根據位置列提供min, count, avg和group。結果還varagg_dfdfgroupBy“位置”).gg最小值“id”),“id”),avg“date_diff”))顯示agg_df

我想將DataFrames寫出來到Parquet,但是想在特定的列上進行分區。

您可以使用以下api來完成此任務。確保代碼不會使用數據集創建大量的分區列,否則元數據的開銷會導致顯著的速度下降。如果此目錄後麵有一個SQL表,則需要調用刷新表格<表名稱>在查詢之前更新元數據。

dfdfwithColumn“end_month”,上校“end_date”)))dfdfwithColumn“end_year”,一年上校“end_date”)))dbutilsfsrm“/ tmp / sample_table”,真正的dfpartitionBy“end_year”,“end_month”).格式“鋪”).保存“/ tmp / sample_table”顯示dbutilsfsls“/ tmp / sample_table”))

如何正確處理想要過濾掉NULL數據的情況?

您可以使用filter ()並提供與SQL查詢類似的語法。

瓦爾null_item_schemaStructType數組StructField“col1”,StringType,真正的),StructField“col2”,IntegerType,真正的)))瓦爾null_datasetsc並行化數組((“測試”,1),,2)))。地圖x= >fromTuplex))瓦爾null_df火花createDataFramenull_dataset,null_item_schema顯示null_df過濾器"col1不是NULL"))

如何使用csvorgydF4y2Baspark-avro庫嗎?

有一個inferSchema選擇標記。提供標題允許您對列進行適當的命名。

瓦爾adult_df火花格式“csv”).選項“頭”,“假”).選項“inferSchema”,“真正的”).負載“dbfs: / databricks-datasets /成人/ adult.data”adult_dfprintSchema()

您有一個分隔字符串數據集,希望將其轉換為數據類型。你會如何做到這一點?

使用RDD api過濾掉不規範的行,並將值映射到適當的類型。