你好,
我有一個表在MongoDB阿特拉斯,我試圖不斷讀到內存,然後將最終寫文件。然而,當我看內存中的表沒有正確的模式。
代碼:
從pyspark.sql。類型進口StructType、LongType StringType IntegerType
從pyspark進口SparkContext
從pyspark。流進口StreamingContext
從pyspark。sql進口SparkSession
從pyspark.sql。功能導入*
火花= SparkSession。構建器\
.appName \ (“pdm_messagesStream”)
. config(“火花。瓶”、“org.mongodb.spark: mongo-spark-connector: 10.0.5”) \
.getOrCreate ()
readSchema = (StructType () \
閥門(_id, StringType ()) \
閥門(deviceToken, StringType ()) \
閥門(‘消息’,StringType ()) \
閥門(消息id, StringType ()) \
閥門(createdAt, StringType ()) \
閥門(createdAtEpochSeconds, StringType ())
)
dataStreamWriter =(火花。readStream \
.format \ (“mongodb”)
.option (“spark.mongodb.connection。uri”、“mongodb + srv: / / xxxx@ * * * / ? retryWrites = true&readPreference = secondary&readPreferenceTags = nodeType: ANALYTICS&w \ =多數”)
.option (“spark.mongodb。數據庫”、“數據”)\
.option (“spark.mongodb。收集”、“消息”)\
.option (“forceDeleteTempCheckpointLocation”,“真正的”)\
. schema (readSchema)
.load () \
.writeStream \
.format \(“內存”)
.queryName \(“信息”)
.trigger(連續=“1秒”)
)
查詢= dataStreamWriter.start ()
結果從spark.table (“pdm_messages”),告訴(截斷= False):
任何幫助將不勝感激。
謝謝
沙龍