從JSON字符串或Python字典創建一個DataFrame

從包含JSON字符串或Python字典的變量中創建一個Apache Spark DataFrame。

寫的ram.sankarasubramanian

最後發布時間:2022年7月1日

在本文中,我們將回顧如何從包含JSON字符串或Python字典的變量創建Apache Spark DataFrame。

從JSON字符串創建一個Spark DataFrame

  1. 將變量中的JSON內容添加到列表中。
    %scala import scala.collection.mutable.ListBuffer val json_content1 = "{'json_col1': 'hello', 'json_col2': ' 32}" val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}" var json_seq = new ListBuffer[String]() json_seq += json_content1 json_seq += json_content2
  2. 從列表中創建一個Spark數據集。
    %scala val json_ds = json_seq.toDS()
  3. 使用spark.read.json來解析Spark數據集。
    %scala val df= spark.read.json(json_ds) display(df)

組合樣例代碼

這些示例代碼塊將前麵的步驟組合為單獨的示例。Python和Scala示例執行相同的任務。

%python json_content1 = "{'json_col1': 'hello', 'json_col2': ' 32}" json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}" json_list = [] json_list.append(json_content1) json_list.append(json_content2) df = spark.read.json(sc.parallelize(json_list)) display(df)
%scala import scala.collection.mutable.ListBuffer val json_content1 = "{'json_col1': 'hello', 'json_col2': ' 32}" val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}" var json_seq = new ListBuffer[String]() json_seq += json_content1 json_seq += json_content2 val json_ds = json_seq. tods () val df= spark.read.json(json_ds) display(df)

從DataFrame中提取帶有JSON數據的字符串列並解析它

  1. 從DataFrame中選擇JSON列並將其轉換為RDD類型抽樣(行)
    %scala導入org.apache.spark.sql.functions。_ val test_df = Seq((“1”,“{“json_col1”:“你好”,“json_col2”:32}”,“1.0”),(“1”,“{json_col1:‘你好’,‘json_col2’:‘世界’}”,“1.0”))。toDF("row_number", "json", "token") val row_rdd = test_df.select(col("json"))。rdd //隻選擇JSON列並將其轉換為rdd。
  2. 轉換抽樣(行)抽樣(字符串)
    %scala val string_rdd = row_rdd.map(_.mkString(","))
  3. 使用spark.read.json來解析抽樣(字符串)
    %scala val df1= spark.read.json(string_rdd) display(df1)

組合樣例代碼

這個示例代碼塊將前麵的步驟組合成一個示例。

%scala導入org.apache.spark.sql.functions。_ val test_df = Seq((“1”,“{“json_col1”:“你好”,“json_col2”:32}”,“1.0”),(“1”,“{json_col1:‘你好’,‘json_col2’:‘世界’}”,“1.0”))。toDF("row_number", "json", "token") val row_rdd = test_df.select(col("json"))。rdd val string_rdd = row_rdd.map(_.mkString(",")) val df1= spark.read.json(string_rdd) display(df1)

從Python字典創建一個Spark DataFrame

  1. 檢查數據類型並確認它是字典類型。
    %python jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for .集群”},“任務”:{" notebook_task ":{“notebook_path”:“/用戶/ user@www.eheci.com/path/test_notebook”}},“cluster_spec ": {" new_cluster ":{“spark_version”:“4.3.x-scala2.11”,“屬性”:{“類型”:“fixed_node”、“記憶”:“8 g”},“enable_elastic_disk”:“false”,“num_workers”:1}},“cluster_instance ":{“cluster_id”:“0000 - 000000 wares10”},“start_time”:1584689872601,“setup_duration”:0,”execution_duration”:0,”cleanup_duration”:0,”creator_user_name”:“user@www.eheci.com”、“run_name”:“我的測試工作”、“run_page_url”:“https://testurl.www.eheci.com作業/ 33100 /運行/ 1”、“run_type”:“SUBMIT_RUN“}類型(jsonDataDict)
  2. 使用json.dumps將Python字典轉換為JSON字符串。
    %python import json jsonData = json.dumps(jsonDataDict)
  3. 將JSON內容添加到列表中。
    %python jsonDataList = [] jsonDataList.append(jsonData)
  4. 將列表轉換為RDD並使用spark.read.json
    %python jsonRDD = sc.parallelize(jsonDataList) df = spark.read.json(jsonRDD) display(df)

組合樣例代碼

這些示例代碼塊將前麵的步驟組合成一個示例。

%python jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for .集群”},“任務”:{" notebook_task ":{“notebook_path”:“/用戶/ user@www.eheci.com/path/test_notebook”}},“cluster_spec ": {" new_cluster ":{“spark_version”:“4.3.x-scala2.11”,“屬性”:{“類型”:“fixed_node”、“記憶”:“8 g”},“enable_elastic_disk”:“false”,“num_workers”:1}},“cluster_instance ":{“cluster_id”:“0000 - 000000 wares10”},“start_time”:1584689872601,“setup_duration”:0,”execution_duration”:0,”cleanup_duration”:0,”creator_user_name”:“user@www.eheci.com”、“run_name”:“我的測試job","run_page_url":"https://testurl.www.eheci.com#job/33100/run/1","run_type":"SUBMIT_RUN"} type(jsonDataDict) import json jsonData = json.dumps(jsonDataDict) jsonDataList = [] jsonData .append(jsonData) jsonRDD = sc.parallelize(jsonDataList) df = spark.read.json(jsonRDD) display(df)

例如筆記本電腦

檢查解析JSON字符串或Python字典示例筆記本