本文向您展示如何使用Apache火花函數來生成惟一增加一列的數值。
我們審查三個不同的方法使用。你應該選擇最有效的方法與你的用例。
使用zipWithIndex ()在彈性分布式數據集(抽樣)
的zipWithIndex ()函數隻能在抽樣。你不能直接使用它DataFrame。
轉換您的DataFrame抽樣,適用zipWithIndex ()你的數據,然後將抽樣回DataFrame。
我們將使用以下示例代碼添加惟一的id數字基本表有兩個條目。
% python df =火花。createDataFrame([(“愛麗絲”,“10”)(“蘇珊”,“12”)],[“名稱”,“年齡”])df1 = df.rdd.zipWithIndex () .toDF () df2 = df1.select(坳(“_1。*”)(“_2”).alias上校(increasing_id)) df2.show ()
運行示例代碼,我們得到以下結果:
+ - - - + - - - + - - - - - - - - - - - - - | + |名字年齡| increasing_id | +——+ - - - + - - - - - - - - - - - - -愛麗絲+ | | 10 | 0 | |蘇珊| 12 | 1 | +——+ - - - + - - - - - - - - - - - - - +
使用monotonically_increasing_id ()獨特的,但不是連續的數字
的monotonically_increasing_id ()生成函數單調遞增64位整數。
生成的id數字是保證增加和獨特的,但是這不能保證連續。
我們將使用以下示例代碼將單調遞增數字id添加到一個基本表與兩個條目。
%從pyspark.sql python。功能導入* df_with_increasing_id = df。與Column("monotonically_increasing_id", monotonically_increasing_id()) df_with_increasing_id.show()
運行示例代碼,我們得到以下結果:
+ - - - + - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | + |名字年齡| monotonically_increasing_id | +——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 10愛麗絲+ | | | 8589934592 | |蘇珊12 | 25769803776 | | +——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
結合monotonically_increasing_id ()與row_number ()兩列
的row_number ()函數生成連續的數字。
結合monotonically_increasing_id ()生成兩列數據,可用於識別數據條目。
我們將使用以下示例代碼添加單調遞增id號和行號與兩個條目一個基本表。
%從pyspark.sql python。函數從pyspark.sql進口*。窗口導入*窗口= Window.orderBy(坳(monotonically_increasing_id)) df_with_consecutive_increasing_id = df_with_increasing_id。withColumn (increasing_id, row_number () .over(窗口))df_with_consecutive_increasing_id.show ()
運行示例代碼,我們得到以下結果:
+ - - - + - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - | + |名字年齡| monotonically_increasing_id | increasing_id | +——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - -愛麗絲+ | | 10 | 8589934592 | 1 | |蘇珊| 12 | 25769803776 | 2 | +——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - +
如果你需要增加基於上次更新最大值,您可以定義一個先前的最大值,然後從那裏開始計數。
我們要構建的示例代碼,我們就跑。
首先,我們需要定義的值previous_max_value。為此,您通常會從現有的輸出表獲取價值。對於這個示例,我們將它定義為1000。
python previous_max_value = 1000 df_with_consecutive_increasing_id %。與Column("cnsecutiv_increase", col("increasing_id") + lit(previous_max_value)).show()
當這是結合前麵的示例代碼和運行,我們得到以下結果:
+——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - | + |名字年齡| monotonically_increasing_id | increasing_id | cnsecutiv_increase | +——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -愛麗絲+ | | 1001 | | 8589934592 | 1 | |蘇珊| 1002 | 25769803776 | 2 | | +——+ - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +