取消
顯示的結果
而不是尋找
你的意思是:

結構化流

sudhanshu1
新的貢獻者三世

我需要一些解決以下問題。

我們組json文件不斷aws s3,這些文件包含一個屬性的詳細信息。請注意1在這個json文件屬性可以有10 - 12行。附件是示例json文件。

我們需要閱讀這些文件流,然後我們需要創建壓實視圖的屬性,這意味著所有財產和行合並,創建一個單一的行/屬性。一旦完成,我們可以寫出,δ/ documentDB / DynamoDB流。

我試過下麵

def mount_s3_bucket (access_key secret_key、bucket_name mount_folder):

ACCESS_KEY_ID = access_key

SECRET_ACCESS_KEY = secret_key

ENCODED_SECRET_KEY = SECRET_ACCESS_KEY。替換(“/”、“% 2 f”)

打印(“安裝”,bucket_name)

試一試:

#卸載數據,以防它已經安裝。

dbutils.fs。卸載(/ mnt / % s % mount_folder)

除了:

#如果不能卸載它最有可能沒有安裝在第一個地方

打印(“目錄不卸載:”,mount_folder)

最後:

#最後,我們鬥山。

dbutils.fs。山(“s3a: / / % s: % s@ % s % (ACCESS_KEY_ID, ENCODED_SECRET_KEY bucket_name)“/ mnt / % s”% mount_folder)

# dbutils.fs。山(“s3a: / /”+ ACCESS_KEY_ID +”:“+ ENCODED_SECRET_KEY +“@”+ bucket_name mount_folder)

print (bucket_name“鬥”,“安裝”,mount_folder,“\ n”)

#設置AWS編程訪問憑證

從pyspark。sql進口SparkSession

從pyspark.sql。功能導入*

從pyspark.sql。導入類型*

火花= SparkSession.builder.appName (Comparis-data-stream-app) .getOrCreate ()

打印(會話創建的)

# JSONschema = StructType ([

# StructField(“用戶名”,StringType(),真的),

# StructField(“貨幣”,StringType(),真的),

# StructField(“量”,LongType(),真的),

#))

JSONschema = StructType ([

StructField (“id”, StringType(),假),

StructField (“address1 StringType(),真的,也沒有),

StructField (“address2 StringType(),真的,也沒有),

StructField(“城”,StringType(),真的,也沒有),

StructField(“狀態”,StringType(),真的,沒有),

StructField (“postalCode StringType(),真的,沒有),

StructField(“價格”,IntegerType(),真的,也沒有),

StructField(“尺寸”,IntegerType(),真的,沒有),

StructField(“房間”,IntegerType(),真的,也沒有),

StructField(“緯度”,DecimalType(),真的,沒有),

StructField(“液化天然氣”,DecimalType(),真的,沒有),

StructField (“hash_lat_lng StringType(),真的,也沒有),

StructField(“源”,StringType(),真的,也沒有),

StructField (“source_premium StringType(),真的,也沒有),

StructField(“時間戳”,TimestampType(),真的,也沒有),

StructField(“屬性”,StructType ([

StructField(“類型”,StringType ()),

StructField(“提升”,StringType ()),

StructField(“花園”,StringType ()),

StructField(“加熱”,StringType ()),

StructField (“washing_machine StringType ()),

StructField(“地板”,StringType ()),

StructField (“year_of_construction IntegerType ())

)))))

ds = (spark.readStream

. schema (JSONschema)

.format (json)

.option (“maxFilesPerTrigger”, 1)

.load (“/ mnt / raj-zuk-comparis-poc / * . json消息”))

flattened_df = (ds。withColumn (“property_type expr (“attributes.type”))

.withColumn (“property_lift expr (“attributes.lift”))

.withColumn (“property_garden expr (“attributes.garden”))

.withColumn (“property_heating expr (“attributes.heating”))

.withColumn (“property_washing_machine expr (“attributes.washing_machine”))

.withColumn (“property_floor expr (“attributes.floor”))

.withColumn (“property_year_of_construction expr (“attributes.year_of_construction”))

)

tumbling_df = (df_a44ed14d77a096c5197933f1e02b7a47

.groupBy(窗口(坳(“時間戳”)、“4小時”),(“源”)上校,上校(“hash_lat_lng”))

.agg (max(“時間戳”).alias(“時間戳”),

第一個(“address1”) .alias (“address1”),

第一個(“address2”) .alias (“address2”),

第一個(“城”).alias(“城市”),

第一個(“狀態”).alias(“狀態”),

第一個(“postalCode”) .alias (“postalCode”),

第一個(“價格”).alias(“價格”),

第一個(“大小”).alias(“大小”),

第一個(“房間”).alias(“房間”),

第一個(“property_type”) .alias (“property_type”),

第一個(“property_lift”) .alias (“property_lift”),

第一個(“property_garden”) .alias (“property_garden”),

第一個(“property_heating”) .alias (“property_heating”),

第一個(“property_washing_machine”) .alias (“property_washing_machine”),

第一個(“property_floor”) .alias (“property_floor”),

第一個(“property_year_of_construction”) .alias (“property_year_of_construction”),

第一個(“source_premium”) .alias (“source_premium”))

.orderBy(坳(“window.start”))

)

這之後我不知道如何寫這個嗎?你能建議什麼應該編寫或解決這個問題的正確方法嗎?

0回答0
歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map