使用NullPointerException訪問Redshift失敗

學習如何解決讀取Redshift表時發生的' NullPointerException '錯誤。

寫的亞當Pavlacka

最後發布日期:2022年6月1日

問題

有時當你讀紅移表時:

%scala val original_df = spark.read。格式(“com.databricks.spark.redshift”)。選項(“url”,url)。用戶選項(“用戶”)。選項(“密碼”,密碼)。選項(“查詢”,查詢)。選項(“forward_spark_s3_credentials”,真正的)。選項(“tempdir”、“路徑”)。load ()

Spark作業將拋出一個NullPointerException

org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:194)

導致

問題來自Spark從Redshift讀取數據的方式。Amazon Redshift數據源使用Redshift的unload格式從Redshift讀取數據:Spark首先發布一個卸載命令Redshift,使其將表的內容轉儲到卸載格式到臨時文件,然後Spark掃描這些臨時文件。這個基於文本的卸載格式默認情況下不區分空字符串和空字符串-兩者在結果文件中都被編碼為空字符串。當spark-redshift以unload格式讀取數據時,沒有足夠的信息讓它判斷輸入是空字符串還是空字符串,目前它隻是認為它是空字符串。

解決方案

在Scala中設置可以為空真正的盡管如此字符串列:

%scala導入org.apache.spark.sql.types。{StructField, StructType, StringType}導入org.apache.spark.sql{DataFrame, SQLContext} def setNullableStateForAllStringColumns(df: DataFrame, nullable: Boolean) = {StructType(df.schema. context)。map {case StructField(c, StringType, _, m) => StructField(c, StringType, nullable = nullable, m) case StructField(c, t, n, m) => StructField(c, t, n, m)})}

在Python中:

%python def set_nullable_for_all_string_columns(df, nullable): from pyspark.sql.types import StructType, StructField, StringType new_schema = StructType([StructField(f.name, f. datatype, nullable, f.metadata) if (isinstance(f. name, f. datatype, null, f.metadata))(f.name, f.dataType, f.nullable, f.metadata) for f in df.schema.fields])返回new_schema

的模式來使用這個函數original_df,然後修改模式以生成所有字符串可以為空,然後重新閱讀紅移:

%scala val df = spark.read。模式(setNullableStateForAllStringColumns (original_df真實))。格式(“com.databricks.spark.redshift”)。選項(“url”,url)。用戶選項(“用戶”)。選項(“密碼”,密碼)。選項(“查詢”,查詢)。選項(“forward_spark_s3_credentials”,真正的)。選項(“tempdir”、“路徑”)。load ()