I am trying to load parquet files using Auto Loader. Below is the code:
def autoload_to_table(data_source, source_format, table_name, checkpoint_path):
    query = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        .schema("VendorID long, tpep_pickup_datetime timestamp, tpep_dropoff_datetime timestamp, passenger_count long, trip_distance long, RateCodeID long, Store_and_fwd_flag string, PULocationID int, DOLocationID long, payment_type long, fare_amount long, extra long, mta_tax long, Tip_amount long, tolls_amount long, improvement_surcharge long, total_amount long, congestion_Surcharge long, airport_fee long")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .load(data_source)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .table(table_name)
    )
    return query

query = autoload_to_table(
    data_source="/mnt/landing/nyctaxi",
    source_format="parquet",
    table_name="yellow_trip_data",
    checkpoint_path="/tmp/delta/yellowdata/_checkpoints"
)
However, I am running into the following error. I have also attached the IPython notebook.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3011.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3011.0 (TID 11673) (10.139.64.5 executor 0): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
	at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:54)