我有150 k小csv文件(~ 50 mb)存儲在S3中我想要下載到三角洲表。
CSV文件都存儲在S3以下結構:
桶/文件夾/ name_00000000_00000100.csv
桶/文件夾/ name_00000100_00000200.csv
這是我使用的代碼來啟動自動加載程序:
# #外部s3 bucket dbutils.fs山。山(f”s3a: / / {access_key}: {encoded_secret_key} @ {aws_bucket_name}”, f“/ mnt / {mount_name}”) # #自動裝卸機功能def autoload_csv (data_source, table_name、checkpoint_path模式):查詢=(火花。readStream .format .option (“cloudFiles (“cloudFiles”)。形式at", "csv") .option("header","true") .option("delimiter", ";") .option("rescuedDataColumn", "_rescue") .schema(schema) .load(data_source) .withColumn("timestamp",col("timestamp").cast(TimestampType())) .writeStream.format("delta") .trigger(once=True) .option("mergeSchema", "true") .option("checkpointLocation", checkpoint_path) .toTable(tableName=table_name) ) return query ## Define schema schema = StructType([ StructField("timestamp", LongType(), True), StructField(“aaa”, LongType(), True), StructField(“bbb”, LongType(), True), StructField(“ccc”, LongType(), True), StructField(“eee”, LongType(), True), StructField(“fff”, LongType(), True), StructField(“ggg”, StringType(), True), ]) ## start script (schema is input_data_path = ‘/input_data table_name = ‘default.input_data’ chkpt_path = '/tmp/input_data/_checkpoints' query = autoload_csv(data_source=input_data_path, table_name=table_name,checkpoint_path=chkpt_path, schema=schema)
需要兩小時八4核心/ 32 gb原始工人現在進口的所有文件。一定是錯的。
我有附加以下圖片:
我怎麼能加快數據導入?
我怎麼能自己進一步調試問題?
編輯:
我隻是試著相同的進口管道相同數量但是小文件(< 1 mb)。我發現它運行更平穩,當我刪除觸發器(一旦= True)。不幸的是這並不是幫助更大的文件。更大的文件自動加載程序初始化流需要永遠。
您正在使用.trigger(一旦= True)所以它將試圖在單個micro-batch處理所有數據。我將推薦使用在多個micro-batches availableNow觸發處理您的數據。然後你可以檢查流度量來檢查你有多少記錄每批處理和多長時間
良好的指針謝謝!在Python中可用的特性還嗎?看文檔看起來隻有在Scala中可用。