pyspark.sql.functions.session_window

pyspark.sql.functions。 session_window ( timeColumn:ColumnOrName,gapDuration:聯盟(pyspark.sql.column.Column,str] )→pyspark.sql.column.Column

給定一個時間戳列指定生成會話窗口。會話窗口是一個動態的窗口,窗口的長度是根據給定的輸入不同。會話窗口的長度定義為“最新的時間戳輸入會話持續時間+差距”,所以當新輸入綁定到當前會話窗口中,會話窗口可以擴展的結束時間根據新的輸入。Windows可以支持微秒級精度。不支持Windows的幾個月。對於流媒體查詢,您可以使用該函數current_timestamp生成windows處理時間。gapDuration提供字符串,例如“1秒”、“1天12小時”,“2分鍾”。有效區間字符串是“周”,“天”,“小時”,“分”,“秒”,的毫秒、微秒。它也可能是一個列,可以評估差距持續時間動態地根據輸入行。輸出列將結構稱為“session_window”默認情況下嵌套列‘開始’和‘結束’,‘開始’和‘結束’的位置pyspark.sql.types.TimestampType

參數
timeColumn 或str

列名或列使用作為窗口時間的時間戳。必須TimestampType或TimestampNTZType時間列。

gapDuration 或str

Python字符串文字或列指定超時的會話。它可以是靜態值,例如。10分鍾,1秒,或者一個表達式/ UDF指定差距持續時間動態地根據輸入行。

例子

> > >df=火花createDataFrame(((“2016-03-11 09:00:07”,1)))toDF(“日期”,“val”)> > >w=dfgroupBy(session_window(“日期”,“5秒”))gg(總和(“val”)別名(“和”))> > >w選擇(wsession_window開始(“字符串”)別名(“開始”),wsession_window結束(“字符串”)別名(“結束”),“和”)收集()行(開始= 2016-03-11 09:00:07,結束= 2016-03-11 09:00:12,金額= 1))> > >w=dfgroupBy(session_window(“日期”,點燃(“5秒”)))gg(總和(“val”)別名(“和”))> > >w選擇(wsession_window開始(“字符串”)別名(“開始”),wsession_window結束(“字符串”)別名(“結束”),“和”)收集()行(開始= 2016-03-11 09:00:07,結束= 2016-03-11 09:00:12,金額= 1))