DataFrames介紹- Scala
本文演示了一些使用Scala的常見Spark DataFrame函數。
創建DataFrames
//為域創建case類情況下類部門(id:字符串,的名字:字符串)情況下類員工(firstName:字符串,姓:字符串,電子郵件:字符串,工資:Int)情況下類DepartmentWithEmployees(部門:部門,員工:Seq[員工])//創建部門瓦爾department1=新部門(“123456”,“計算機科學”)瓦爾department2=新部門(“789012”,“機械工程”)瓦爾department3=新部門(“345678”,“戲劇和戲劇”)瓦爾department4=新部門(“901234”,“室內娛樂”)//創建員工瓦爾employee1=新員工(“邁克爾”,“時常要”,“no-reply@berkeley.edu”,100000)瓦爾employee2=新員工(“xiangrui”,“孟”,“no-reply@stanford.edu”,120000)瓦爾employee3=新員工(“馬泰”,零,“no-reply@waterloo.edu”,140000)瓦爾employee4=新員工(零,“溫德爾”,“no-reply@princeton.edu”,160000)瓦爾employee5=新員工(“邁克爾”,“傑克遜”,“no-reply@neverla.nd”,80000)//從Departments和Employees創建departmentwiemployees實例瓦爾departmentWithEmployees1=新DepartmentWithEmployees(department1,Seq(employee1,employee2))瓦爾departmentWithEmployees2=新DepartmentWithEmployees(department2,Seq(employee3,employee4))瓦爾departmentWithEmployees3=新DepartmentWithEmployees(department3,Seq(employee5,employee4))瓦爾departmentWithEmployees4=新DepartmentWithEmployees(department4,Seq(employee2,employee3))
使用DataFrames
將聯合的DataFrame寫入Parquet文件
//如果文件存在,刪除它dbutils。fs。rm(“/ tmp / databricks-df-example.parquet”,真正的)unionDF。寫。格式(“鋪”).保存(“/ tmp / databricks-df-example.parquet”)
爆炸員工列
進口org。apache。火花。sql。功能。_瓦爾explodeDF=parquetDF。選擇(爆炸($“員工”))顯示(explodeDF)
將employee類的字段平鋪成列
瓦爾flattenDF=explodeDF。選擇($“上校*”)flattenDF。顯示()
+---------+--------+--------------------+------+|firstName|姓|電子郵件|工資|+---------+--------+--------------------+------+|馬泰|零|沒有-回複@waterloo…|140000||零|溫德爾|沒有-回複@princeto…|160000||邁克爾|時常要|沒有-回複@berkeley…|100000||xiangrui|孟|沒有-回複@stanford…|120000||邁克爾|傑克遜|沒有-回複@neverla。nd|80000||零|溫德爾|沒有-回複@princeto…|160000||xiangrui|孟|沒有-回複@stanford…|120000||馬泰|零|沒有-回複@waterloo…|140000|+---------+--------+--------------------+------+
使用filter ()
返回與謂詞匹配的行
瓦爾filterDF=flattenDF。過濾器($“firstName”= = =“xiangrui”||$“firstName”= = =“邁克爾”)。排序($“姓”。asc)顯示(filterDF)
的在()
從句相當於filter ()
瓦爾whereDF=flattenDF。在哪裏($“firstName”= = =“xiangrui”||$“firstName”= = =“邁克爾”)。排序($“姓”。asc)顯示(whereDF)
檢索缺少firstName或lastName的行
瓦爾filterNonNullDF=nonNullDF。過濾器($“firstName”= = =”——“||$“姓”= = =”——“).排序($“電子郵件”。asc)顯示(filterNonNullDF)
示例聚合使用gg ()
而且countDistinct ()
//查找每個名字的不同姓氏瓦爾countDistinctDF=nonNullDF。選擇($“firstName”,$“姓”)。groupBy($“firstName”)。gg(countDistinct($“姓”)作為“distinct_last_names”)顯示(countDistinctDF)
常見問題(FAQ)
本FAQ介紹了使用可用api的常見用例和示例用法。有關更詳細的API描述,請參見DataFrameReader而且DataFrameWriter文檔。
如何使用DataFrame udf獲得更好的性能?
如果該功能存在於可用的內置函數中,那麼使用這些函數將會執行得更好。
我們使用內置函數和withColumn ()
添加新列的API。我們也可以用withColumnRenamed ()
在轉換後替換現有的列。
進口org。apache。火花。sql。功能。_進口org。apache。火花。sql。類型。_進口org。apache。火花。sql。_進口org。apache。hadoop。io。LongWritable進口org。apache。hadoop。io。文本進口org。apache。hadoop。相依。配置進口org。apache。hadoop。mapreduce。自由。輸入。TextInputFormat//構建一個示例DataFrame數據集。dbutils。fs。rm(“/ tmp / dataframe_sample.csv”,真正的)dbutils。fs。把(“/ tmp / dataframe_sample.csv”,"""id | end_date | start_date |位置1 | 2015-10-14就是| 2015-09-14就是| CA-SF2 | 2015-10-15 01:00:20 | 2015-08-14就是| CA-SD3 | 2015-10-16 02:30:00 | 2015-01-14就是| NY-NY4 | 2015-10-17 03:00:20 | 2015-02-14就是| NY-NY5 | 2015-10-18 04:30:00 | 2014-04-14就是|可以""",真正的)瓦爾相依=新配置相依。集(“textinputformat.record.delimiter”,“\ n”)瓦爾抽樣=sc。newAPIHadoopFile(“/ tmp / dataframe_sample.csv”,classOf[TextInputFormat],classOf[LongWritable],classOf[文本],相依).地圖(_。_2。toString).過濾器(_。非空的)瓦爾頭=抽樣。第一個()//解析標題行瓦爾rdd_noheader=抽樣。過濾器(x= >!x。包含(“id”))//將RDD[String]轉換為RDD[Rows]。使用分隔符創建一個數組,並使用Row.fromSeq()瓦爾row_rdd=rdd_noheader。地圖(x= >x。分裂(“|”))。地圖(x= >行。fromSeq(x))瓦爾df_schema=StructType(頭。分裂(“|”).地圖(字段名= >StructField(字段名,StringType,真正的)))vardf=火花。createDataFrame(row_rdd,df_schema)df。printSchema
//調用內置函數對列執行操作,而不是注冊UDF。//這將提供性能改進,因為內置程序在平台的JVM中編譯和運行。Beplay体育安卓版本//轉換為Date類型瓦爾timestamp2datetype:(列)= >列=(x)= >{to_date(x)}df=df。withColumn(“日期”,timestamp2datetype(上校(“end_date”)))//隻解析日期瓦爾timestamp2date:(列)= >列=(x)= >{regexp_replace(x,”(\ \ d +) [:] (\ \ d +) [:] (\ \ d +)。* $”,"")}df=df。withColumn(“date_only”,timestamp2date(上校(“end_date”)))//拆分字符串,索引字段瓦爾parse_city:(列)= >列=(x)= >{分裂(x,“-”) (1)}df=df。withColumn(“城市”,parse_city(上校(“位置”)))//執行日期差函數瓦爾dateDiff:(列,列)= >列=(x,y)= >{datediff(to_date(y),to_date(x))}df=df。withColumn(“date_diff”,dateDiff(上校(“start_date”),上校(“end_date”)))
df。createOrReplaceTempView(“sample_df”)顯示(sql("select * from sample_df"))
我想把DataFrame轉換回JSON字符串發送回Kafka。
有一個toJSON ()
函數,該函數使用列名和模式來生成JSON記錄,返回JSON字符串的RDD。
瓦爾rdd_json=df。toJSONrdd_json。取(2).foreach(println)
我的UDF接受一個參數,包括要操作的列。我如何傳遞這個參數?
有一個函數被調用點燃()
這會創建一個靜態列。
瓦爾add_n=udf((x:整數,y:整數)= >x+y)//注冊一個UDF,它向DataFrame添加一個列,並將id列轉換為Integer類型。df=df。withColumn(“id_offset”,add_n(點燃(1000),上校(“id”).投(“int”)))顯示(df)
瓦爾last_n_days=udf((x:整數,y:整數)= >{如果(x<y)真正的其他的假})//last_n_days = udf(lambda x, y:如果x < y則為真,否則為假,boolean ())瓦爾df_filtered=df。過濾器(last_n_days(上校(“date_diff”),點燃(90)))顯示(df_filtered)
我有一個表在Hive metastore和我想訪問表作為一個DataFrame。定義它最好的方法是什麼?
有多種方法可以從注冊表中定義DataFrame。調用表(表)
或使用SQL查詢選擇和過濾特定的列:
//都返回DataFrame類型瓦爾df_1=表格(“sample_df”)瓦爾df_2=火花。sql("select * from sample_df")
我想清除當前集群上的所有緩存表。
有一個API可以在全局或每個表級別執行此操作。
火花。目錄。clearCache()火花。目錄。cacheTable(“sample_df”)火花。目錄。uncacheTable(“sample_df”)
我想計算列的總和。做這件事最好的方法是什麼?
有一個API叫gg (* exprs)
它接受要計算的聚合類型的列名稱和表達式的列表。您可以利用上麵提到的內置函數作為每個列表達式的一部分。
//根據位置列提供min, count, avg和group。結果還varagg_df=df。groupBy(“位置”).gg(最小值(“id”),數(“id”),avg(“date_diff”))顯示(agg_df)
我想將DataFrames寫出來到Parquet,但是想在特定的列上進行分區。
您可以使用以下api來完成此任務。確保代碼不會使用數據集創建大量的分區列,否則元數據的開銷會導致顯著的速度下降。如果此目錄後麵有一個SQL表,則需要調用刷新表格<表名稱>
在查詢之前更新元數據。
df=df。withColumn(“end_month”,月(上校(“end_date”)))df=df。withColumn(“end_year”,一年(上校(“end_date”)))dbutils。fs。rm(“/ tmp / sample_table”,真正的)df。寫。partitionBy(“end_year”,“end_month”).格式(“鋪”).保存(“/ tmp / sample_table”)顯示(dbutils。fs。ls(“/ tmp / sample_table”))
如何正確處理想要過濾掉NULL數據的情況?
您可以使用filter ()
並提供與SQL查詢類似的語法。
瓦爾null_item_schema=StructType(數組(StructField(“col1”,StringType,真正的),StructField(“col2”,IntegerType,真正的)))瓦爾null_dataset=sc。並行化(數組((“測試”,1),(零,2)))。地圖(x= >行。fromTuple(x))瓦爾null_df=火花。createDataFrame(null_dataset,null_item_schema)顯示(null_df。過濾器("col1不是NULL"))
如何使用csv
orgydF4y2Baspark-avro
庫嗎?
有一個inferSchema
選擇標記。提供標題允許您對列進行適當的命名。
瓦爾adult_df=火花。讀。格式(“csv”).選項(“頭”,“假”).選項(“inferSchema”,“真正的”).負載(“dbfs: / databricks-datasets /成人/ adult.data”)adult_df。printSchema()
您有一個分隔字符串數據集,希望將其轉換為數據類型。你會如何做到這一點?
使用RDD api過濾掉不規範的行,並將值映射到適當的類型。