指定初始狀態mapGroupsWithState
在結構化流
您可以指定一個用戶定義的初始狀態為結構化流狀態處理使用flatMapGroupsWithState
或mapGroupsWithState
。這可以讓你避免再處理數據時開始有狀態流沒有一個有效的檢查點。
defmapGroupsWithState(年代:編碼器,U:編碼器)(timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset(K,年代))(函數:(K,迭代器(V),GroupState(年代])= >U):數據集(U]defflatMapGroupsWithState(年代:編碼器,U:編碼器)(outputMode:OutputMode,timeoutConf:GroupStateTimeout,initialState:KeyValueGroupedDataset(K,年代))(函數:(K,迭代器(V),GroupState(年代])= >迭代器(U])
指定一個初始狀態的示例用例flatMapGroupsWithState
接線員:
瓦爾fruitCountFunc=(關鍵:字符串,值:迭代器(字符串),狀態:GroupState(RunningCount])= >{瓦爾數=狀態。getOption。地圖(_。數)。getOrElse(0 l)+valList。大小狀態。更新(新RunningCount(數))迭代器((關鍵,數。toString))}瓦爾fruitCountInitialDS:數據集((字符串,RunningCount)]=Seq((“蘋果”,新RunningCount(1)),(“橙色”,新RunningCount(2)),(“芒果”,新RunningCount(5)),)。托托()瓦爾fruitCountInitial=initialState。groupByKey(x= >x。_1)。mapValues(_。_2)fruitStream。groupByKey(x= >x)。flatMapGroupsWithState(更新,GroupStateTimeout。NoTimeout,fruitCountInitial)(fruitCountFunc)
指定一個初始狀態的示例用例mapGroupsWithState
接線員:
瓦爾fruitCountFunc=(關鍵:字符串,值:迭代器(字符串),狀態:GroupState(RunningCount])= >{瓦爾數=狀態。getOption。地圖(_。數)。getOrElse(0 l)+valList。大小狀態。更新(新RunningCount(數))(關鍵,數。toString)}瓦爾fruitCountInitialDS:數據集((字符串,RunningCount)]=Seq((“蘋果”,新RunningCount(1)),(“橙色”,新RunningCount(2)),(“芒果”,新RunningCount(5)),)。托托()瓦爾fruitCountInitial=initialState。groupByKey(x= >x。_1)。mapValues(_。_2)fruitStream。groupByKey(x= >x)。mapGroupsWithState(GroupStateTimeout。NoTimeout,fruitCountInitial)(fruitCountFunc)