獲取過去24小時的記錄從Pub / Sub數,您可以使用publishTimestampInMillis在發布/訂閱模式過濾記錄根據發布時間戳。您可以使用current_timestamp ()函數在磚當前時間戳和減去24小時得到24小時前的時間戳。然後您可以使用filter ()根據他們的功能來過濾記錄publishTimestampInMillis字段。
這裏有一個例子代碼片段演示了如何獲取過去24小時的記錄使用磚從Pub / Sub數:
進口org.apache.spark.sql.functions。_ val authOptions:地圖(字符串,字符串)= (- > clientId“clientId”,“clientEmail”- > clientEmail,“privateKey”- > privateKey,“privateKeyId”- > privateKeyId) val pubsubDF =火花。readStream .format (“pubsub”) .option (“subscriptionId”、“mysub”) .option .option (“topicId”、“mytopic”) (“projectId”、“。”) .options (authOptions) .load () val last24HoursTimestamp = current_timestamp () - expr(“間隔24小時”)val last24HoursCount = pubsubDF .filter (col (publishTimestampInMillis) > = last24HoursTimestamp.cast(“長”)).count () println (s“記錄數:24小時last24HoursCount美元”)
注意,這個代碼片段假設您已經配置了Pub / Sub連接器在磚和有必要的授權選項。如果你還沒有這樣做,請參閱文檔訂閱穀歌Pub / Sub |磚在穀歌的雲為更多的信息。
謝謝你的快速回複,我在尋找直接數據指望PUBSUB磚作為我們必須驗證有多少記錄PUBSUB在磚和多少記錄我們已經收到最後的24小時。