我試圖從卡夫卡主題使用讀取消息spark.readstream,我用下麵的代碼閱讀它。
我的代碼:
df = spark.readStream
.format(“卡夫卡”)
.option (“kafka.bootstrap。服務器”、“192.1 xx.1.1xx: 9 xx”)
.option(“訂閱”、“json_topic”)
.option (“startingOffsets”、“最早”)/ /開始
.load ()
現在我隻想得到的計數df就像我們可以得到df.count ()當我們使用方法spark.read。
我需要一些條件,如果我沒有得到任何消息從這個話題。我運行這段代碼作為一個批處理和業務需求,我不想使用spark.read。
請建議最好的方法是什麼。
提前謝謝!
你可以試試這個方法:
ReadStream正在運行一個線程在後台沒有簡單的方法像df.show ()。