在使用SQS隊列作為流源時處理分區列值

寫的亞當Pavlacka

最後發布日期:2022年5月18日

問題

如果S3中的數據按分區存儲,則使用分區列值命名源目錄結構中的文件夾。但是,如果使用SQS隊列作為流源,則S3-SQS源無法檢測分區列值。

例如,如果您將以下DataFrame以JSON格式保存到S3:

%scala val df = spark.range(10).withColumn("date",current_date()) df.write. partitionby ("date").json("s3a://bucket-name/json")

下麵的文件結構將是:

%scala s3a://bucket-name/json/_SUCCESS s3a://bucket-name/json/date=2018-10-25/<單個json文件> . data =2018-10-25

假設您從為這個S3桶配置的隊列創建了一個S3- sqs輸入流。如果使用以下代碼直接從S3-SQS輸入流加載數據:

導入org.apache.spark.sql.types。_ val schema = StructType(List(StructField("id", IntegerType, false),StructField("date", DateType, false)))readStream .format("s3-sqs") .option("fileFormat", "json") .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/826763667205/sqs-queue") .option("sqsFetchInterval", "1m") .option("ignoreFileDeletion", true) .schema(schema) .load())

輸出將是:

錯誤的SQS流結果。

您可以看到日期列值沒有正確填充。

解決方案

你可以使用的組合input_file_name ()而且regexp_extract ()udf來正確提取日期值,如下麵的代碼片段所示:

導入org.apache.spark.sql.functions。_ val df = spark。readStream .format("s3-sqs") .option("fileFormat", "json") .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/826763667205/sqs-queue") .option("sqsFetchInterval", fetch_interval) .option("ignoreFileDeletion", true) .schema(schema) .load() display(df.withColumn("date",regexp_extract(input_file_name(), "/date=(\\d{4}-\\d{2}-\\d{2})/", 1)))

現在,您可以在以下輸出中看到date列的正確值:

糾正SQS流結果。