取消
顯示的結果
而不是尋找
你的意思是:

spark-streaming讀取特定事件中心的分區

Sandesh87
新的貢獻者三世

azure事件中心“my_event_hub”共有5個分區(“0”、“1”、“2”、“3”、“4”)

了readstream應該隻讀取事件從分區“0”和“4”

活動中心配置流媒體來源:-

val name = " my_event_hub " val connectionString = " my_event_hub_connection_string " val max_events = 50 val位置=地圖(新NameAndPartition(名字,0)- > EventPosition.fromEndOfStream,新NameAndPartition(名稱、4)- > EventPosition.fromEndOfStream) val eventHubsConf = eventHubsConf (connectionString) .setStartingPositions (start) .setMaxEventsPerTrigger (max_events)

structured-streaming-eventhubs-integation官方文檔:https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/structured-streaming-eventhubs-inte..。

使用上麵的配置流媒體應用程序讀取所有5個分區的活動中心。我們可以隻讀取特定分區嗎?

例如讀取事件隻有從2分區“0”和“4”檢查點和抵消指出特定的分區。

4回複4

UmaMahesh1
尊敬的貢獻者三世

嗨@Sandesh Puligundla

對於那些特定的分區,你能給的開始和結束位置作為一個值或一個非存在的數量和是否排除了嗎?

Sandesh87
新的貢獻者三世

@Uma Maheswara Rao Desula你指的是創建一個配置地圖這樣的抵消(事件位置)開始和結束事件位置分區是一樣的嗎?

例二:開始和結束位置:-

EventPosition.fromOffset (“1”)

例二:開始和結束位置:-

EventPosition.fromSequenceNumber(100升)

理論上有這種配置地圖開始和結束位置具有相同的抵消或序列號,我們可以控製攝入多少記錄每個分區的活動中心。

vivek_rawat
新的貢獻者三世

嗨@Sandesh Puligundla,

你所寫的方法是唯一的方法來讀取特定的分區。這樣做的原因是一個不正確的行為書寫錯誤,我認為你應該使用職位變量,而不是開始。

val位置=地圖(新NameAndPartition(名字,0)- > EventPosition.fromEndOfStream,新NameAndPartition(名稱、4)- > EventPosition.fromEndOfStream) val eventHubsConf = eventHubsConf (connectionString) .setStartingPositions(職位).setMaxEventsPerTrigger (max_Events)

keshav
新的貢獻者二世

我試著使用以下代碼片段來接收消息隻有從分區id = 0

ehName = " < < EVENT-HUB-NAME > > " #創建事件位置分區0 positionKey1 = {“ehName”: ehName,“partitionId”: 0} eventPosition1 ={“抵消”:“@latest”、“seqNo”: 1、“enqueuedTime”:沒有,“isInclusive”:真正}#規則放入一張地圖。關鍵位置字典必須製成JSON字符串作為關鍵。positionMap = {json.dumps (positionKey1): eventPosition1} #將映射到主配置字典ehConf [" eventhubs事件中心。startingPositions "] = json.dumps ehConf [" eventhubs (positionMap)。connectionString "] = < eventhub_connection_string > ehConf [" eventhubs。consumerGroup "] = < eventhub_consumer_group > df =火花\ .readStream \ .format . schema (jsonSchema) (“eventhubs”) \ \ .options (* * conf) \ .load ()

但我仍然收到消息從其他分區。

誰能指導我失蹤的什麼地方?

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map